##// END OF EJS Templates
redis-lock: bumped to latest version
super-admin -
r1140:0696ceaf default
parent child Browse files
Show More
@@ -1,13 +1,14 b''
1 import sys
1
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 from typing import Union
7 8
8 9 from redis import StrictRedis
9 10
10 __version__ = '3.7.0'
11 __version__ = '4.0.0'
11 12
12 13 loggers = {
13 14 k: getLogger("vcsserver." + ".".join((__name__, k)))
@@ -105,11 +106,15 b' class Lock(object):'
105 106 """
106 107 A Lock context manager implemented via redis SETNX/BLPOP.
107 108 """
109
108 110 unlock_script = None
109 111 extend_script = None
110 112 reset_script = None
111 113 reset_all_script = None
112 114
115 _lock_renewal_interval: float
116 _lock_renewal_thread: Union[threading.Thread, None]
117
113 118 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
114 119 """
115 120 :param redis_client:
@@ -166,7 +171,7 b' class Lock(object):'
166 171 elif isinstance(id, text_type):
167 172 self._id = id
168 173 else:
169 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
174 raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
170 175 self._name = 'lock:' + name
171 176 self._signal = 'lock-signal:' + name
172 177 self._lock_renewal_interval = (float(expire) * 2 / 3
@@ -180,11 +185,11 b' class Lock(object):'
180 185 def register_scripts(cls, redis_client):
181 186 global reset_all_script
182 187 if reset_all_script is None:
183 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
184 188 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
185 189 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
186 190 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
187 191 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
192 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
188 193
189 194 @property
190 195 def _held(self):
@@ -219,7 +224,7 b' class Lock(object):'
219 224
220 225 if self._held:
221 226 owner_id = self.get_owner_id()
222 raise AlreadyAcquired(f"Already acquired from this Lock instance. Lock id: {owner_id}")
227 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
223 228
224 229 if not blocking and timeout is not None:
225 230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
@@ -227,10 +232,10 b' class Lock(object):'
227 232 if timeout:
228 233 timeout = int(timeout)
229 234 if timeout < 0:
230 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
235 raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
231 236
232 237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
233 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
238 raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
234 239
235 240 busy = True
236 241 blpop_timeout = timeout or self._expire or 0
@@ -243,16 +248,17 b' class Lock(object):'
243 248 elif blocking:
244 249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
245 250 else:
246 logger.warning("Failed to get %r.", self._name)
251 logger.warning("Failed to acquire Lock(%r).", self._name)
247 252 return False
248 253
249 logger.debug("Got lock for %r.", self._name)
254 logger.debug("Acquired Lock(%r).", self._name)
250 255 if self._lock_renewal_interval is not None:
251 256 self._start_lock_renewer()
252 257 return True
253 258
254 259 def extend(self, expire=None):
255 """Extends expiration time of the lock.
260 """
261 Extends expiration time of the lock.
256 262
257 263 :param expire:
258 264 New expiration time. If ``None`` - `expire` provided during
@@ -272,29 +278,29 b' class Lock(object):'
272 278
273 279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
274 280 if error == 1:
275 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
276 282 elif error == 2:
277 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
278 284 elif error:
279 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
280 286
281 287 @staticmethod
282 def _lock_renewer(lockref, interval, stop):
288 def _lock_renewer(name, lockref, interval, stop):
283 289 """
284 290 Renew the lock key in redis every `interval` seconds for as long
285 291 as `self._lock_renewal_thread.should_exit` is False.
286 292 """
287 293 while not stop.wait(timeout=interval):
288 loggers["refresh.thread.start"].debug("Refreshing lock")
289 lock = lockref()
294 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
295 lock: "Lock" = lockref()
290 296 if lock is None:
291 297 loggers["refresh.thread.stop"].debug(
292 "The lock no longer exists, stopping lock refreshing"
298 "Stopping loop because Lock(%r) was garbage collected.", name
293 299 )
294 300 break
295 301 lock.extend(expire=lock._expire)
296 302 del lock
297 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
298 304
299 305 def _start_lock_renewer(self):
300 306 """
@@ -304,18 +310,21 b' class Lock(object):'
304 310 raise AlreadyStarted("Lock refresh thread already started")
305 311
306 312 loggers["refresh.start"].debug(
307 "Starting thread to refresh lock every %s seconds",
308 self._lock_renewal_interval
313 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
314 self._name, self._lock_renewal_interval
309 315 )
310 316 self._lock_renewal_stop = threading.Event()
311 317 self._lock_renewal_thread = threading.Thread(
312 318 group=None,
313 319 target=self._lock_renewer,
314 kwargs={'lockref': weakref.ref(self),
315 'interval': self._lock_renewal_interval,
316 'stop': self._lock_renewal_stop}
320 kwargs={
321 'name': self._name,
322 'lockref': weakref.ref(self),
323 'interval': self._lock_renewal_interval,
324 'stop': self._lock_renewal_stop,
325 },
317 326 )
318 self._lock_renewal_thread.setDaemon(True)
327 self._lock_renewal_thread.daemon = True
319 328 self._lock_renewal_thread.start()
320 329
321 330 def _stop_lock_renewer(self):
@@ -326,15 +335,16 b' class Lock(object):'
326 335 """
327 336 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
328 337 return
329 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
338 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
330 339 self._lock_renewal_stop.set()
331 340 self._lock_renewal_thread.join()
332 341 self._lock_renewal_thread = None
333 loggers["refresh.exit"].debug("Lock refresher has stopped")
342 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
334 343
335 344 def __enter__(self):
336 345 acquired = self.acquire(blocking=True)
337 assert acquired, "Lock wasn't acquired, but blocking=True"
346 if not acquired:
347 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
338 348 return self
339 349
340 350 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
@@ -352,12 +362,12 b' class Lock(object):'
352 362 """
353 363 if self._lock_renewal_thread is not None:
354 364 self._stop_lock_renewer()
355 loggers["release"].debug("Releasing %r.", self._name)
365 loggers["release"].debug("Releasing Lock(%r).", self._name)
356 366 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
357 367 if error == 1:
358 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
368 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
359 369 elif error:
360 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
370 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
361 371
362 372 def locked(self):
363 373 """
General Comments 0
You need to be logged in to leave comments. Login now