##// END OF EJS Templates
redis-lock: bumped to latest version
super-admin -
r1140:0696ceaf default
parent child Browse files
Show More
@@ -1,384 +1,394 b''
1 import sys
1
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 from typing import Union
7 8
8 9 from redis import StrictRedis
9 10
10 __version__ = '3.7.0'
11 __version__ = '4.0.0'
11 12
12 13 loggers = {
13 14 k: getLogger("vcsserver." + ".".join((__name__, k)))
14 15 for k in [
15 16 "acquire",
16 17 "refresh.thread.start",
17 18 "refresh.thread.stop",
18 19 "refresh.thread.exit",
19 20 "refresh.start",
20 21 "refresh.shutdown",
21 22 "refresh.exit",
22 23 "release",
23 24 ]
24 25 }
25 26
26 27 text_type = str
27 28 binary_type = bytes
28 29
29 30
30 31 # Check if the id match. If not, return an error code.
31 32 UNLOCK_SCRIPT = b"""
32 33 if redis.call("get", KEYS[1]) ~= ARGV[1] then
33 34 return 1
34 35 else
35 36 redis.call("del", KEYS[2])
36 37 redis.call("lpush", KEYS[2], 1)
37 38 redis.call("pexpire", KEYS[2], ARGV[2])
38 39 redis.call("del", KEYS[1])
39 40 return 0
40 41 end
41 42 """
42 43
43 44 # Covers both cases when key doesn't exist and doesn't equal to lock's id
44 45 EXTEND_SCRIPT = b"""
45 46 if redis.call("get", KEYS[1]) ~= ARGV[1] then
46 47 return 1
47 48 elseif redis.call("ttl", KEYS[1]) < 0 then
48 49 return 2
49 50 else
50 51 redis.call("expire", KEYS[1], ARGV[2])
51 52 return 0
52 53 end
53 54 """
54 55
55 56 RESET_SCRIPT = b"""
56 57 redis.call('del', KEYS[2])
57 58 redis.call('lpush', KEYS[2], 1)
58 59 redis.call('pexpire', KEYS[2], ARGV[2])
59 60 return redis.call('del', KEYS[1])
60 61 """
61 62
62 63 RESET_ALL_SCRIPT = b"""
63 64 local locks = redis.call('keys', 'lock:*')
64 65 local signal
65 66 for _, lock in pairs(locks) do
66 67 signal = 'lock-signal:' .. string.sub(lock, 6)
67 68 redis.call('del', signal)
68 69 redis.call('lpush', signal, 1)
69 70 redis.call('expire', signal, 1)
70 71 redis.call('del', lock)
71 72 end
72 73 return #locks
73 74 """
74 75
75 76
76 77 class AlreadyAcquired(RuntimeError):
77 78 pass
78 79
79 80
80 81 class NotAcquired(RuntimeError):
81 82 pass
82 83
83 84
84 85 class AlreadyStarted(RuntimeError):
85 86 pass
86 87
87 88
88 89 class TimeoutNotUsable(RuntimeError):
89 90 pass
90 91
91 92
92 93 class InvalidTimeout(RuntimeError):
93 94 pass
94 95
95 96
96 97 class TimeoutTooLarge(RuntimeError):
97 98 pass
98 99
99 100
100 101 class NotExpirable(RuntimeError):
101 102 pass
102 103
103 104
104 105 class Lock(object):
105 106 """
106 107 A Lock context manager implemented via redis SETNX/BLPOP.
107 108 """
109
108 110 unlock_script = None
109 111 extend_script = None
110 112 reset_script = None
111 113 reset_all_script = None
112 114
115 _lock_renewal_interval: float
116 _lock_renewal_thread: Union[threading.Thread, None]
117
113 118 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
114 119 """
115 120 :param redis_client:
116 121 An instance of :class:`~StrictRedis`.
117 122 :param name:
118 123 The name (redis key) the lock should have.
119 124 :param expire:
120 125 The lock expiry time in seconds. If left at the default (None)
121 126 the lock will not expire.
122 127 :param id:
123 128 The ID (redis value) the lock should have. A random value is
124 129 generated when left at the default.
125 130
126 131 Note that if you specify this then the lock is marked as "held". Acquires
127 132 won't be possible.
128 133 :param auto_renewal:
129 134 If set to ``True``, Lock will automatically renew the lock so that it
130 135 doesn't expire for as long as the lock is held (acquire() called
131 136 or running in a context manager).
132 137
133 138 Implementation note: Renewal will happen using a daemon thread with
134 139 an interval of ``expire*2/3``. If wishing to use a different renewal
135 140 time, subclass Lock, call ``super().__init__()`` then set
136 141 ``self._lock_renewal_interval`` to your desired interval.
137 142 :param strict:
138 143 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
139 144 :param signal_expire:
140 145 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
141 146 """
142 147 if strict and not isinstance(redis_client, StrictRedis):
143 148 raise ValueError("redis_client must be instance of StrictRedis. "
144 149 "Use strict=False if you know what you're doing.")
145 150 if auto_renewal and expire is None:
146 151 raise ValueError("Expire may not be None when auto_renewal is set")
147 152
148 153 self._client = redis_client
149 154
150 155 if expire:
151 156 expire = int(expire)
152 157 if expire < 0:
153 158 raise ValueError("A negative expire is not acceptable.")
154 159 else:
155 160 expire = None
156 161 self._expire = expire
157 162
158 163 self._signal_expire = signal_expire
159 164 if id is None:
160 165 self._id = b64encode(urandom(18)).decode('ascii')
161 166 elif isinstance(id, binary_type):
162 167 try:
163 168 self._id = id.decode('ascii')
164 169 except UnicodeDecodeError:
165 170 self._id = b64encode(id).decode('ascii')
166 171 elif isinstance(id, text_type):
167 172 self._id = id
168 173 else:
169 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
174 raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
170 175 self._name = 'lock:' + name
171 176 self._signal = 'lock-signal:' + name
172 177 self._lock_renewal_interval = (float(expire) * 2 / 3
173 178 if auto_renewal
174 179 else None)
175 180 self._lock_renewal_thread = None
176 181
177 182 self.register_scripts(redis_client)
178 183
179 184 @classmethod
180 185 def register_scripts(cls, redis_client):
181 186 global reset_all_script
182 187 if reset_all_script is None:
183 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
184 188 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
185 189 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
186 190 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
187 191 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
192 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
188 193
189 194 @property
190 195 def _held(self):
191 196 return self.id == self.get_owner_id()
192 197
193 198 def reset(self):
194 199 """
195 200 Forcibly deletes the lock. Use this with care.
196 201 """
197 202 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
198 203
199 204 @property
200 205 def id(self):
201 206 return self._id
202 207
203 208 def get_owner_id(self):
204 209 owner_id = self._client.get(self._name)
205 210 if isinstance(owner_id, binary_type):
206 211 owner_id = owner_id.decode('ascii', 'replace')
207 212 return owner_id
208 213
209 214 def acquire(self, blocking=True, timeout=None):
210 215 """
211 216 :param blocking:
212 217 Boolean value specifying whether lock should be blocking or not.
213 218 :param timeout:
214 219 An integer value specifying the maximum number of seconds to block.
215 220 """
216 221 logger = loggers["acquire"]
217 222
218 223 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
219 224
220 225 if self._held:
221 226 owner_id = self.get_owner_id()
222 raise AlreadyAcquired(f"Already acquired from this Lock instance. Lock id: {owner_id}")
227 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
223 228
224 229 if not blocking and timeout is not None:
225 230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
226 231
227 232 if timeout:
228 233 timeout = int(timeout)
229 234 if timeout < 0:
230 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
235 raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
231 236
232 237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
233 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
238 raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
234 239
235 240 busy = True
236 241 blpop_timeout = timeout or self._expire or 0
237 242 timed_out = False
238 243 while busy:
239 244 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
240 245 if busy:
241 246 if timed_out:
242 247 return False
243 248 elif blocking:
244 249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
245 250 else:
246 logger.warning("Failed to get %r.", self._name)
251 logger.warning("Failed to acquire Lock(%r).", self._name)
247 252 return False
248 253
249 logger.debug("Got lock for %r.", self._name)
254 logger.debug("Acquired Lock(%r).", self._name)
250 255 if self._lock_renewal_interval is not None:
251 256 self._start_lock_renewer()
252 257 return True
253 258
254 259 def extend(self, expire=None):
255 """Extends expiration time of the lock.
260 """
261 Extends expiration time of the lock.
256 262
257 263 :param expire:
258 264 New expiration time. If ``None`` - `expire` provided during
259 265 lock initialization will be taken.
260 266 """
261 267 if expire:
262 268 expire = int(expire)
263 269 if expire < 0:
264 270 raise ValueError("A negative expire is not acceptable.")
265 271 elif self._expire is not None:
266 272 expire = self._expire
267 273 else:
268 274 raise TypeError(
269 275 "To extend a lock 'expire' must be provided as an "
270 276 "argument to extend() method or at initialization time."
271 277 )
272 278
273 279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
274 280 if error == 1:
275 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
276 282 elif error == 2:
277 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
278 284 elif error:
279 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
280 286
281 287 @staticmethod
282 def _lock_renewer(lockref, interval, stop):
288 def _lock_renewer(name, lockref, interval, stop):
283 289 """
284 290 Renew the lock key in redis every `interval` seconds for as long
285 291 as `self._lock_renewal_thread.should_exit` is False.
286 292 """
287 293 while not stop.wait(timeout=interval):
288 loggers["refresh.thread.start"].debug("Refreshing lock")
289 lock = lockref()
294 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
295 lock: "Lock" = lockref()
290 296 if lock is None:
291 297 loggers["refresh.thread.stop"].debug(
292 "The lock no longer exists, stopping lock refreshing"
298 "Stopping loop because Lock(%r) was garbage collected.", name
293 299 )
294 300 break
295 301 lock.extend(expire=lock._expire)
296 302 del lock
297 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
298 304
299 305 def _start_lock_renewer(self):
300 306 """
301 307 Starts the lock refresher thread.
302 308 """
303 309 if self._lock_renewal_thread is not None:
304 310 raise AlreadyStarted("Lock refresh thread already started")
305 311
306 312 loggers["refresh.start"].debug(
307 "Starting thread to refresh lock every %s seconds",
308 self._lock_renewal_interval
313 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
314 self._name, self._lock_renewal_interval
309 315 )
310 316 self._lock_renewal_stop = threading.Event()
311 317 self._lock_renewal_thread = threading.Thread(
312 318 group=None,
313 319 target=self._lock_renewer,
314 kwargs={'lockref': weakref.ref(self),
315 'interval': self._lock_renewal_interval,
316 'stop': self._lock_renewal_stop}
320 kwargs={
321 'name': self._name,
322 'lockref': weakref.ref(self),
323 'interval': self._lock_renewal_interval,
324 'stop': self._lock_renewal_stop,
325 },
317 326 )
318 self._lock_renewal_thread.setDaemon(True)
327 self._lock_renewal_thread.daemon = True
319 328 self._lock_renewal_thread.start()
320 329
321 330 def _stop_lock_renewer(self):
322 331 """
323 332 Stop the lock renewer.
324 333
325 334 This signals the renewal thread and waits for its exit.
326 335 """
327 336 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
328 337 return
329 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
338 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
330 339 self._lock_renewal_stop.set()
331 340 self._lock_renewal_thread.join()
332 341 self._lock_renewal_thread = None
333 loggers["refresh.exit"].debug("Lock refresher has stopped")
342 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
334 343
335 344 def __enter__(self):
336 345 acquired = self.acquire(blocking=True)
337 assert acquired, "Lock wasn't acquired, but blocking=True"
346 if not acquired:
347 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
338 348 return self
339 349
340 350 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
341 351 self.release()
342 352
343 353 def release(self):
344 354 """Releases the lock, that was acquired with the same object.
345 355
346 356 .. note::
347 357
348 358 If you want to release a lock that you acquired in a different place you have two choices:
349 359
350 360 * Use ``Lock("name", id=id_from_other_place).release()``
351 361 * Use ``Lock("name").reset()``
352 362 """
353 363 if self._lock_renewal_thread is not None:
354 364 self._stop_lock_renewer()
355 loggers["release"].debug("Releasing %r.", self._name)
365 loggers["release"].debug("Releasing Lock(%r).", self._name)
356 366 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
357 367 if error == 1:
358 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
368 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
359 369 elif error:
360 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
370 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
361 371
362 372 def locked(self):
363 373 """
364 374 Return true if the lock is acquired.
365 375
366 376 Checks that lock with same name already exists. This method returns true, even if
367 377 lock have another id.
368 378 """
369 379 return self._client.exists(self._name) == 1
370 380
371 381
372 382 reset_all_script = None
373 383
374 384
375 385 def reset_all(redis_client):
376 386 """
377 387 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
378 388
379 389 :param redis_client:
380 390 An instance of :class:`~StrictRedis`.
381 391 """
382 392 Lock.register_scripts(redis_client)
383 393
384 394 reset_all_script(client=redis_client) # noqa
General Comments 0
You need to be logged in to leave comments. Login now