diff --git a/rhodecode/lib/_vendor/redis_lock/__init__.py b/rhodecode/lib/_vendor/redis_lock/__init__.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/_vendor/redis_lock/__init__.py @@ -0,0 +1,389 @@ +import sys +import threading +import weakref +from base64 import b64encode +from logging import getLogger +from os import urandom + +from redis import StrictRedis + +__version__ = '3.7.0' + +loggers = { + k: getLogger(".".join((__name__, k))) + for k in [ + "acquire", + "refresh.thread.start", + "refresh.thread.stop", + "refresh.thread.exit", + "refresh.start", + "refresh.shutdown", + "refresh.exit", + "release", + ] +} + +PY3 = sys.version_info[0] == 3 + +if PY3: + text_type = str + binary_type = bytes +else: + text_type = unicode # noqa + binary_type = str + + +# Check if the id match. If not, return an error code. +UNLOCK_SCRIPT = b""" + if redis.call("get", KEYS[1]) ~= ARGV[1] then + return 1 + else + redis.call("del", KEYS[2]) + redis.call("lpush", KEYS[2], 1) + redis.call("pexpire", KEYS[2], ARGV[2]) + redis.call("del", KEYS[1]) + return 0 + end +""" + +# Covers both cases when key doesn't exist and doesn't equal to lock's id +EXTEND_SCRIPT = b""" + if redis.call("get", KEYS[1]) ~= ARGV[1] then + return 1 + elseif redis.call("ttl", KEYS[1]) < 0 then + return 2 + else + redis.call("expire", KEYS[1], ARGV[2]) + return 0 + end +""" + +RESET_SCRIPT = b""" + redis.call('del', KEYS[2]) + redis.call('lpush', KEYS[2], 1) + redis.call('pexpire', KEYS[2], ARGV[2]) + return redis.call('del', KEYS[1]) +""" + +RESET_ALL_SCRIPT = b""" + local locks = redis.call('keys', 'lock:*') + local signal + for _, lock in pairs(locks) do + signal = 'lock-signal:' .. string.sub(lock, 6) + redis.call('del', signal) + redis.call('lpush', signal, 1) + redis.call('expire', signal, 1) + redis.call('del', lock) + end + return #locks +""" + + +class AlreadyAcquired(RuntimeError): + pass + + +class NotAcquired(RuntimeError): + pass + + +class AlreadyStarted(RuntimeError): + pass + + +class TimeoutNotUsable(RuntimeError): + pass + + +class InvalidTimeout(RuntimeError): + pass + + +class TimeoutTooLarge(RuntimeError): + pass + + +class NotExpirable(RuntimeError): + pass + + +class Lock(object): + """ + A Lock context manager implemented via redis SETNX/BLPOP. + """ + unlock_script = None + extend_script = None + reset_script = None + reset_all_script = None + + def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000): + """ + :param redis_client: + An instance of :class:`~StrictRedis`. + :param name: + The name (redis key) the lock should have. + :param expire: + The lock expiry time in seconds. If left at the default (None) + the lock will not expire. + :param id: + The ID (redis value) the lock should have. A random value is + generated when left at the default. + + Note that if you specify this then the lock is marked as "held". Acquires + won't be possible. + :param auto_renewal: + If set to ``True``, Lock will automatically renew the lock so that it + doesn't expire for as long as the lock is held (acquire() called + or running in a context manager). + + Implementation note: Renewal will happen using a daemon thread with + an interval of ``expire*2/3``. If wishing to use a different renewal + time, subclass Lock, call ``super().__init__()`` then set + ``self._lock_renewal_interval`` to your desired interval. + :param strict: + If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``. + :param signal_expire: + Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``. + """ + if strict and not isinstance(redis_client, StrictRedis): + raise ValueError("redis_client must be instance of StrictRedis. " + "Use strict=False if you know what you're doing.") + if auto_renewal and expire is None: + raise ValueError("Expire may not be None when auto_renewal is set") + + self._client = redis_client + + if expire: + expire = int(expire) + if expire < 0: + raise ValueError("A negative expire is not acceptable.") + else: + expire = None + self._expire = expire + + self._signal_expire = signal_expire + if id is None: + self._id = b64encode(urandom(18)).decode('ascii') + elif isinstance(id, binary_type): + try: + self._id = id.decode('ascii') + except UnicodeDecodeError: + self._id = b64encode(id).decode('ascii') + elif isinstance(id, text_type): + self._id = id + else: + raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id)) + self._name = 'lock:' + name + self._signal = 'lock-signal:' + name + self._lock_renewal_interval = (float(expire) * 2 / 3 + if auto_renewal + else None) + self._lock_renewal_thread = None + + self.register_scripts(redis_client) + + @classmethod + def register_scripts(cls, redis_client): + global reset_all_script + if reset_all_script is None: + reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) + cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT) + cls.extend_script = redis_client.register_script(EXTEND_SCRIPT) + cls.reset_script = redis_client.register_script(RESET_SCRIPT) + cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) + + @property + def _held(self): + return self.id == self.get_owner_id() + + def reset(self): + """ + Forcibly deletes the lock. Use this with care. + """ + self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire)) + + @property + def id(self): + return self._id + + def get_owner_id(self): + owner_id = self._client.get(self._name) + if isinstance(owner_id, binary_type): + owner_id = owner_id.decode('ascii', 'replace') + return owner_id + + def acquire(self, blocking=True, timeout=None): + """ + :param blocking: + Boolean value specifying whether lock should be blocking or not. + :param timeout: + An integer value specifying the maximum number of seconds to block. + """ + logger = loggers["acquire"] + + logger.debug("Getting %r ...", self._name) + + if self._held: + raise AlreadyAcquired("Already acquired from this Lock instance.") + + if not blocking and timeout is not None: + raise TimeoutNotUsable("Timeout cannot be used if blocking=False") + + if timeout: + timeout = int(timeout) + if timeout < 0: + raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout) + + if self._expire and not self._lock_renewal_interval and timeout > self._expire: + raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire)) + + busy = True + blpop_timeout = timeout or self._expire or 0 + timed_out = False + while busy: + busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire) + if busy: + if timed_out: + return False + elif blocking: + timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout + else: + logger.warning("Failed to get %r.", self._name) + return False + + logger.info("Got lock for %r.", self._name) + if self._lock_renewal_interval is not None: + self._start_lock_renewer() + return True + + def extend(self, expire=None): + """Extends expiration time of the lock. + + :param expire: + New expiration time. If ``None`` - `expire` provided during + lock initialization will be taken. + """ + if expire: + expire = int(expire) + if expire < 0: + raise ValueError("A negative expire is not acceptable.") + elif self._expire is not None: + expire = self._expire + else: + raise TypeError( + "To extend a lock 'expire' must be provided as an " + "argument to extend() method or at initialization time." + ) + + error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire)) + if error == 1: + raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + elif error == 2: + raise NotExpirable("Lock %s has no assigned expiration time" % self._name) + elif error: + raise RuntimeError("Unsupported error code %s from EXTEND script" % error) + + @staticmethod + def _lock_renewer(lockref, interval, stop): + """ + Renew the lock key in redis every `interval` seconds for as long + as `self._lock_renewal_thread.should_exit` is False. + """ + while not stop.wait(timeout=interval): + loggers["refresh.thread.start"].debug("Refreshing lock") + lock = lockref() + if lock is None: + loggers["refresh.thread.stop"].debug( + "The lock no longer exists, stopping lock refreshing" + ) + break + lock.extend(expire=lock._expire) + del lock + loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing") + + def _start_lock_renewer(self): + """ + Starts the lock refresher thread. + """ + if self._lock_renewal_thread is not None: + raise AlreadyStarted("Lock refresh thread already started") + + loggers["refresh.start"].debug( + "Starting thread to refresh lock every %s seconds", + self._lock_renewal_interval + ) + self._lock_renewal_stop = threading.Event() + self._lock_renewal_thread = threading.Thread( + group=None, + target=self._lock_renewer, + kwargs={'lockref': weakref.ref(self), + 'interval': self._lock_renewal_interval, + 'stop': self._lock_renewal_stop} + ) + self._lock_renewal_thread.setDaemon(True) + self._lock_renewal_thread.start() + + def _stop_lock_renewer(self): + """ + Stop the lock renewer. + + This signals the renewal thread and waits for its exit. + """ + if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive(): + return + loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop") + self._lock_renewal_stop.set() + self._lock_renewal_thread.join() + self._lock_renewal_thread = None + loggers["refresh.exit"].debug("Lock refresher has stopped") + + def __enter__(self): + acquired = self.acquire(blocking=True) + assert acquired, "Lock wasn't acquired, but blocking=True" + return self + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + self.release() + + def release(self): + """Releases the lock, that was acquired with the same object. + + .. note:: + + If you want to release a lock that you acquired in a different place you have two choices: + + * Use ``Lock("name", id=id_from_other_place).release()`` + * Use ``Lock("name").reset()`` + """ + if self._lock_renewal_thread is not None: + self._stop_lock_renewer() + loggers["release"].debug("Releasing %r.", self._name) + error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire)) + if error == 1: + raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + elif error: + raise RuntimeError("Unsupported error code %s from EXTEND script." % error) + + def locked(self): + """ + Return true if the lock is acquired. + + Checks that lock with same name already exists. This method returns true, even if + lock have another id. + """ + return self._client.exists(self._name) == 1 + + +reset_all_script = None + + +def reset_all(redis_client): + """ + Forcibly deletes all locks if its remains (like a crash reason). Use this with care. + + :param redis_client: + An instance of :class:`~StrictRedis`. + """ + Lock.register_scripts(redis_client) + + reset_all_script(client=redis_client) # noqa diff --git a/rhodecode/lib/rc_cache/backends.py b/rhodecode/lib/rc_cache/backends.py --- a/rhodecode/lib/rc_cache/backends.py +++ b/rhodecode/lib/rc_cache/backends.py @@ -286,11 +286,17 @@ class BaseRedisBackend(redis_backend.Red pipe.execute() def get_mutex(self, key): - u = redis_backend.u if self.distributed_lock: - lock_key = u('_lock_{0}').format(key) + import redis_lock + lock_key = redis_backend.u('_lock_{0}').format(key) log.debug('Trying to acquire Redis lock for key %s', lock_key) - return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep) + lock = redis_lock.Lock( + redis_client=self.client, + name=lock_key, + expire=self.lock_timeout, + auto_renewal=False, + ) + return lock else: return None