caches: fixed unicode errors on non-ascii cache keys
super-admin | r4809:3a959508 default
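
What the fix addresses, as a minimal sketch: under Python 2 (which this codebase targets), formatting a unicode lock-key template with a utf-8 byte string triggers an implicit ascii decode, which fails for any non-ascii cache key. The key value below is illustrative only, and the sketch assumes safe_str/safe_unicode are RhodeCode's usual utf-8 encode/decode helpers.

# -*- coding: utf-8 -*-
key = u'repo-grüppe'.encode('utf-8')  # what safe_str() yields for a non-ascii key

try:
    lock_key = u'_lock_{0}'.format(key)  # old path: unicode template + utf-8 bytes
except UnicodeDecodeError as e:
    # "'ascii' codec can't decode byte 0xc3 ...", the error this commit fixes
    print('old path fails: %s' % e)

# new path: decode first and keep everything unicode, which is what safe_unicode does
lock_key = u'_lock_{0}'.format(key.decode('utf-8'))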
@@ -1,364 +1,364 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from pyramid.settings import asbool

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
-from rhodecode.lib.utils import safe_str
+from rhodecode.lib.utils import safe_str, safe_unicode


_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)


class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        """
        pickle maintained the `CachedValue` wrapper of the tuple
        msgpack does not, so it must be added back in.
        """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too much time on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)


class BaseRedisBackend(redis_backend.RedisBackend):
    key_prefix = ''

    def __init__(self, arguments):
        super(BaseRedisBackend, self).__init__(arguments)
        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

        if self._lock_auto_renewal and not self._lock_timeout:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        if self.distributed_lock:
-            lock_key = redis_backend.u('_lock_{0}').format(safe_str(key))
+            lock_key = redis_backend.u(u'_lock_{0}'.format(safe_unicode(key)))
            return get_mutex_lock(self.client, lock_key, self._lock_timeout,
                                  auto_renewal=self._lock_auto_renewal)
        else:
            return None


class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass


def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __repr__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def __str__(self):
            return "{}:{}".format(self.__class__.__name__, lock_key)

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                acquired = self.lock.acquire(wait)
                log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                return acquired
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                pass

    return _RedisLockWrapper()
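
For context, a short usage sketch of the wrapper that get_mutex_lock() returns. This is a sketch only: it assumes a Redis server on localhost and the python-redis-lock package this module already imports; the lock name is illustrative.

import redis

client = redis.StrictRedis()
mutex = get_mutex_lock(client, u'_lock_example', lock_timeout=30, auto_renewal=True)
if mutex.acquire(wait=True):
    try:
        pass  # compute and store the cached value while the lock is held
    finally:
        mutex.release()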