caches: added reprs
marcink
r3933:37529f8b default
@@ -1,285 +1,288 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)


class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
103 """
103 """
104 pickle maintained the `CachedValue` wrapper of the tuple
104 pickle maintained the `CachedValue` wrapper of the tuple
105 msgpack does not, so it must be added back in.
105 msgpack does not, so it must be added back in.
106 """
106 """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on errors other than `Resource temporarily unavailable`
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock; better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        super(FileNamespaceBackend, self).__init__(arguments)

+    def __repr__(self):
+        return '{} `{}`'.format(self.__class__, self.filename)
+
    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:

            return filter(cond, dbm.keys())

    def get_store(self):
        return self.filename

    def get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)


class BaseRedisBackend(redis_backend.RedisBackend):

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None


class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass
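
A minimal sketch of what the new FileNamespaceBackend.__repr__ buys (the direct construction and the scratch path here are illustrative assumptions, not part of this commit)::

    # Hypothetical demo: this backend is normally configured through a
    # dogpile region, but it can be built directly with a filename argument.
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend

    backend = FileNamespaceBackend({'filename': '/tmp/rc_cache_demo.dbm'})

    # Previously repr() fell back to the generic '<... object at 0x...>'
    # form; with this commit it names the backing file, which is what now
    # shows up in cache-related log output:
    print(repr(backend))
    # <class 'rhodecode.lib.rc_cache.backends.FileNamespaceBackend'> `/tmp/rc_cache_demo.dbm`
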
@@ -1,355 +1,355 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import time
import logging
import functools
from decorator import decorate
import threading

from dogpile.cache import CacheRegion
from dogpile.cache.util import compat

import rhodecode
from rhodecode.lib.utils import safe_str, sha1
from rhodecode.lib.utils2 import safe_unicode, str2bool
from rhodecode.model.db import Session, CacheKey, IntegrityError

from rhodecode.lib.rc_cache import cache_key_meta
from rhodecode.lib.rc_cache import region_meta

log = logging.getLogger(__name__)


class RhodeCodeCacheRegion(CacheRegion):

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
50 """
50 """
51 Custom conditional decorator, that will not touch any dogpile internals if
51 Custom conditional decorator, that will not touch any dogpile internals if
52 condition isn't meet. This works a bit different than should_cache_fn
52 condition isn't meet. This works a bit different than should_cache_fn
53 And it's faster in cases we don't ever want to compute cached values
53 And it's faster in cases we don't ever want to compute cached values
54 """
54 """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):

            if not condition:
-                log.debug('Calling un-cached func:%s', user_func)
+                log.debug('Calling un-cached func:%s', user_func.func_name)
                return user_func(*arg, **kw)

            key = key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

-            log.debug('Calling cached fn:%s', user_func)
+            log.debug('Calling cached fn:%s', user_func.func_name)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.

            return decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator


def make_region(*arg, **kw):
    return RhodeCodeCacheRegion(*arg, **kw)


def get_default_cache_settings(settings, prefixes=None):
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                val = settings[key]
                if isinstance(val, compat.string_types):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1("_".join(map(safe_str, args)))


def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def wrapper(namespace, fn):
        return key_generator(backend, namespace, fn)
    return wrapper


def key_generator(backend, namespace, fn):
    fname = fn.__name__

    def generate_key(*args):
        backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)

        return final_key

    return generate_key


def get_or_create_region(region_name, region_namespace=None):
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj


def clear_cache_namespace(cache_region, cache_namespace_uid):
    region = get_or_create_region(cache_region, cache_namespace_uid)
    cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    num_delete_keys = len(cache_keys)
    if num_delete_keys:
        region.delete_multi(cache_keys)
    return num_delete_keys


class ActiveRegionCache(object):
    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        return False


class FreshRegionCache(object):
    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        return True


class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map, so we
        # can check whether a specific cache_key object is already registered. We do
        # this because we want a consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
            cache_key_meta.cache_keys_by_pid.append(self.cache_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj exists and is marked as active, i.e. its
            # cache is not outdated; we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key either does not exist or is set to False; we return
        # the real invalidator which re-computes the value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch an integrity error, it means this object was inserted
            # concurrently; the assumption is that's really an edge race-condition
            # case and it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
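
A hedged usage sketch of the conditional_cache_on_arguments decorator whose logging changed above (the region and namespace names are illustrative; func_name is the Python 2 spelling of __name__, which is what the new debug lines read)::

    from rhodecode.lib import rc_cache

    cache_namespace_uid = 'demo_namespace:1'  # illustrative name
    region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

    # condition=False short-circuits before any dogpile machinery runs:
    # get_or_create_for_user_func() logs 'Calling un-cached func:<name>'
    # via user_func.func_name and calls the function directly, so the
    # result is never stored in the region.
    @region.conditional_cache_on_arguments(
        namespace=cache_namespace_uid, condition=False)
    def expensive(cache_name, param):
        return 'computed {} {}'.format(cache_name, param)

    expensive('demo', 1)  # always recomputed, never cached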