caches: fixed unicode errors on non-ascii cache keys
super-admin
r4809:3a959508 default
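The bug sat in `BaseRedisBackend.get_mutex`: under Python 2, formatting the utf-8 byte string returned by `safe_str()` into a unicode lock-key template forces an implicit ascii decode, which fails for any non-ascii key. A minimal sketch of the failure and of the fix; the cache key below is a made-up example:

# -*- coding: utf-8 -*-
# Python 2 semantics; `key` stands in for a non-ascii cache key.
key = u'repo-żółw'.encode('utf-8')  # safe_str() returns utf-8 bytes

u'_lock_{0}'.format(key)  # old path: implicit ascii decode -> UnicodeDecodeError

u'_lock_{0}'.format(key.decode('utf-8'))  # fixed path (what safe_unicode() does)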
@@ -1,364 +1,364 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from pyramid.settings import asbool
37 37
38 38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 - from rhodecode.lib.utils import safe_str
39 + from rhodecode.lib.utils import safe_str, safe_unicode
40 40
41 41
42 42 _default_max_size = 1024
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 class LRUMemoryBackend(memory_backend.MemoryBackend):
48 48 key_prefix = 'lru_mem_backend'
49 49 pickle_values = False
50 50
51 51 def __init__(self, arguments):
52 52 max_size = arguments.pop('max_size', _default_max_size)
53 53
54 54 LRUDictClass = LRUDict
55 55 if arguments.pop('log_key_count', None):
56 56 LRUDictClass = LRUDictDebug
57 57
58 58 arguments['cache_dict'] = LRUDictClass(max_size)
59 59 super(LRUMemoryBackend, self).__init__(arguments)
60 60
61 61 def delete(self, key):
62 62 try:
63 63 del self._cache[key]
64 64 except KeyError:
65 65 # we don't care if the key isn't there at deletion
66 66 pass
67 67
68 68 def delete_multi(self, keys):
69 69 for key in keys:
70 70 self.delete(key)
71 71
72 72
73 73 class PickleSerializer(object):
74 74
75 75 def _dumps(self, value, safe=False):
76 76 try:
77 77 return compat.pickle.dumps(value)
78 78 except Exception:
79 79 if safe:
80 80 return NO_VALUE
81 81 else:
82 82 raise
83 83
84 84 def _loads(self, value, safe=True):
85 85 try:
86 86 return compat.pickle.loads(value)
87 87 except Exception:
88 88 if safe:
89 89 return NO_VALUE
90 90 else:
91 91 raise
92 92
93 93
94 94 class MsgPackSerializer(object):
95 95
96 96 def _dumps(self, value, safe=False):
97 97 try:
98 98 return msgpack.packb(value)
99 99 except Exception:
100 100 if safe:
101 101 return NO_VALUE
102 102 else:
103 103 raise
104 104
105 105 def _loads(self, value, safe=True):
106 106 """
107 107 pickle maintains the `CachedValue` wrapper of the tuple;
108 108 msgpack does not, so it must be added back in.
109 109 """
110 110 try:
111 111 value = msgpack.unpackb(value, use_list=False)
112 112 return CachedValue(*value)
113 113 except Exception:
114 114 if safe:
115 115 return NO_VALUE
116 116 else:
117 117 raise
118 118
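# illustrative round-trip: `CachedValue` is a (payload, metadata) tuple
# subclass, so it comes back from msgpack as a plain tuple and needs
# re-wrapping, e.g.:
#   packed = msgpack.packb(CachedValue('data', {'ct': 1, 'v': 1}))
#   value = CachedValue(*msgpack.unpackb(packed, use_list=False))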
119 119
120 120 import fcntl
121 121 flock_org = fcntl.flock
122 122
123 123
124 124 class CustomLockFactory(FileLock):
125 125
126 126 @memoized_property
127 127 def _module(self):
128 128
129 129 def gevent_flock(fd, operation):
130 130 """
131 131 Gevent compatible flock
132 132 """
133 133 # set non-blocking; this will raise an exception if we cannot acquire the lock
134 134 operation |= fcntl.LOCK_NB
135 135 start_lock_time = time.time()
136 136 timeout = 60 * 15 # 15min
137 137 while True:
138 138 try:
139 139 flock_org(fd, operation)
140 140 # lock has been acquired
141 141 break
142 142 except (OSError, IOError) as e:
143 143 # re-raise errors other than EAGAIN (resource temporarily unavailable)
144 144 if e.errno != errno.EAGAIN:
145 145 raise
146 146 elif (time.time() - start_lock_time) > timeout:
147 147 # waited too much time on a lock; better to fail than loop forever
148 148 log.error('Failed to acquire lock on `%s` after waiting %ss',
149 149 self.filename, timeout)
150 150 raise
151 151 wait_timeout = 0.03
152 152 log.debug('Failed to acquire lock on `%s`, retry in %ss',
153 153 self.filename, wait_timeout)
154 154 gevent.sleep(wait_timeout)
155 155
156 156 fcntl.flock = gevent_flock
157 157 return fcntl
158 158
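# note: a plain blocking flock() would stall the whole gevent event loop,
# because a greenlet blocked in the kernel never yields; LOCK_NB plus the
# gevent.sleep() retry above keeps other greenlets running while we wait.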
159 159
160 160 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
161 161 key_prefix = 'file_backend'
162 162
163 163 def __init__(self, arguments):
164 164 arguments['lock_factory'] = CustomLockFactory
165 165 db_file = arguments.get('filename')
166 166
167 167 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
168 168 try:
169 169 super(FileNamespaceBackend, self).__init__(arguments)
170 170 except Exception:
171 171 log.exception('Failed to initialize db at: %s', db_file)
172 172 raise
173 173
174 174 def __repr__(self):
175 175 return '{} `{}`'.format(self.__class__, self.filename)
176 176
177 177 def list_keys(self, prefix=''):
178 178 prefix = '{}:{}'.format(self.key_prefix, prefix)
179 179
180 180 def cond(v):
181 181 if not prefix:
182 182 return True
183 183
184 184 if v.startswith(prefix):
185 185 return True
186 186 return False
187 187
188 188 with self._dbm_file(True) as dbm:
189 189 try:
190 190 return filter(cond, dbm.keys())
191 191 except Exception:
192 192 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
193 193 raise
194 194
195 195 def get_store(self):
196 196 return self.filename
197 197
198 198 def _dbm_get(self, key):
199 199 with self._dbm_file(False) as dbm:
200 200 if hasattr(dbm, 'get'):
201 201 value = dbm.get(key, NO_VALUE)
202 202 else:
203 203 # gdbm objects lack a .get method
204 204 try:
205 205 value = dbm[key]
206 206 except KeyError:
207 207 value = NO_VALUE
208 208 if value is not NO_VALUE:
209 209 value = self._loads(value)
210 210 return value
211 211
212 212 def get(self, key):
213 213 try:
214 214 return self._dbm_get(key)
215 215 except Exception:
216 216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
217 217 raise
218 218
219 219 def set(self, key, value):
220 220 with self._dbm_file(True) as dbm:
221 221 dbm[key] = self._dumps(value)
222 222
223 223 def set_multi(self, mapping):
224 224 with self._dbm_file(True) as dbm:
225 225 for key, value in mapping.items():
226 226 dbm[key] = self._dumps(value)
227 227
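# illustrative wiring, assuming the `dogpile.cache.rc.file_namespace`
# backend registration that RhodeCode ships (values are examples):
#   from dogpile.cache import make_region
#   region = make_region().configure(
#       'dogpile.cache.rc.file_namespace',
#       expiration_time=3600,
#       arguments={'filename': '/tmp/rc_cache.db'})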
228 228
229 229 class BaseRedisBackend(redis_backend.RedisBackend):
230 230 key_prefix = ''
231 231
232 232 def __init__(self, arguments):
233 233 super(BaseRedisBackend, self).__init__(arguments)
234 234 self._lock_timeout = self.lock_timeout
235 235 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
236 236
237 237 if self._lock_auto_renewal and not self._lock_timeout:
238 238 # set default timeout for auto_renewal
239 239 self._lock_timeout = 30
240 240
241 241 def _create_client(self):
242 242 args = {}
243 243
244 244 if self.url is not None:
245 245 args.update(url=self.url)
246 246
247 247 else:
248 248 args.update(
249 249 host=self.host, password=self.password,
250 250 port=self.port, db=self.db
251 251 )
252 252
253 253 connection_pool = redis.ConnectionPool(**args)
254 254
255 255 return redis.StrictRedis(connection_pool=connection_pool)
256 256
257 257 def list_keys(self, prefix=''):
258 258 prefix = '{}:{}*'.format(self.key_prefix, prefix)
259 259 return self.client.keys(prefix)
260 260
261 261 def get_store(self):
262 262 return self.client.connection_pool
263 263
264 264 def get(self, key):
265 265 value = self.client.get(key)
266 266 if value is None:
267 267 return NO_VALUE
268 268 return self._loads(value)
269 269
270 270 def get_multi(self, keys):
271 271 if not keys:
272 272 return []
273 273 values = self.client.mget(keys)
274 274 loads = self._loads
275 275 return [
276 276 loads(v) if v is not None else NO_VALUE
277 277 for v in values]
278 278
279 279 def set(self, key, value):
280 280 if self.redis_expiration_time:
281 281 self.client.setex(key, self.redis_expiration_time,
282 282 self._dumps(value))
283 283 else:
284 284 self.client.set(key, self._dumps(value))
285 285
286 286 def set_multi(self, mapping):
287 287 dumps = self._dumps
288 288 mapping = dict(
289 289 (k, dumps(v))
290 290 for k, v in mapping.items()
291 291 )
292 292
293 293 if not self.redis_expiration_time:
294 294 self.client.mset(mapping)
295 295 else:
296 296 pipe = self.client.pipeline()
297 297 for key, value in mapping.items():
298 298 pipe.setex(key, self.redis_expiration_time, value)
299 299 pipe.execute()
300 300
301 301 def get_mutex(self, key):
302 302 if self.distributed_lock:
303 - lock_key = redis_backend.u('_lock_{0}').format(safe_str(key))
303 + lock_key = redis_backend.u(u'_lock_{0}'.format(safe_unicode(key)))
304 304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 305 auto_renewal=self._lock_auto_renewal)
306 306 else:
307 307 return None
308 308
309 309
310 310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 311 key_prefix = 'redis_pickle_backend'
312 312 pass
313 313
314 314
315 315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 316 key_prefix = 'redis_msgpack_backend'
317 317 pass
318 318
319 319
320 320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 321 import redis_lock
322 322
323 323 class _RedisLockWrapper(object):
324 324 """LockWrapper for redis_lock"""
325 325
326 326 @classmethod
327 327 def get_lock(cls):
328 328 return redis_lock.Lock(
329 329 redis_client=client,
330 330 name=lock_key,
331 331 expire=lock_timeout,
332 332 auto_renewal=auto_renewal,
333 333 strict=True,
334 334 )
335 335
336 336 def __repr__(self):
337 337 return "{}:{}".format(self.__class__.__name__, lock_key)
338 338
339 339 def __str__(self):
340 340 return "{}:{}".format(self.__class__.__name__, lock_key)
341 341
342 342 def __init__(self):
343 343 self.lock = self.get_lock()
344 344 self.lock_key = lock_key
345 345
346 346 def acquire(self, wait=True):
347 347 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
348 348 try:
349 349 acquired = self.lock.acquire(wait)
350 350 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
351 351 return acquired
352 352 except redis_lock.AlreadyAcquired:
353 353 return False
354 354 except redis_lock.AlreadyStarted:
355 355 # the refresh thread already exists, which means we hold the lock
356 356 return True
357 357
358 358 def release(self):
359 359 try:
360 360 self.lock.release()
361 361 except redis_lock.NotAcquired:
362 362 pass
363 363
364 364 return _RedisLockWrapper()
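For context, a rough usage sketch of the fixed mutex path; the connection arguments follow dogpile's Redis backend and the key is illustrative:

backend = RedisPickleBackend({
    'host': 'localhost', 'port': 6379, 'db': 0,
    'distributed_lock': True, 'lock_auto_renewal': True})
mutex = backend.get_mutex(u'repo-żółw')  # non-ascii keys no longer break here
if mutex is not None and mutex.acquire(wait=True):
    try:
        pass  # recompute and store the cached value
    finally:
        mutex.release()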