##// END OF EJS Templates
caches: improved locking problems with distributed lock new cache backend
super-admin -
r4714:97b167f6 default
parent child Browse files
Show More
@@ -1,389 +1,390 b''
1 1 import sys
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7
8 8 from redis import StrictRedis
9 9
__version__ = '3.7.0'

# One logger per lock lifecycle event, named "rhodecode.<module>.<event>".
_LOG_EVENTS = (
    "acquire",
    "refresh.thread.start",
    "refresh.thread.stop",
    "refresh.thread.exit",
    "refresh.start",
    "refresh.shutdown",
    "refresh.exit",
    "release",
)
loggers = {
    event: getLogger("rhodecode." + ".".join((__name__, event)))
    for event in _LOG_EVENTS
}

PY3 = sys.version_info[0] == 3

# str/bytes aliases so the same code runs on Python 2 and 3.
if PY3:
    text_type = str
    binary_type = bytes
else:
    text_type = unicode  # noqa
    binary_type = str
35 35
# Lua scripts executed atomically server-side.  Convention in all of them:
# KEYS[1] is the lock key, KEYS[2] the companion "signal" list that waiters
# BLPOP on; ARGV[1] is the holder id, ARGV[2] a signal/extend expiry.

# Check if the id matches. If not, return an error code.
UNLOCK_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
    return 1
else
    redis.call("del", KEYS[2])
    redis.call("lpush", KEYS[2], 1)
    redis.call("pexpire", KEYS[2], ARGV[2])
    redis.call("del", KEYS[1])
    return 0
end
"""

# Covers both cases when key doesn't exist and doesn't equal to lock's id
EXTEND_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
    return 1
elseif redis.call("ttl", KEYS[1]) < 0 then
    return 2
else
    redis.call("expire", KEYS[1], ARGV[2])
    return 0
end
"""

# Unconditionally delete the lock key and wake one waiter via the signal list.
RESET_SCRIPT = b"""
redis.call('del', KEYS[2])
redis.call('lpush', KEYS[2], 1)
redis.call('pexpire', KEYS[2], ARGV[2])
return redis.call('del', KEYS[1])
"""

# Delete every 'lock:*' key and signal its waiters.  Uses the blocking KEYS
# command, so this is for maintenance/cleanup only, never a hot path.
RESET_ALL_SCRIPT = b"""
local locks = redis.call('keys', 'lock:*')
local signal
for _, lock in pairs(locks) do
    signal = 'lock-signal:' .. string.sub(lock, 6)
    redis.call('del', signal)
    redis.call('lpush', signal, 1)
    redis.call('expire', signal, 1)
    redis.call('del', lock)
end
return #locks
"""
80 80
81 81
class AlreadyAcquired(RuntimeError):
    """Raised when acquire() is called on a Lock instance that already holds the lock."""
    pass


class NotAcquired(RuntimeError):
    """Raised when releasing/extending a lock that is not held by this id (or expired)."""
    pass


class AlreadyStarted(RuntimeError):
    """Raised when the lock renewal thread is started twice."""
    pass


class TimeoutNotUsable(RuntimeError):
    """Raised when a timeout is supplied together with blocking=False."""
    pass


class InvalidTimeout(RuntimeError):
    """Raised when the acquire timeout is negative."""
    pass


class TimeoutTooLarge(RuntimeError):
    """Raised when the timeout exceeds the lock expiry (without auto-renewal)."""
    pass


class NotExpirable(RuntimeError):
    """Raised when extending a lock that has no expiration time assigned."""
    pass
108 108
109 109
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.

    The stored value is a per-instance random id; the UNLOCK/EXTEND Lua
    scripts only act when the stored value still matches that id, so one
    holder can never release or extend another holder's lock.  Waiters
    block on BLPOP of a companion "signal" list that the unlock/reset
    scripts push to.
    """
    # Script handles, registered per-class by register_scripts().
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        :raises ValueError:
            On a non-StrictRedis client (with ``strict=True``), a negative
            ``expire``, or ``auto_renewal`` without an ``expire``.
        :raises TypeError:
            When ``id`` is neither bytes nor text.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        # expire=0 is falsy and therefore treated the same as "no expiry".
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        if id is None:
            # 18 random bytes -> 24 base64 chars, always ascii-decodable.
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, binary_type):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, text_type):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        # Renew at 2/3 of the expiry so a renewal always lands before the
        # key can possibly expire.
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.register_scripts(redis_client)

    @classmethod
    def register_scripts(cls, redis_client):
        """Register the Lua scripts with *redis_client* (safe to call repeatedly)."""
        global reset_all_script
        # The module-level handle is registered only once; the class-level
        # handles are re-registered on every call, which is harmless.
        if reset_all_script is None:
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
        cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
        cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
        cls.reset_script = redis_client.register_script(RESET_SCRIPT)
        cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)

    @property
    def _held(self):
        # True when the value stored under the lock key is our own id.
        return self.id == self.get_owner_id()

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))

    @property
    def id(self):
        return self._id

    def get_owner_id(self):
        """Return the id currently stored under the lock key, or None."""
        owner_id = self._client.get(self._name)
        if isinstance(owner_id, binary_type):
            owner_id = owner_id.decode('ascii', 'replace')
        return owner_id

    def acquire(self, blocking=True, timeout=None):
        """
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        :returns: ``True`` on success, ``False`` when busy / timed out.
        :raises AlreadyAcquired: when this instance already holds the lock.
        :raises TimeoutNotUsable: when ``timeout`` is used with ``blocking=False``.
        :raises InvalidTimeout: when ``timeout`` is negative.
        :raises TimeoutTooLarge: when ``timeout`` exceeds ``expire`` without auto-renewal.
        """
        logger = loggers["acquire"]

        logger.debug("Getting acquire on %r ...", self._name)

        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                # timeout == 0 is falsy and never reaches here; it means "no timeout".
                raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)

            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET NX EX is the atomic "take the lock if free" step.
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # Sleep until the holder pushes to the signal list on
                    # release, or the BLPOP timeout elapses.
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to get %r.", self._name)
                    return False

        logger.info("Got lock for %r.", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        :raises NotAcquired: when the lock is not held (anymore) by this id.
        :raises NotExpirable: when the lock has no expiration time assigned.
        """
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        elif self._expire is not None:
            expire = self._expire
        else:
            raise TypeError(
                "To extend a lock 'expire' must be provided as an "
                "argument to extend() method or at initialization time."
            )

        error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error == 2:
            raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
        elif error:
            raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

    @staticmethod
    def _lock_renewer(lockref, interval, stop):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.
        """
        while not stop.wait(timeout=interval):
            loggers["refresh.thread.start"].debug("Refreshing lock")
            # A weakref prevents the renewer thread from keeping the Lock
            # (and its redis client) alive after the owner dropped it.
            lock = lockref()
            if lock is None:
                loggers["refresh.thread.stop"].debug(
                    "The lock no longer exists, stopping lock refreshing"
                )
                break
            lock.extend(expire=lock._expire)
            del lock
        loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.

        :raises AlreadyStarted: when the refresher thread is already running.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        loggers["refresh.start"].debug(
            "Starting thread to refresh lock every %s seconds",
            self._lock_renewal_interval
        )
        self._lock_renewal_stop = threading.Event()
        self._lock_renewal_thread = threading.Thread(
            group=None,
            target=self._lock_renewer,
            kwargs={'lockref': weakref.ref(self),
                    'interval': self._lock_renewal_interval,
                    'stop': self._lock_renewal_stop}
        )
        # Daemonized so a process can still exit while holding a lock.
        self._lock_renewal_thread.setDaemon(True)
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
            return
        loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
        self._lock_renewal_stop.set()
        self._lock_renewal_thread.join()
        self._lock_renewal_thread = None
        loggers["refresh.exit"].debug("Lock refresher has stopped")

    def __enter__(self):
        acquired = self.acquire(blocking=True)
        assert acquired, "Lock wasn't acquired, but blocking=True"
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.release()

    def release(self):
        """Releases the lock, that was acquired with the same object.

        .. note::

            If you want to release a lock that you acquired in a different place you have two choices:

            * Use ``Lock("name", id=id_from_other_place).release()``
            * Use ``Lock("name").reset()``

        :raises NotAcquired: when the lock is not held (anymore) by this id.
        """
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        loggers["release"].debug("Releasing %r.", self._name)
        error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error:
            # Bug fix: this path runs the UNLOCK script, not EXTEND —
            # the previous message blamed the wrong script.
            raise RuntimeError("Unsupported error code %s from UNLOCK script." % error)

    def locked(self):
        """
        Return true if the lock is acquired.

        Checks that lock with same name already exists. This method returns true, even if
        lock have another id.
        """
        return self._client.exists(self._name) == 1
375 376
376 377
# Module-level handle for RESET_ALL_SCRIPT; registered lazily by
# Lock.register_scripts().
reset_all_script = None


def reset_all(redis_client):
    """
    Forcibly deletes all locks if its remains (like a crash reason). Use this with care.

    :param redis_client:
        An instance of :class:`~StrictRedis`.
    """
    # Ensure the scripts (including reset_all_script) are registered first.
    Lock.register_scripts(redis_client)

    reset_all_script(client=redis_client)  # noqa
@@ -1,312 +1,342 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
37 37
38 38
# Default maximum number of entries for the LRU memory backend.
_default_max_size = 1024

log = logging.getLogger(__name__)
42 42
43 43
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """In-process memory backend whose cache dict evicts least-recently-used entries."""
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        # The debug variant additionally reports key counts.
        if arguments.pop('log_key_count', None):
            dict_class = LRUDictDebug
        else:
            dict_class = LRUDict

        arguments['cache_dict'] = dict_class(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # deleting an absent key is deliberately a no-op
            pass

    def delete_multi(self, keys):
        # delegate so single- and multi-delete share identical semantics
        for cache_key in keys:
            self.delete(cache_key)
68 68
69 69
class PickleSerializer(object):
    """Serialize/deserialize cached values with pickle."""

    def _dumps(self, value, safe=False):
        # safe=True converts serialization failures into NO_VALUE.
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        # safe=True (the default here) treats corrupt payloads as a miss.
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
89 89
90 90
class MsgPackSerializer(object):
    """Serialize/deserialize cached values with msgpack."""

    def _dumps(self, value, safe=False):
        # safe=True converts serialization failures into NO_VALUE.
        try:
            return msgpack.packb(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        """
        pickle maintained the `CachedValue` wrapper of the tuple
        msgpack does not, so it must be added back in.
        """
        try:
            unpacked = msgpack.unpackb(value, use_list=False)
            return CachedValue(*unpacked)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
115 115
116 116
import fcntl
# Keep a reference to the original flock before CustomLockFactory
# monkey-patches fcntl.flock with a gevent-cooperative version.
flock_org = fcntl.flock
119 119
120 120
class CustomLockFactory(FileLock):
    # FileLock whose flock is gevent-cooperative: instead of blocking the
    # whole process inside fcntl.flock, it polls a non-blocking flock and
    # yields to the gevent hub between attempts.

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long on a lock, better fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        # NOTE(review): this rebinds fcntl.flock for the entire process, not
        # just this factory; the pristine implementation survives in
        # `flock_org` above.
        fcntl.flock = gevent_flock
        return fcntl
155 155
156 156
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """DBM-file backed cache backend with pickle serialization.

    Uses :class:`CustomLockFactory` so file locking cooperates with
    gevent instead of blocking the event loop.
    """
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        """
        :param arguments: dogpile backend arguments; must contain
            ``filename`` (path of the DBM file).
        """
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        # Fixed typo in log message: "initialing" -> "initializing".
        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.error('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        """Return the stored keys that start with ``key_prefix:prefix``."""
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        # key_prefix is always non-empty, so `prefix` can never be falsy;
        # the old `if not prefix: return True` branch was unreachable.
        def cond(v):
            return v.startswith(prefix)

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        # Read one raw value and deserialize it; gdbm objects lack a .get
        # method, so fall back to EAFP indexing there.
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)
224 224
225 225
class BaseRedisBackend(redis_backend.RedisBackend):
    # Shared base for the pickle/msgpack redis backends below: adds key
    # listing, serializer-aware get/set and a distributed mutex built on
    # redis_lock (via get_mutex_lock).

    def _create_client(self):
        # Build a pooled StrictRedis client, either from a single `url`
        # or from discrete host/port/db/password settings.
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        # NOTE(review): KEYS is O(N) over the whole keyspace and blocks
        # the redis server — acceptable only for admin/debug style usage.
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        # None from redis means a miss; translate to dogpile's NO_VALUE.
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            # SETEX has no multi-key form, so pipeline the writes.
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        # Return a lock object for dogpile's dogpile-lock protocol, or
        # None to fall back to a per-process thread mutex.
        if self.distributed_lock:
            lock_key = redis_backend.u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)

            # Auto-renewal keeps the lock alive while the cached value is
            # being generated; it requires a concrete expiry, so use a 10s
            # default when no lock_timeout is configured.
            auto_renewal = True
            lock_timeout = self.lock_timeout
            if auto_renewal and not self.lock_timeout:
                # set default timeout for auto_renewal
                lock_timeout = 10
            return get_mutex_lock(self.client, lock_key, lock_timeout,
                                  auto_renewal=auto_renewal)
        else:
            return None
303 302
304 303
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with pickle."""
    # Removed redundant `pass` — the class body already has statements.
    key_prefix = 'redis_pickle_backend'
308 307
309 308
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values serialized with msgpack."""
    # Removed redundant `pass` — the class body already has statements.
    key_prefix = 'redis_msgpack_backend'
312
313
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """Build a dogpile-compatible mutex wrapper around ``redis_lock.Lock``.

    :param client: a StrictRedis client instance.
    :param lock_key: redis key name for the lock.
    :param lock_timeout: lock expiry in seconds.
    :param auto_renewal: keep renewing the lock while it is held.
    :returns: object exposing ``acquire(wait=True)`` and ``release()``.
    """
    import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        def __init__(self):
            # BUG FIX: the lock must be created exactly once.  The previous
            # implementation exposed it as a property that constructed a
            # fresh redis_lock.Lock (with a new random id) on EVERY access,
            # so release() operated on a different lock instance than
            # acquire() and always failed with NotAcquired (silently
            # swallowed) — the redis key was then only freed by expiry.
            self.lock = redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def acquire(self, wait=True):
            return self.lock.acquire(wait)

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                # releasing an expired/never-acquired lock is a no-op
                pass

    return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now