##// END OF EJS Templates
redis-lock: bumped to latest version
super-admin -
r1140:0696ceaf default
parent child Browse files
Show More
@@ -1,13 +1,14 b''
1 import sys
1
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7 from typing import Union
7
8
8 from redis import StrictRedis
9 from redis import StrictRedis
9
10
10 __version__ = '3.7.0'
11 __version__ = '4.0.0'
11
12
12 loggers = {
13 loggers = {
13 k: getLogger("vcsserver." + ".".join((__name__, k)))
14 k: getLogger("vcsserver." + ".".join((__name__, k)))
@@ -105,11 +106,15 b' class Lock(object):'
105 """
106 """
106 A Lock context manager implemented via redis SETNX/BLPOP.
107 A Lock context manager implemented via redis SETNX/BLPOP.
107 """
108 """
109
108 unlock_script = None
110 unlock_script = None
109 extend_script = None
111 extend_script = None
110 reset_script = None
112 reset_script = None
111 reset_all_script = None
113 reset_all_script = None
112
114
115 _lock_renewal_interval: float
116 _lock_renewal_thread: Union[threading.Thread, None]
117
113 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
118 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
114 """
119 """
115 :param redis_client:
120 :param redis_client:
@@ -166,7 +171,7 b' class Lock(object):'
166 elif isinstance(id, text_type):
171 elif isinstance(id, text_type):
167 self._id = id
172 self._id = id
168 else:
173 else:
169 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
174 raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
170 self._name = 'lock:' + name
175 self._name = 'lock:' + name
171 self._signal = 'lock-signal:' + name
176 self._signal = 'lock-signal:' + name
172 self._lock_renewal_interval = (float(expire) * 2 / 3
177 self._lock_renewal_interval = (float(expire) * 2 / 3
@@ -180,11 +185,11 b' class Lock(object):'
180 def register_scripts(cls, redis_client):
185 def register_scripts(cls, redis_client):
181 global reset_all_script
186 global reset_all_script
182 if reset_all_script is None:
187 if reset_all_script is None:
183 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
184 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
188 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
185 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
189 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
186 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
190 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
187 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
191 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
192 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
188
193
189 @property
194 @property
190 def _held(self):
195 def _held(self):
@@ -219,7 +224,7 b' class Lock(object):'
219
224
220 if self._held:
225 if self._held:
221 owner_id = self.get_owner_id()
226 owner_id = self.get_owner_id()
222 raise AlreadyAcquired(f"Already acquired from this Lock instance. Lock id: {owner_id}")
227 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
223
228
224 if not blocking and timeout is not None:
229 if not blocking and timeout is not None:
225 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
@@ -227,10 +232,10 b' class Lock(object):'
227 if timeout:
232 if timeout:
228 timeout = int(timeout)
233 timeout = int(timeout)
229 if timeout < 0:
234 if timeout < 0:
230 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
235 raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
231
236
232 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
233 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
238 raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
234
239
235 busy = True
240 busy = True
236 blpop_timeout = timeout or self._expire or 0
241 blpop_timeout = timeout or self._expire or 0
@@ -243,16 +248,17 b' class Lock(object):'
243 elif blocking:
248 elif blocking:
244 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
245 else:
250 else:
246 logger.warning("Failed to get %r.", self._name)
251 logger.warning("Failed to acquire Lock(%r).", self._name)
247 return False
252 return False
248
253
249 logger.debug("Got lock for %r.", self._name)
254 logger.debug("Acquired Lock(%r).", self._name)
250 if self._lock_renewal_interval is not None:
255 if self._lock_renewal_interval is not None:
251 self._start_lock_renewer()
256 self._start_lock_renewer()
252 return True
257 return True
253
258
254 def extend(self, expire=None):
259 def extend(self, expire=None):
255 """Extends expiration time of the lock.
260 """
261 Extends expiration time of the lock.
256
262
257 :param expire:
263 :param expire:
258 New expiration time. If ``None`` - `expire` provided during
264 New expiration time. If ``None`` - `expire` provided during
@@ -272,29 +278,29 b' class Lock(object):'
272
278
273 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
274 if error == 1:
280 if error == 1:
275 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
276 elif error == 2:
282 elif error == 2:
277 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
278 elif error:
284 elif error:
279 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
280
286
281 @staticmethod
287 @staticmethod
282 def _lock_renewer(lockref, interval, stop):
288 def _lock_renewer(name, lockref, interval, stop):
283 """
289 """
284 Renew the lock key in redis every `interval` seconds for as long
290 Renew the lock key in redis every `interval` seconds for as long
285 as `self._lock_renewal_thread.should_exit` is False.
291 as `self._lock_renewal_thread.should_exit` is False.
286 """
292 """
287 while not stop.wait(timeout=interval):
293 while not stop.wait(timeout=interval):
288 loggers["refresh.thread.start"].debug("Refreshing lock")
294 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
289 lock = lockref()
295 lock: "Lock" = lockref()
290 if lock is None:
296 if lock is None:
291 loggers["refresh.thread.stop"].debug(
297 loggers["refresh.thread.stop"].debug(
292 "The lock no longer exists, stopping lock refreshing"
298 "Stopping loop because Lock(%r) was garbage collected.", name
293 )
299 )
294 break
300 break
295 lock.extend(expire=lock._expire)
301 lock.extend(expire=lock._expire)
296 del lock
302 del lock
297 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
298
304
299 def _start_lock_renewer(self):
305 def _start_lock_renewer(self):
300 """
306 """
@@ -304,18 +310,21 b' class Lock(object):'
304 raise AlreadyStarted("Lock refresh thread already started")
310 raise AlreadyStarted("Lock refresh thread already started")
305
311
306 loggers["refresh.start"].debug(
312 loggers["refresh.start"].debug(
307 "Starting thread to refresh lock every %s seconds",
313 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
308 self._lock_renewal_interval
314 self._name, self._lock_renewal_interval
309 )
315 )
310 self._lock_renewal_stop = threading.Event()
316 self._lock_renewal_stop = threading.Event()
311 self._lock_renewal_thread = threading.Thread(
317 self._lock_renewal_thread = threading.Thread(
312 group=None,
318 group=None,
313 target=self._lock_renewer,
319 target=self._lock_renewer,
314 kwargs={'lockref': weakref.ref(self),
320 kwargs={
321 'name': self._name,
322 'lockref': weakref.ref(self),
315 'interval': self._lock_renewal_interval,
323 'interval': self._lock_renewal_interval,
316 'stop': self._lock_renewal_stop}
324 'stop': self._lock_renewal_stop,
325 },
317 )
326 )
318 self._lock_renewal_thread.setDaemon(True)
327 self._lock_renewal_thread.daemon = True
319 self._lock_renewal_thread.start()
328 self._lock_renewal_thread.start()
320
329
321 def _stop_lock_renewer(self):
330 def _stop_lock_renewer(self):
@@ -326,15 +335,16 b' class Lock(object):'
326 """
335 """
327 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
336 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
328 return
337 return
329 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
338 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
330 self._lock_renewal_stop.set()
339 self._lock_renewal_stop.set()
331 self._lock_renewal_thread.join()
340 self._lock_renewal_thread.join()
332 self._lock_renewal_thread = None
341 self._lock_renewal_thread = None
333 loggers["refresh.exit"].debug("Lock refresher has stopped")
342 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
334
343
335 def __enter__(self):
344 def __enter__(self):
336 acquired = self.acquire(blocking=True)
345 acquired = self.acquire(blocking=True)
337 assert acquired, "Lock wasn't acquired, but blocking=True"
346 if not acquired:
347 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
338 return self
348 return self
339
349
340 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
350 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
@@ -352,12 +362,12 b' class Lock(object):'
352 """
362 """
353 if self._lock_renewal_thread is not None:
363 if self._lock_renewal_thread is not None:
354 self._stop_lock_renewer()
364 self._stop_lock_renewer()
355 loggers["release"].debug("Releasing %r.", self._name)
365 loggers["release"].debug("Releasing Lock(%r).", self._name)
356 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
366 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
357 if error == 1:
367 if error == 1:
358 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
368 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
359 elif error:
369 elif error:
360 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
370 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
361
371
362 def locked(self):
372 def locked(self):
363 """
373 """
General Comments 0
You need to be logged in to leave comments. Login now