##// END OF EJS Templates
caches: use new sqlalchemy 1.4 caching query approach
super-admin -
r5000:cd6bb3a6 default
parent child Browse files
Show More
@@ -1,298 +1,249 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """caching_query.py
22 22
23 23 Represent functions and classes
24 24 which allow the usage of Dogpile caching with SQLAlchemy.
25 25 Introduces a query option called FromCache.
26 26
27 .. versionchanged:: 1.4 the caching approach has been altered to work
28 based on a session event.
29
30
27 31 The three new concepts introduced here are:
28 32
29 * CachingQuery - a Query subclass that caches and
33 * ORMCache - an extension for an ORM :class:`.Session`
30 34 retrieves results in/from dogpile.cache.
31 35 * FromCache - a query option that establishes caching
32 36 parameters on a Query
33 37 * RelationshipCache - a variant of FromCache which is specific
34 38 to a query invoked during a lazy load.
35 * _params_from_query - extracts value parameters from
36 a Query.
37 39
38 40 The rest of what's here are standard SQLAlchemy and
39 41 dogpile.cache constructs.
40 42
41 43 """
42 from sqlalchemy.orm.interfaces import MapperOption
43 from sqlalchemy.orm.query import Query
44 from sqlalchemy.sql import visitors
45 44 from dogpile.cache.api import NO_VALUE
46 45
47 from rhodecode.lib.utils2 import safe_str
46 from sqlalchemy import event
47 from sqlalchemy.orm import loading
48 from sqlalchemy.orm.interfaces import UserDefinedOption
49
50
DEFAULT_REGION = "sql_cache_short"


class ORMCache:
    """An add-on for an ORM :class:`.Session` that optionally loads full
    results from a dogpile cache region.

    Usage::

        cache = ORMCache(regions={})
        cache.listen_on_session(Session)

    """

    def __init__(self, regions):
        # `regions` maps region names to dogpile CacheRegion objects; a
        # falsy mapping means "resolve lazily from the rc_cache config".
        self.cache_regions = regions or self._get_region()
        # memoization store used by FromCache._generate_cache_key when
        # building offline cache-key strings
        self._statement_cache = {}

    @classmethod
    def _get_region(cls):
        # imported here to avoid a circular import at module load time
        from rhodecode.lib.rc_cache import region_meta
        return region_meta.dogpile_cache_regions

    def listen_on_session(self, session_factory):
        """Attach this cache to *session_factory* using the SQLAlchemy 1.4
        ``do_orm_execute`` session event hook."""
        event.listen(session_factory, "do_orm_execute", self._do_orm_execute)

    def _do_orm_execute(self, orm_context):
        """Serve ORM statement execution from the dogpile cache when a
        :class:`.FromCache` option is present.

        Returns an ORM result built from the (frozen) cached value, or
        ``None`` to let the statement execute normally.

        :raises KeyError: if ``ignore_expiration`` was requested and no
            value is present in the cache.
        """
        for opt in orm_context.user_defined_options:
            if isinstance(opt, RelationshipCache):
                opt = opt._process_orm_context(orm_context)
                if opt is None:
                    continue

            if isinstance(opt, FromCache):
                dogpile_region = self.cache_regions[opt.region]

                if opt.cache_key:
                    our_cache_key = f'SQL_CACHE_{opt.cache_key}'
                else:
                    our_cache_key = opt._generate_cache_key(
                        orm_context.statement, orm_context.parameters, self
                    )

                if opt.ignore_expiration:
                    cached_value = dogpile_region.get(
                        our_cache_key,
                        expiration_time=opt.expiration_time,
                        ignore_expiration=opt.ignore_expiration,
                    )
                else:

                    def createfunc():
                        # freeze() detaches the result from the DB cursor so
                        # it can be stored and later merged into any session
                        return orm_context.invoke_statement().freeze()

                    cached_value = dogpile_region.get_or_create(
                        our_cache_key,
                        createfunc,
                        expiration_time=opt.expiration_time,
                    )

                if cached_value is NO_VALUE:
                    # carry the key for diagnostics, matching the behavior
                    # of the pre-1.4 CachingQuery.get_value() implementation
                    raise KeyError(our_cache_key)

                orm_result = loading.merge_frozen_result(
                    orm_context.session,
                    orm_context.statement,
                    cached_value,
                    load=False,
                )
                return orm_result()

        else:
            # no caching option matched; execute the statement normally
            return None

    def invalidate(self, statement, parameters, opt):
        """Invalidate the cache value represented by a statement."""

        statement = statement.__clause_element__()

        dogpile_region = self.cache_regions[opt.region]

        cache_key = opt._generate_cache_key(statement, parameters, self)

        dogpile_region.delete(cache_key)
class FromCache(UserDefinedOption):
    """Execution option directing a statement's results to a dogpile cache."""

    # lazy-loaders should not inherit this option
    propagate_to_loaders = False

    def __init__(
        self,
        region=DEFAULT_REGION,
        cache_key=None,
        expiration_time=None,
        ignore_expiration=False,
    ):
        """Construct a new FromCache.

        :param region: the cache region; should be a region configured in
            the dictionary of dogpile regions.
        :param cache_key: optional explicit string key for the query; use
            this if your query has a huge amount of parameters (such as
            when using ``in_()``) which correspond more simply to some
            other identifier.
        :param expiration_time: optional per-lookup expiration override.
        :param ignore_expiration: when True, return a cached value even if
            it has expired.
        """
        self.region = region
        self.cache_key = cache_key
        self.expiration_time = expiration_time
        self.ignore_expiration = ignore_expiration

    # Not needed as of SQLAlchemy 1.4.28: UserDefinedOption classes no
    # longer participate in the SQL compilation cache key.
    def _gen_cache_key(self, anon_map, bindparams):
        return None

    def _generate_cache_key(self, statement, parameters, orm_cache):
        """Derive a results-cache key for *statement*.

        Leverages the statement's SQL compilation cache key, repurposed
        here as a SQL results key and combined with this option's
        explicit ``cache_key``.
        """
        compilation_key = statement._generate_cache_key()

        offline = compilation_key.to_offline_string(
            orm_cache._statement_cache, statement, parameters
        )
        return offline + repr(self.cache_key)
class RelationshipCache(FromCache):
    """Specifies that a Query as called within a "lazy load"
    should load results from a cache."""

    # allow lazy loaders to pick this option up
    propagate_to_loaders = True

    def __init__(
        self,
        attribute,
        region=DEFAULT_REGION,
        cache_key=None,
        expiration_time=None,
        ignore_expiration=False,
    ):
        """Construct a new RelationshipCache.

        :param attribute: a ``Class.attribute`` naming the particular
            relationship() whose lazy loader should be served from cache.
        :param region: name of the cache region.
        :param cache_key: optional explicit string key for the query,
            bypassing the usual means of forming a key from the Query
            itself.
        """
        self.region = region
        self.cache_key = cache_key
        self.expiration_time = expiration_time
        self.ignore_expiration = ignore_expiration
        # maps (mapped class, relationship key) -> option so that one
        # chained option can cover several relationships (see and_())
        self._relationship_options = {
            (attribute.property.parent.class_, attribute.property.key): self
        }

    def _process_orm_context(self, orm_context):
        """Return the option matching the relationship currently being
        lazy-loaded, or ``None`` if this option does not apply."""
        load_path = orm_context.loader_strategy_path

        if load_path:
            mapper, prop = load_path[-2:]
            rel_key = prop.key

            for klass in mapper.class_.__mro__:
                if (klass, rel_key) in self._relationship_options:
                    return self._relationship_options[(klass, rel_key)]

    def and_(self, option):
        """Chain another RelationshipCache option to this one.

        While many RelationshipCache objects can be specified on a single
        Query separately, chaining them together allows for a more efficient
        lookup during load.
        """
        self._relationship_options.update(option._relationship_options)
        return self
@@ -1,88 +1,89 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22 from dogpile.cache import register_backend
23 23 module_name = 'rhodecode'
24 24
25 25 register_backend(
26 26 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
27 27 "LRUMemoryBackend")
28 28
29 29 register_backend(
30 30 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
31 31 "FileNamespaceBackend")
32 32
33 33 register_backend(
34 34 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
35 35 "RedisPickleBackend")
36 36
37 37 register_backend(
38 38 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
39 39 "RedisMsgPackBackend")
40 40
41 41
42 42 log = logging.getLogger(__name__)
43 43
44 44 from . import region_meta
45 45 from .utils import (
46 46 get_default_cache_settings, backend_key_generator, get_or_create_region,
47 47 clear_cache_namespace, make_region, InvalidationContext,
48 FreshRegionCache, ActiveRegionCache)
48 FreshRegionCache, ActiveRegionCache
49 )
49 50
50 51
51 52 FILE_TREE_CACHE_VER = 'v4'
52 53 LICENSE_CACHE_VER = 'v2'
53 54
54 55
def configure_dogpile_cache(settings):
    """Create and register a dogpile cache region for every ``rc_cache.*``
    namespace found in *settings*."""
    cache_dir = settings.get('cache_dir')
    if cache_dir:
        region_meta.dogpile_config_defaults['cache_dir'] = cache_dir

    rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])

    # inspect available namespaces; the first dotted segment of each
    # setting key names one region
    seen_regions = set()
    for setting_key in rc_cache_data:
        namespace_name = setting_key.split('.', 1)[0]
        if namespace_name in seen_regions:
            continue

        seen_regions.add(namespace_name)
        log.debug('dogpile: found following cache regions: %s', namespace_name)

        new_region = make_region(
            name=namespace_name,
            function_key_generator=None
        )

        new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
        # swap in a key generator that is aware of the actual backend
        new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
        if log.isEnabledFor(logging.DEBUG):
            region_args = dict(backend=new_region.actual_backend.__class__,
                               region_invalidator=new_region.region_invalidator.__class__)
            log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)

        region_meta.dogpile_cache_regions[namespace_name] = new_region
85 86
86 87
def includeme(config):
    """Pyramid plug-in hook: configure dogpile caching from app settings."""
    configure_dogpile_cache(config.registry.settings)
@@ -1,369 +1,372 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import decorator
25 25 import threading
26 26
27 27 from dogpile.cache import CacheRegion
28 28
29 29 import rhodecode
30 30 from rhodecode.lib.hash_utils import sha1
31 31 from rhodecode.lib.type_utils import str2bool
32 32 from rhodecode.lib.str_utils import safe_bytes
33 from rhodecode.model.db import Session, CacheKey, IntegrityError
34 33
35 34 from rhodecode.lib.rc_cache import cache_key_meta
36 35 from rhodecode.lib.rc_cache import region_meta
37 36
38 37 log = logging.getLogger(__name__)
39 38
40 39
def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    cython_type_name = 'cython_function_or_method'
    return func.__class__.__name__ == cython_type_name
46 45
47 46
class RhodeCodeCacheRegion(CacheRegion):
    """CacheRegion subclass adding a condition-aware caching decorator."""

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values
        """
        expiration_time_is_callable = callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
            # fast path: bypass dogpile entirely when caching is disabled
            if not condition:
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = key_generator(*arg, **kw)

            if expiration_time_is_callable:
                timeout = expiration_time()
            else:
                timeout = expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                self.delete(key_generator(*arg, **kw))

            def set_(value, *arg, **kw):
                self.set(key_generator(*arg, **kw), value)

            def get(*arg, **kw):
                return self.get(key_generator(*arg, **kw))

            # expose cache-management helpers on the decorated function
            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
125 124
126 125
def make_region(*arg, **kw):
    """Factory returning our :class:`RhodeCodeCacheRegion` subclass."""
    return RhodeCodeCacheRegion(*arg, **kw)
129 128
130 129
def get_default_cache_settings(settings, prefixes=None):
    """Collect entries of *settings* whose keys start with any of the
    given *prefixes*, keyed by the remainder after the prefix.

    String values are stripped of surrounding whitespace; other values
    are passed through untouched.
    """
    matched = {}
    for full_key, raw_value in settings.items():
        for prefix in (prefixes or []):
            if not full_key.startswith(prefix):
                continue
            stripped_name = full_key.split(prefix)[1].strip()
            value = raw_value.strip() if isinstance(raw_value, str) else raw_value
            matched[stripped_name] = value
    return matched
143 142
144 143
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined = "_".join(map(str, args))
    return sha1(safe_bytes(joined))
150 149
151 150
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def backend_aware(namespace, fn):
        # curry the backend into the regular key generator
        return key_generator(backend, namespace, fn)

    return backend_aware
159 158
160 159
def key_generator(backend, namespace, fn):
    """Build a cache-key function for *fn*, prefixed with the backend's
    key prefix and the given namespace."""
    fname = fn.__name__

    def generate_key(*args):
        backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        return "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)

    return generate_key
173 172
174 173
def get_or_create_region(region_name, region_namespace=None):
    """Look up a configured region by name.

    For file-namespace backends, lazily build and register a dedicated
    region with one db file per *region_namespace*.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, list(region_meta.dogpile_cache_regions.keys())))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        existing = region_meta.dogpile_cache_regions.get(region_namespace)
        if existing:
            log.debug('Using already configured region: %s', region_namespace)
            return existing

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)

        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
212 211
213 212
def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
    """Delete (or soft-invalidate) all keys under *cache_namespace_uid*.

    Returns the number of keys that were present in the namespace.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    num_delete_keys = len(cache_keys)
    if invalidate:
        # soft invalidation keeps the data; reads recompute lazily
        region.invalidate(hard=False)
    elif num_delete_keys:
        region.delete_multi(cache_keys)
    return num_delete_keys
224 223
225 224
class ActiveRegionCache(object):
    """Invalidation-state wrapper: the cached value is still valid, so
    callers should read from cache rather than recompute."""

    def __init__(self, context, cache_data):
        # the owning InvalidationContext
        self.context = context
        # dict snapshot of the CacheKey DB row
        self.cache_data = cache_data

    def should_invalidate(self):
        return False
233 232
234 233
class FreshRegionCache(object):
    """Invalidation-state wrapper: the cache was flagged stale, so callers
    should recompute and store a fresh value."""

    def __init__(self, context, cache_data):
        # the owning InvalidationContext
        self.context = context
        # dict snapshot of the CacheKey DB row
        self.cache_data = cache_data

    def should_invalidate(self):
        return True
242 241
243 242
class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}[{self.uid}]>'

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.proc_key = 'proc:{}'.format(self.proc_id)
        # filled in by __exit__ with the elapsed wall-clock time
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        # deferred import avoids a circular dependency with model.db
        from rhodecode.model.db import CacheKey

        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            new_cache_args = invalidation_namespace
            first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
            cache_key_meta.cache_keys_by_pid.add(self.proc_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # deferred import avoids a circular dependency with model.db
        from rhodecode.model.db import Session, IntegrityError

        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
@@ -1,51 +1,56 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 SQLAlchemy Metadata and Session object
23 23 """
24 24
25 25 from sqlalchemy.orm import declarative_base
26 26 from sqlalchemy.orm import scoped_session, sessionmaker
27 27 from sqlalchemy.orm import Session as SASession
28 from rhodecode.lib import caching_query
28 from rhodecode.lib.caching_query import ORMCache
29
29 30
__all__ = ['Base', 'Session', 'raw_query_executor']

# scoped_session; result caching is wired in below through the ORMCache
# "do_orm_execute" session event. To use the cache on a query:
#   .options(FromCache("sqlalchemy_cache_type", "cachekey"))
Session = scoped_session(
    sessionmaker(
        expire_on_commit=True,
    )
)

# pass empty regions so we can fetch it on-demand inside ORMCache
cache = ORMCache(regions={})
cache.listen_on_session(Session)


# The declarative Base
Base = declarative_base()
46 51
47 52
def raw_query_executor():
    """Return a fresh, non-scoped Session bound to the metadata engine,
    suitable for executing raw queries outside the scoped session."""
    engine = Base.metadata.bind
    return SASession(engine)
General Comments 0
You need to be logged in to leave comments. Login now