caches: fixed an error with unique non-ascii cache keys
super-admin
r4752:d818a8fe default
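
This change routes the distributed-lock key through safe_str() before it is formatted into the unicode lock-key template in get_mutex(). Below is a minimal sketch of the class of failure this guards against, assuming Python 2 string semantics; the key value is illustrative, not taken from this commit:

# -*- coding: utf-8 -*-
# Illustrative only: formatting non-ascii *bytes* into a unicode template
# triggers an implicit ascii decode in Python 2 and raises UnicodeDecodeError.
template = u'_lock_{0}'
key = 'repo_\xc5\xbc\xc3\xb3\xc5\x82w'  # utf-8 encoded bytes for u'repo_żółw'

try:
    template.format(key)
except UnicodeDecodeError:
    print('cannot format non-ascii bytes into a unicode template')

# Normalizing the key to one known representation first (the role safe_str()
# plays in get_mutex() below) makes lock-key construction deterministic:
lock_key = template.format(key.decode('utf-8'))
print(repr(lock_key))  # u'_lock_repo_\u017c\xf3\u0142w'
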
@@ -1,363 +1,364 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from pyramid.settings import asbool
37 37
38 38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 from rhodecode.lib.utils import safe_str
39 40
40 41
41 42 _default_max_size = 1024
42 43
43 44 log = logging.getLogger(__name__)
44 45
45 46
46 47 class LRUMemoryBackend(memory_backend.MemoryBackend):
47 48 key_prefix = 'lru_mem_backend'
48 49 pickle_values = False
49 50
50 51 def __init__(self, arguments):
51 52 max_size = arguments.pop('max_size', _default_max_size)
52 53
53 54 LRUDictClass = LRUDict
54 55 if arguments.pop('log_key_count', None):
55 56 LRUDictClass = LRUDictDebug
56 57
57 58 arguments['cache_dict'] = LRUDictClass(max_size)
58 59 super(LRUMemoryBackend, self).__init__(arguments)
59 60
60 61 def delete(self, key):
61 62 try:
62 63 del self._cache[key]
63 64 except KeyError:
64 65 # we don't care if the key isn't there at deletion
65 66 pass
66 67
67 68 def delete_multi(self, keys):
68 69 for key in keys:
69 70 self.delete(key)
70 71
71 72
72 73 class PickleSerializer(object):
73 74
74 75 def _dumps(self, value, safe=False):
75 76 try:
76 77 return compat.pickle.dumps(value)
77 78 except Exception:
78 79 if safe:
79 80 return NO_VALUE
80 81 else:
81 82 raise
82 83
83 84 def _loads(self, value, safe=True):
84 85 try:
85 86 return compat.pickle.loads(value)
86 87 except Exception:
87 88 if safe:
88 89 return NO_VALUE
89 90 else:
90 91 raise
91 92
92 93
93 94 class MsgPackSerializer(object):
94 95
95 96 def _dumps(self, value, safe=False):
96 97 try:
97 98 return msgpack.packb(value)
98 99 except Exception:
99 100 if safe:
100 101 return NO_VALUE
101 102 else:
102 103 raise
103 104
104 105 def _loads(self, value, safe=True):
105 106 """
106 107 pickle maintains the `CachedValue` wrapper of the tuple;
107 108 msgpack does not, so it must be added back in.
108 109 """
109 110 try:
110 111 value = msgpack.unpackb(value, use_list=False)
111 112 return CachedValue(*value)
112 113 except Exception:
113 114 if safe:
114 115 return NO_VALUE
115 116 else:
116 117 raise
117 118
118 119
119 120 import fcntl
120 121 flock_org = fcntl.flock
121 122
122 123
123 124 class CustomLockFactory(FileLock):
124 125
125 126 @memoized_property
126 127 def _module(self):
127 128
128 129 def gevent_flock(fd, operation):
129 130 """
130 131 Gevent compatible flock
131 132 """
132 133 # set non-blocking; this raises an exception if the lock cannot be acquired
133 134 operation |= fcntl.LOCK_NB
134 135 start_lock_time = time.time()
135 136 timeout = 60 * 15 # 15min
136 137 while True:
137 138 try:
138 139 flock_org(fd, operation)
139 140 # lock has been acquired
140 141 break
141 142 except (OSError, IOError) as e:
142 143 # raise on errors other than 'Resource temporarily unavailable' (EAGAIN)
143 144 if e.errno != errno.EAGAIN:
144 145 raise
145 146 elif (time.time() - start_lock_time) > timeout:
146 147 # waited too long on the lock; better to fail than to loop forever
147 148 log.error('Failed to acquire lock on `%s` after waiting %ss',
148 149 self.filename, timeout)
149 150 raise
150 151 wait_timeout = 0.03
151 152 log.debug('Failed to acquire lock on `%s`, retry in %ss',
152 153 self.filename, wait_timeout)
153 154 gevent.sleep(wait_timeout)
154 155
155 156 fcntl.flock = gevent_flock
156 157 return fcntl
157 158
158 159
159 160 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
160 161 key_prefix = 'file_backend'
161 162
162 163 def __init__(self, arguments):
163 164 arguments['lock_factory'] = CustomLockFactory
164 165 db_file = arguments.get('filename')
165 166
166 167 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
167 168 try:
168 169 super(FileNamespaceBackend, self).__init__(arguments)
169 170 except Exception:
170 171 log.error('Failed to initialize db at: %s', db_file)
171 172 raise
172 173
173 174 def __repr__(self):
174 175 return '{} `{}`'.format(self.__class__, self.filename)
175 176
176 177 def list_keys(self, prefix=''):
177 178 prefix = '{}:{}'.format(self.key_prefix, prefix)
178 179
179 180 def cond(v):
180 181 if not prefix:
181 182 return True
182 183
183 184 if v.startswith(prefix):
184 185 return True
185 186 return False
186 187
187 188 with self._dbm_file(True) as dbm:
188 189 try:
189 190 return filter(cond, dbm.keys())
190 191 except Exception:
191 192 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
192 193 raise
193 194
194 195 def get_store(self):
195 196 return self.filename
196 197
197 198 def _dbm_get(self, key):
198 199 with self._dbm_file(False) as dbm:
199 200 if hasattr(dbm, 'get'):
200 201 value = dbm.get(key, NO_VALUE)
201 202 else:
202 203 # gdbm objects lack a .get method
203 204 try:
204 205 value = dbm[key]
205 206 except KeyError:
206 207 value = NO_VALUE
207 208 if value is not NO_VALUE:
208 209 value = self._loads(value)
209 210 return value
210 211
211 212 def get(self, key):
212 213 try:
213 214 return self._dbm_get(key)
214 215 except Exception:
215 216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
216 217 raise
217 218
218 219 def set(self, key, value):
219 220 with self._dbm_file(True) as dbm:
220 221 dbm[key] = self._dumps(value)
221 222
222 223 def set_multi(self, mapping):
223 224 with self._dbm_file(True) as dbm:
224 225 for key, value in mapping.items():
225 226 dbm[key] = self._dumps(value)
226 227
227 228
228 229 class BaseRedisBackend(redis_backend.RedisBackend):
229 230 key_prefix = ''
230 231
231 232 def __init__(self, arguments):
232 233 super(BaseRedisBackend, self).__init__(arguments)
233 234 self._lock_timeout = self.lock_timeout
234 235 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
235 236
236 237 if self._lock_auto_renewal and not self._lock_timeout:
237 238 # set default timeout for auto_renewal
238 239 self._lock_timeout = 30
239 240
240 241 def _create_client(self):
241 242 args = {}
242 243
243 244 if self.url is not None:
244 245 args.update(url=self.url)
245 246
246 247 else:
247 248 args.update(
248 249 host=self.host, password=self.password,
249 250 port=self.port, db=self.db
250 251 )
251 252
252 253 connection_pool = redis.ConnectionPool(**args)
253 254
254 255 return redis.StrictRedis(connection_pool=connection_pool)
255 256
256 257 def list_keys(self, prefix=''):
257 258 prefix = '{}:{}*'.format(self.key_prefix, prefix)
258 259 return self.client.keys(prefix)
259 260
260 261 def get_store(self):
261 262 return self.client.connection_pool
262 263
263 264 def get(self, key):
264 265 value = self.client.get(key)
265 266 if value is None:
266 267 return NO_VALUE
267 268 return self._loads(value)
268 269
269 270 def get_multi(self, keys):
270 271 if not keys:
271 272 return []
272 273 values = self.client.mget(keys)
273 274 loads = self._loads
274 275 return [
275 276 loads(v) if v is not None else NO_VALUE
276 277 for v in values]
277 278
278 279 def set(self, key, value):
279 280 if self.redis_expiration_time:
280 281 self.client.setex(key, self.redis_expiration_time,
281 282 self._dumps(value))
282 283 else:
283 284 self.client.set(key, self._dumps(value))
284 285
285 286 def set_multi(self, mapping):
286 287 dumps = self._dumps
287 288 mapping = dict(
288 289 (k, dumps(v))
289 290 for k, v in mapping.items()
290 291 )
291 292
292 293 if not self.redis_expiration_time:
293 294 self.client.mset(mapping)
294 295 else:
295 296 pipe = self.client.pipeline()
296 297 for key, value in mapping.items():
297 298 pipe.setex(key, self.redis_expiration_time, value)
298 299 pipe.execute()
299 300
300 301 def get_mutex(self, key):
301 302 if self.distributed_lock:
302 lock_key = redis_backend.u('_lock_{0}').format(key)
303 lock_key = redis_backend.u('_lock_{0}').format(safe_str(key))
303 304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
304 305 auto_renewal=self._lock_auto_renewal)
305 306 else:
306 307 return None
307 308
308 309
309 310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
310 311 key_prefix = 'redis_pickle_backend'
311 312 pass
312 313
313 314
314 315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
315 316 key_prefix = 'redis_msgpack_backend'
316 317 pass
317 318
318 319
319 320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
320 321 import redis_lock
321 322
322 323 class _RedisLockWrapper(object):
323 324 """LockWrapper for redis_lock"""
324 325
325 326 @classmethod
326 327 def get_lock(cls):
327 328 return redis_lock.Lock(
328 329 redis_client=client,
329 330 name=lock_key,
330 331 expire=lock_timeout,
331 332 auto_renewal=auto_renewal,
332 333 strict=True,
333 334 )
334 335
335 336 def __repr__(self):
336 337 return "{}:{}".format(self.__class__.__name__, lock_key)
337 338
338 339 def __str__(self):
339 340 return "{}:{}".format(self.__class__.__name__, lock_key)
340 341
341 342 def __init__(self):
342 343 self.lock = self.get_lock()
343 344 self.lock_key = lock_key
344 345
345 346 def acquire(self, wait=True):
346 347 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
347 348 try:
348 349 acquired = self.lock.acquire(wait)
349 350 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
350 351 return acquired
351 352 except redis_lock.AlreadyAcquired:
352 353 return False
353 354 except redis_lock.AlreadyStarted:
354 355 # a refresh thread already exists, which means we already hold the lock
355 356 return True
356 357
357 358 def release(self):
358 359 try:
359 360 self.lock.release()
360 361 except redis_lock.NotAcquired:
361 362 pass
362 363
363 364 return _RedisLockWrapper()
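
For context, a hedged usage sketch of the wrapper returned above; the Redis connection and the lock key are assumptions for illustration, and in practice dogpile.cache obtains this mutex itself through the backend's get_mutex():

# Illustrative only: exercising the _RedisLockWrapper returned by
# get_mutex_lock(); assumes a Redis server reachable on localhost:6379.
import redis

client = redis.StrictRedis()
mutex = get_mutex_lock(client, lock_key='_lock_example',
                       lock_timeout=30, auto_renewal=True)
if mutex.acquire(wait=True):
    try:
        pass  # compute and store the cached value under the real key
    finally:
        mutex.release()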