diff --git a/rhodecode/lib/_vendor/redis_lock/__init__.py b/rhodecode/lib/_vendor/redis_lock/__init__.py --- a/rhodecode/lib/_vendor/redis_lock/__init__.py +++ b/rhodecode/lib/_vendor/redis_lock/__init__.py @@ -1,13 +1,14 @@ -import sys + import threading import weakref from base64 import b64encode from logging import getLogger from os import urandom +from typing import Union from redis import StrictRedis -__version__ = '3.7.0' +__version__ = '4.0.0' loggers = { k: getLogger("rhodecode." + ".".join((__name__, k))) @@ -105,11 +106,15 @@ class Lock(object): """ A Lock context manager implemented via redis SETNX/BLPOP. """ + unlock_script = None extend_script = None reset_script = None reset_all_script = None + _lock_renewal_interval: float + _lock_renewal_thread: Union[threading.Thread, None] + def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000): """ :param redis_client: @@ -166,7 +171,7 @@ class Lock(object): elif isinstance(id, text_type): self._id = id else: - raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id)) + raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.") self._name = 'lock:' + name self._signal = 'lock-signal:' + name self._lock_renewal_interval = (float(expire) * 2 / 3 @@ -180,11 +185,11 @@ class Lock(object): def register_scripts(cls, redis_client): global reset_all_script if reset_all_script is None: - reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT) cls.extend_script = redis_client.register_script(EXTEND_SCRIPT) cls.reset_script = redis_client.register_script(RESET_SCRIPT) cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) + reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) @property def _held(self): @@ -227,10 +232,10 @@ class Lock(object): if timeout: timeout = int(timeout) if timeout < 0: - raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout) + raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0") if self._expire and not self._lock_renewal_interval and timeout > self._expire: - raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire)) + raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})") busy = True blpop_timeout = timeout or self._expire or 0 @@ -243,16 +248,17 @@ class Lock(object): elif blocking: timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout else: - logger.warning("Failed to get %r.", self._name) + logger.warning("Failed to acquire Lock(%r).", self._name) return False - logger.debug("Got lock for %r.", self._name) + logger.debug("Acquired Lock(%r).", self._name) if self._lock_renewal_interval is not None: self._start_lock_renewer() return True def extend(self, expire=None): - """Extends expiration time of the lock. + """ + Extends expiration time of the lock. :param expire: New expiration time. If ``None`` - `expire` provided during @@ -272,29 +278,29 @@ class Lock(object): error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire)) if error == 1: - raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.") elif error == 2: - raise NotExpirable("Lock %s has no assigned expiration time" % self._name) + raise NotExpirable(f"Lock {self._name} has no assigned expiration time") elif error: - raise RuntimeError("Unsupported error code %s from EXTEND script" % error) + raise RuntimeError(f"Unsupported error code {error} from EXTEND script") @staticmethod - def _lock_renewer(lockref, interval, stop): + def _lock_renewer(name, lockref, interval, stop): """ Renew the lock key in redis every `interval` seconds for as long as `self._lock_renewal_thread.should_exit` is False. """ while not stop.wait(timeout=interval): - loggers["refresh.thread.start"].debug("Refreshing lock") - lock = lockref() + loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name) + lock: "Lock" = lockref() if lock is None: loggers["refresh.thread.stop"].debug( - "The lock no longer exists, stopping lock refreshing" + "Stopping loop because Lock(%r) was garbage collected.", name ) break lock.extend(expire=lock._expire) del lock - loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing") + loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name) def _start_lock_renewer(self): """ @@ -304,18 +310,21 @@ class Lock(object): raise AlreadyStarted("Lock refresh thread already started") loggers["refresh.start"].debug( - "Starting thread to refresh lock every %s seconds", - self._lock_renewal_interval + "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.", + self._name, self._lock_renewal_interval ) self._lock_renewal_stop = threading.Event() self._lock_renewal_thread = threading.Thread( group=None, target=self._lock_renewer, - kwargs={'lockref': weakref.ref(self), - 'interval': self._lock_renewal_interval, - 'stop': self._lock_renewal_stop} + kwargs={ + 'name': self._name, + 'lockref': weakref.ref(self), + 'interval': self._lock_renewal_interval, + 'stop': self._lock_renewal_stop, + }, ) - self._lock_renewal_thread.setDaemon(True) + self._lock_renewal_thread.daemon = True self._lock_renewal_thread.start() def _stop_lock_renewer(self): @@ -326,15 +335,16 @@ class Lock(object): """ if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive(): return - loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop") + loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name) self._lock_renewal_stop.set() self._lock_renewal_thread.join() self._lock_renewal_thread = None - loggers["refresh.exit"].debug("Lock refresher has stopped") + loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name) def __enter__(self): acquired = self.acquire(blocking=True) - assert acquired, "Lock wasn't acquired, but blocking=True" + if not acquired: + raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!") return self def __exit__(self, exc_type=None, exc_value=None, traceback=None): @@ -352,12 +362,12 @@ class Lock(object): """ if self._lock_renewal_thread is not None: self._stop_lock_renewer() - loggers["release"].debug("Releasing %r.", self._name) + loggers["release"].debug("Releasing Lock(%r).", self._name) error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire)) if error == 1: - raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.") elif error: - raise RuntimeError("Unsupported error code %s from EXTEND script." % error) + raise RuntimeError(f"Unsupported error code {error} from EXTEND script.") def locked(self): """