##// END OF EJS Templates
feat(locks): added no-blocking option for redis blocks
super-admin -
r5380:6a5d4eae default
parent child Browse files
Show More
@@ -1,394 +1,402 b''
1 1
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7 from typing import Union
8 8
9 9 from redis import StrictRedis
10 10
11 11 __version__ = '4.0.0'
12 12
13 13 loggers = {
14 14 k: getLogger("rhodecode." + ".".join((__name__, k)))
15 15 for k in [
16 16 "acquire",
17 17 "refresh.thread.start",
18 18 "refresh.thread.stop",
19 19 "refresh.thread.exit",
20 20 "refresh.start",
21 21 "refresh.shutdown",
22 22 "refresh.exit",
23 23 "release",
24 24 ]
25 25 }
26 26
27 27 text_type = str
28 28 binary_type = bytes
29 29
30 30
31 31 # Check if the id match. If not, return an error code.
32 32 UNLOCK_SCRIPT = b"""
33 33 if redis.call("get", KEYS[1]) ~= ARGV[1] then
34 34 return 1
35 35 else
36 36 redis.call("del", KEYS[2])
37 37 redis.call("lpush", KEYS[2], 1)
38 38 redis.call("pexpire", KEYS[2], ARGV[2])
39 39 redis.call("del", KEYS[1])
40 40 return 0
41 41 end
42 42 """
43 43
44 44 # Covers both cases when key doesn't exist and doesn't equal to lock's id
45 45 EXTEND_SCRIPT = b"""
46 46 if redis.call("get", KEYS[1]) ~= ARGV[1] then
47 47 return 1
48 48 elseif redis.call("ttl", KEYS[1]) < 0 then
49 49 return 2
50 50 else
51 51 redis.call("expire", KEYS[1], ARGV[2])
52 52 return 0
53 53 end
54 54 """
55 55
56 56 RESET_SCRIPT = b"""
57 57 redis.call('del', KEYS[2])
58 58 redis.call('lpush', KEYS[2], 1)
59 59 redis.call('pexpire', KEYS[2], ARGV[2])
60 60 return redis.call('del', KEYS[1])
61 61 """
62 62
63 63 RESET_ALL_SCRIPT = b"""
64 64 local locks = redis.call('keys', 'lock:*')
65 65 local signal
66 66 for _, lock in pairs(locks) do
67 67 signal = 'lock-signal:' .. string.sub(lock, 6)
68 68 redis.call('del', signal)
69 69 redis.call('lpush', signal, 1)
70 70 redis.call('expire', signal, 1)
71 71 redis.call('del', lock)
72 72 end
73 73 return #locks
74 74 """
75 75
76 76
77 77 class AlreadyAcquired(RuntimeError):
78 78 pass
79 79
80 80
81 81 class NotAcquired(RuntimeError):
82 82 pass
83 83
84 84
85 85 class AlreadyStarted(RuntimeError):
86 86 pass
87 87
88 88
89 89 class TimeoutNotUsable(RuntimeError):
90 90 pass
91 91
92 92
93 93 class InvalidTimeout(RuntimeError):
94 94 pass
95 95
96 96
97 97 class TimeoutTooLarge(RuntimeError):
98 98 pass
99 99
100 100
101 101 class NotExpirable(RuntimeError):
102 102 pass
103 103
104 104
105 105 class Lock(object):
106 106 """
107 107 A Lock context manager implemented via redis SETNX/BLPOP.
108 108 """
109 109
110 110 unlock_script = None
111 111 extend_script = None
112 112 reset_script = None
113 113 reset_all_script = None
114 blocking = None
114 115
115 116 _lock_renewal_interval: float
116 117 _lock_renewal_thread: Union[threading.Thread, None]
117 118
118 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000, blocking=True):
119 120 """
120 121 :param redis_client:
121 122 An instance of :class:`~StrictRedis`.
122 123 :param name:
123 124 The name (redis key) the lock should have.
124 125 :param expire:
125 126 The lock expiry time in seconds. If left at the default (None)
126 127 the lock will not expire.
127 128 :param id:
128 129 The ID (redis value) the lock should have. A random value is
129 130 generated when left at the default.
130 131
131 132 Note that if you specify this then the lock is marked as "held". Acquires
132 133 won't be possible.
133 134 :param auto_renewal:
134 135 If set to ``True``, Lock will automatically renew the lock so that it
135 136 doesn't expire for as long as the lock is held (acquire() called
136 137 or running in a context manager).
137 138
138 139 Implementation note: Renewal will happen using a daemon thread with
139 140 an interval of ``expire*2/3``. If wishing to use a different renewal
140 141 time, subclass Lock, call ``super().__init__()`` then set
141 142 ``self._lock_renewal_interval`` to your desired interval.
142 143 :param strict:
143 144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
144 145 :param signal_expire:
145 146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 :param blocking:
148 Boolean value specifying whether lock should be blocking or not.
149 Used in `__enter__` method.
146 150 """
147 151 if strict and not isinstance(redis_client, StrictRedis):
148 152 raise ValueError("redis_client must be instance of StrictRedis. "
149 153 "Use strict=False if you know what you're doing.")
150 154 if auto_renewal and expire is None:
151 155 raise ValueError("Expire may not be None when auto_renewal is set")
152 156
153 157 self._client = redis_client
154 158
155 159 if expire:
156 160 expire = int(expire)
157 161 if expire < 0:
158 162 raise ValueError("A negative expire is not acceptable.")
159 163 else:
160 164 expire = None
161 165 self._expire = expire
162 166
163 167 self._signal_expire = signal_expire
164 168 if id is None:
165 169 self._id = b64encode(urandom(18)).decode('ascii')
166 170 elif isinstance(id, binary_type):
167 171 try:
168 172 self._id = id.decode('ascii')
169 173 except UnicodeDecodeError:
170 174 self._id = b64encode(id).decode('ascii')
171 175 elif isinstance(id, text_type):
172 176 self._id = id
173 177 else:
174 178 raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
175 179 self._name = 'lock:' + name
176 180 self._signal = 'lock-signal:' + name
177 181 self._lock_renewal_interval = (float(expire) * 2 / 3
178 182 if auto_renewal
179 183 else None)
180 184 self._lock_renewal_thread = None
181 185
186 self.blocking = blocking
187
182 188 self.register_scripts(redis_client)
183 189
184 190 @classmethod
185 191 def register_scripts(cls, redis_client):
186 192 global reset_all_script
187 193 if reset_all_script is None:
188 194 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
189 195 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
190 196 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
191 197 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
192 198 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
193 199
194 200 @property
195 201 def _held(self):
196 202 return self.id == self.get_owner_id()
197 203
198 204 def reset(self):
199 205 """
200 206 Forcibly deletes the lock. Use this with care.
201 207 """
202 208 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
203 209
204 210 @property
205 211 def id(self):
206 212 return self._id
207 213
208 214 def get_owner_id(self):
209 215 owner_id = self._client.get(self._name)
210 216 if isinstance(owner_id, binary_type):
211 217 owner_id = owner_id.decode('ascii', 'replace')
212 218 return owner_id
213 219
214 220 def acquire(self, blocking=True, timeout=None):
215 221 """
216 222 :param blocking:
217 223 Boolean value specifying whether lock should be blocking or not.
218 224 :param timeout:
219 225 An integer value specifying the maximum number of seconds to block.
220 226 """
221 227 logger = loggers["acquire"]
222 228
223 229 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
224 230
225 231 if self._held:
226 232 owner_id = self.get_owner_id()
227 233 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
228 234
229 235 if not blocking and timeout is not None:
230 236 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
231 237
232 238 if timeout:
233 239 timeout = int(timeout)
234 240 if timeout < 0:
235 241 raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
236 242
237 243 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
238 244 raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
239 245
240 246 busy = True
241 247 blpop_timeout = timeout or self._expire or 0
242 248 timed_out = False
243 249 while busy:
244 250 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
245 251 if busy:
246 252 if timed_out:
247 253 return False
248 254 elif blocking:
249 255 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
250 256 else:
251 257 logger.warning("Failed to acquire Lock(%r).", self._name)
252 258 return False
253 259
254 260 logger.debug("Acquired Lock(%r).", self._name)
255 261 if self._lock_renewal_interval is not None:
256 262 self._start_lock_renewer()
257 263 return True
258 264
259 265 def extend(self, expire=None):
260 266 """
261 267 Extends expiration time of the lock.
262 268
263 269 :param expire:
264 270 New expiration time. If ``None`` - `expire` provided during
265 271 lock initialization will be taken.
266 272 """
267 273 if expire:
268 274 expire = int(expire)
269 275 if expire < 0:
270 276 raise ValueError("A negative expire is not acceptable.")
271 277 elif self._expire is not None:
272 278 expire = self._expire
273 279 else:
274 280 raise TypeError(
275 281 "To extend a lock 'expire' must be provided as an "
276 282 "argument to extend() method or at initialization time."
277 283 )
278 284
279 285 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
280 286 if error == 1:
281 287 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
282 288 elif error == 2:
283 289 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
284 290 elif error:
285 291 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
286 292
287 293 @staticmethod
288 294 def _lock_renewer(name, lockref, interval, stop):
289 295 """
290 296 Renew the lock key in redis every `interval` seconds for as long
291 297 as `self._lock_renewal_thread.should_exit` is False.
292 298 """
293 299 while not stop.wait(timeout=interval):
294 300 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
295 301 lock: "Lock" = lockref()
296 302 if lock is None:
297 303 loggers["refresh.thread.stop"].debug(
298 304 "Stopping loop because Lock(%r) was garbage collected.", name
299 305 )
300 306 break
301 307 lock.extend(expire=lock._expire)
302 308 del lock
303 309 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
304 310
305 311 def _start_lock_renewer(self):
306 312 """
307 313 Starts the lock refresher thread.
308 314 """
309 315 if self._lock_renewal_thread is not None:
310 316 raise AlreadyStarted("Lock refresh thread already started")
311 317
312 318 loggers["refresh.start"].debug(
313 319 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
314 320 self._name, self._lock_renewal_interval
315 321 )
316 322 self._lock_renewal_stop = threading.Event()
317 323 self._lock_renewal_thread = threading.Thread(
318 324 group=None,
319 325 target=self._lock_renewer,
320 326 kwargs={
321 327 'name': self._name,
322 328 'lockref': weakref.ref(self),
323 329 'interval': self._lock_renewal_interval,
324 330 'stop': self._lock_renewal_stop,
325 331 },
326 332 )
327 333 self._lock_renewal_thread.daemon = True
328 334 self._lock_renewal_thread.start()
329 335
330 336 def _stop_lock_renewer(self):
331 337 """
332 338 Stop the lock renewer.
333 339
334 340 This signals the renewal thread and waits for its exit.
335 341 """
336 342 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
337 343 return
338 344 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
339 345 self._lock_renewal_stop.set()
340 346 self._lock_renewal_thread.join()
341 347 self._lock_renewal_thread = None
342 348 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
343 349
344 350 def __enter__(self):
345 acquired = self.acquire(blocking=True)
351 acquired = self.acquire(blocking=self.blocking)
346 352 if not acquired:
347 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
353 if self.blocking:
354 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
355 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
348 356 return self
349 357
350 358 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
351 359 self.release()
352 360
353 361 def release(self):
354 362 """Releases the lock, that was acquired with the same object.
355 363
356 364 .. note::
357 365
358 366 If you want to release a lock that you acquired in a different place you have two choices:
359 367
360 368 * Use ``Lock("name", id=id_from_other_place).release()``
361 369 * Use ``Lock("name").reset()``
362 370 """
363 371 if self._lock_renewal_thread is not None:
364 372 self._stop_lock_renewer()
365 373 loggers["release"].debug("Releasing Lock(%r).", self._name)
366 374 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
367 375 if error == 1:
368 376 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
369 377 elif error:
370 378 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
371 379
372 380 def locked(self):
373 381 """
374 382 Return true if the lock is acquired.
375 383
376 384 Checks that lock with same name already exists. This method returns true, even if
377 385 lock have another id.
378 386 """
379 387 return self._client.exists(self._name) == 1
380 388
381 389
382 390 reset_all_script = None
383 391
384 392
385 393 def reset_all(redis_client):
386 394 """
387 395 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
388 396
389 397 :param redis_client:
390 398 An instance of :class:`~StrictRedis`.
391 399 """
392 400 Lock.register_scripts(redis_client)
393 401
394 402 reset_all_script(client=redis_client) # noqa
General Comments 0
You need to be logged in to leave comments. Login now