##// END OF EJS Templates
caches: improved locking problems with distributed lock new cache backend
super-admin -
r4714:97b167f6 default
parent child Browse files
Show More
@@ -1,389 +1,390 b''
import sys
import threading
import weakref
from base64 import b64encode
from logging import getLogger
from os import urandom

from redis import StrictRedis

__version__ = '3.7.0'

# One logger per lock life-cycle event, named
# "rhodecode.<module>.<event>" (note the dot after "rhodecode" —
# without it the logger names were fused, e.g. "rhodecodevcs...").
loggers = {
    k: getLogger("rhodecode." + ".".join((__name__, k)))
    for k in [
        "acquire",
        "refresh.thread.start",
        "refresh.thread.stop",
        "refresh.thread.exit",
        "refresh.start",
        "refresh.shutdown",
        "refresh.exit",
        "release",
    ]
}

PY3 = sys.version_info[0] == 3

# Minimal 2/3 compatibility shims for string types.
if PY3:
    text_type = str
    binary_type = bytes
else:
    text_type = unicode  # noqa
    binary_type = str


# Check if the id match. If not, return an error code.
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("pexpire", KEYS[2], ARGV[2])
        redis.call("del", KEYS[1])
        return 0
    end
"""

# Covers both cases when key doesn't exist and doesn't equal to lock's id
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[2])
        return 0
    end
"""

# Unconditionally delete the lock and wake one waiter via the signal list.
RESET_SCRIPT = b"""
    redis.call('del', KEYS[2])
    redis.call('lpush', KEYS[2], 1)
    redis.call('pexpire', KEYS[2], ARGV[2])
    return redis.call('del', KEYS[1])
"""

# Delete every "lock:*" key and signal its waiters; returns the count.
# NOTE: uses the blocking KEYS command — intended for admin/recovery use only.
RESET_ALL_SCRIPT = b"""
    local locks = redis.call('keys', 'lock:*')
    local signal
    for _, lock in pairs(locks) do
        signal = 'lock-signal:' .. string.sub(lock, 6)
        redis.call('del', signal)
        redis.call('lpush', signal, 1)
        redis.call('expire', signal, 1)
        redis.call('del', lock)
    end
    return #locks
"""
80
80
81
81
class AlreadyAcquired(RuntimeError):
    """Acquire was attempted on a lock this instance already holds."""


class NotAcquired(RuntimeError):
    """Release/extend was attempted on a lock that is not held (or expired)."""


class AlreadyStarted(RuntimeError):
    """The renewal thread was started twice."""


class TimeoutNotUsable(RuntimeError):
    """A timeout was given together with ``blocking=False``."""


class InvalidTimeout(RuntimeError):
    """The acquire timeout value is not acceptable."""


class TimeoutTooLarge(RuntimeError):
    """The acquire timeout exceeds the lock's expire time."""


class NotExpirable(RuntimeError):
    """Extend was attempted on a lock that has no expiration set."""
108
108
109
109
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        :raises ValueError:
            On a non-Strict client with ``strict=True``, a negative ``expire``,
            or ``auto_renewal`` without ``expire``.
        :raises TypeError:
            When ``id`` is neither bytes nor text.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            # expire=0 is normalized to "never expires".
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        if id is None:
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, binary_type):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                # Non-ASCII bytes ids are stored base64-encoded.
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, text_type):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        # Renew at 2/3 of the expire interval so a renewal always lands
        # before the key can expire.
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.register_scripts(redis_client)

    @classmethod
    def register_scripts(cls, redis_client):
        """Register the Lua scripts on ``redis_client``.

        The module-level ``reset_all_script`` is registered only once.
        """
        global reset_all_script
        if reset_all_script is None:
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
        cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
        cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
        cls.reset_script = redis_client.register_script(RESET_SCRIPT)
        cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)

    @property
    def _held(self):
        # Held by *this* instance: the stored owner id matches our id.
        return self.id == self.get_owner_id()

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))

    @property
    def id(self):
        return self._id

    def get_owner_id(self):
        """Return the id currently stored under the lock key (or None)."""
        owner_id = self._client.get(self._name)
        if isinstance(owner_id, binary_type):
            owner_id = owner_id.decode('ascii', 'replace')
        return owner_id

    def acquire(self, blocking=True, timeout=None):
        """
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        :returns: True when the lock was acquired, False otherwise.
        """
        logger = loggers["acquire"]

        logger.debug("Getting acquire on %r ...", self._name)

        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                # NOTE(review): timeout == 0 never reaches here (falsy above)
                # and is treated as "no timeout".
                raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)

            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # Wait on the signal list until a holder releases, or
                    # the BLPOP timeout elapses (only counts with a timeout).
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to get %r.", self._name)
                    return False

        logger.info("Got lock for %r.", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        :raises NotAcquired: when the lock is not held or already expired.
        :raises NotExpirable: when the lock key has no TTL assigned.
        """
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        elif self._expire is not None:
            expire = self._expire
        else:
            raise TypeError(
                "To extend a lock 'expire' must be provided as an "
                "argument to extend() method or at initialization time."
            )

        error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error == 2:
            raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
        elif error:
            raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

    @staticmethod
    def _lock_renewer(lockref, interval, stop):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.
        """
        while not stop.wait(timeout=interval):
            loggers["refresh.thread.start"].debug("Refreshing lock")
            # ``lockref`` is a weakref so this thread never keeps the Lock alive.
            lock = lockref()
            if lock is None:
                loggers["refresh.thread.stop"].debug(
                    "The lock no longer exists, stopping lock refreshing"
                )
                break
            lock.extend(expire=lock._expire)
            del lock
        loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        loggers["refresh.start"].debug(
            "Starting thread to refresh lock every %s seconds",
            self._lock_renewal_interval
        )
        self._lock_renewal_stop = threading.Event()
        self._lock_renewal_thread = threading.Thread(
            group=None,
            target=self._lock_renewer,
            kwargs={'lockref': weakref.ref(self),
                    'interval': self._lock_renewal_interval,
                    'stop': self._lock_renewal_stop}
        )
        # .daemon instead of deprecated setDaemon(); don't block interpreter exit.
        self._lock_renewal_thread.daemon = True
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
            return
        loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
        self._lock_renewal_stop.set()
        self._lock_renewal_thread.join()
        self._lock_renewal_thread = None
        loggers["refresh.exit"].debug("Lock refresher has stopped")

    def __enter__(self):
        acquired = self.acquire(blocking=True)
        assert acquired, "Lock wasn't acquired, but blocking=True"
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.release()

    def release(self):
        """Releases the lock, that was acquired with the same object.

        .. note::

            If you want to release a lock that you acquired in a different place you have two choices:

            * Use ``Lock("name", id=id_from_other_place).release()``
            * Use ``Lock("name").reset()``
        """
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        loggers["release"].debug("Releasing %r.", self._name)
        error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error:
            # BUGFIX: this path runs the UNLOCK script, not EXTEND — the old
            # message blamed the wrong script.
            raise RuntimeError("Unsupported error code %s from UNLOCK script." % error)

    def locked(self):
        """
        Return true if the lock is acquired.

        Checks that lock with same name already exists. This method returns true, even if
        lock have another id.
        """
        return self._client.exists(self._name) == 1
375
376
376
377
# Module-level handle for the RESET_ALL Lua script; populated lazily by
# Lock.register_scripts() on first use.
reset_all_script = None


def reset_all(redis_client):
    """
    Forcibly deletes all locks if its remains (like a crash reason). Use this with care.

    :param redis_client:
        An instance of :class:`~StrictRedis`.
    """
    # Guarantee the module-level script handle exists before calling it.
    Lock.register_scripts(redis_client)

    reset_all_script(client=redis_client)  # noqa
@@ -1,312 +1,342 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import errno
22 import errno
23 import logging
23 import logging
24
24
25 import msgpack
25 import msgpack
26 import gevent
26 import gevent
27 import redis
27 import redis
28
28
29 from dogpile.cache.api import CachedValue
29 from dogpile.cache.api import CachedValue
30 from dogpile.cache.backends import memory as memory_backend
30 from dogpile.cache.backends import memory as memory_backend
31 from dogpile.cache.backends import file as file_backend
31 from dogpile.cache.backends import file as file_backend
32 from dogpile.cache.backends import redis as redis_backend
32 from dogpile.cache.backends import redis as redis_backend
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 from dogpile.cache.util import memoized_property
34 from dogpile.cache.util import memoized_property
35
35
36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
37
37
38
38
39 _default_max_size = 1024
39 _default_max_size = 1024
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
class LRUMemoryBackend(memory_backend.MemoryBackend):
    """Dogpile in-memory backend backed by a size-bounded LRU dictionary."""
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        # The debug variant additionally logs key counts; opt in via
        # the `log_key_count` argument.
        dict_cls = LRUDictDebug if arguments.pop('log_key_count', None) else LRUDict

        arguments['cache_dict'] = dict_cls(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        """Remove ``key`` from the cache; missing keys are silently ignored."""
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        """Remove every key in ``keys``, ignoring missing ones."""
        for cache_key in keys:
            self.delete(cache_key)
68
68
69
69
class PickleSerializer(object):
    """Mixin that (de)serializes cached values with pickle."""

    def _dumps(self, value, safe=False):
        # safe=False: serialization errors propagate by default.
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        # safe=True: a corrupt entry reads as a cache miss by default.
        try:
            return compat.pickle.loads(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
89
89
90
90
class MsgPackSerializer(object):
    """Mixin that (de)serializes cached values with msgpack."""

    def _dumps(self, value, safe=False):
        # safe=False: serialization errors propagate by default.
        try:
            return msgpack.packb(value)
        except Exception:
            if not safe:
                raise
            return NO_VALUE

    def _loads(self, value, safe=True):
        """
        pickle maintained the `CachedValue` wrapper of the tuple
        msgpack does not, so it must be added back in.
        """
        try:
            unpacked = msgpack.unpackb(value, use_list=False)
            return CachedValue(*unpacked)
        except Exception:
            if not safe:
                raise
            return NO_VALUE
115
115
116
116
import fcntl

# Keep a reference to the original flock before CustomLockFactory
# monkey-patches fcntl.flock with a gevent-friendly variant.
flock_org = fcntl.flock
119
119
120
120
class CustomLockFactory(FileLock):
    """FileLock whose flock yields to the gevent loop instead of blocking."""

    @memoized_property
    def _module(self):
        # Patch ``fcntl.flock`` with a polling, non-blocking variant so a
        # contended file lock sleeps cooperatively rather than blocking the
        # whole process.

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # Non-blocking mode: a contended lock raises EAGAIN instead of
            # blocking, letting us retry after a cooperative sleep.
            operation |= fcntl.LOCK_NB
            started_at = time.time()
            max_wait = 60 * 15  # 15min
            retry_sleep = 0.03
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    return
                except (OSError, IOError) as exc:
                    # Anything but "Resource temporarily unavailable" is a
                    # real error and must propagate.
                    if exc.errno != errno.EAGAIN:
                        raise
                    if (time.time() - started_at) > max_wait:
                        # Waited too long already; failing beats spinning forever.
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, max_wait)
                        raise
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, retry_sleep)
                    gevent.sleep(retry_sleep)

        fcntl.flock = gevent_flock
        return fcntl
155
155
156
156
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    """DBM-file dogpile backend that uses the gevent-friendly lock factory."""

    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.error('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        """Yield stored keys, optionally restricted to the given prefix."""
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            # an empty prefix matches every key
            return not prefix or v.startswith(prefix)

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        """Read and deserialize one key; returns NO_VALUE when absent."""
        with self._dbm_file(False) as dbm:
            try:
                # gdbm objects lack a .get method
                value = dbm.get(key, NO_VALUE) if hasattr(dbm, 'get') else dbm[key]
            except KeyError:
                value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)
224
224
225
225
class BaseRedisBackend(redis_backend.RedisBackend):
    """
    Common redis-backed dogpile backend.

    Fixes the distributed-lock behaviour: ``get_mutex`` now delegates to
    :func:`get_mutex_lock`, requesting an auto-renewing ``redis_lock`` with a
    default 10s expiry so a crashed holder cannot leave the key locked
    forever, while a live holder keeps extending it.
    """

    def _create_client(self):
        """Build a StrictRedis client from either a URL or host/port args."""
        args = {}

        if self.url is not None:
            args.update(url=self.url)
        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        """Return all cache keys matching ``<key_prefix>:<prefix>*``."""
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            # setex has no multi-key form; batch the expiring writes in a pipeline
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        """
        Return a distributed lock object for dogpile, or None when
        distributed locking is disabled (dogpile then uses a local mutex).
        """
        if self.distributed_lock:
            lock_key = redis_backend.u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)

            auto_renewal = True
            lock_timeout = self.lock_timeout
            if auto_renewal and not self.lock_timeout:
                # set default timeout for auto_renewal
                lock_timeout = 10
            return get_mutex_lock(self.client, lock_key, lock_timeout,
                                  auto_renewal=auto_renewal)
        else:
            return None
303
302
304
303
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    """Redis backend storing values as pickle payloads."""
    key_prefix = 'redis_pickle_backend'
308
307
309
308
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    """Redis backend storing values as msgpack payloads."""
    key_prefix = 'redis_msgpack_backend'
def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    """
    Create a dogpile-compatible mutex backed by a distributed redis_lock.

    :param client: redis client used for lock coordination.
    :param lock_key: redis key name under which the lock is held.
    :param lock_timeout: expire time (seconds) after which redis drops the lock.
    :param auto_renewal: when True a background thread keeps extending the
        lock while held, so ``lock_timeout`` only kicks in if the holder dies.
    """
    import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __init__(self):
            # BUGFIX: keep a single Lock instance. The previous @property
            # built a fresh Lock on every access, so release() targeted a
            # different object than acquire(): the new instance held no
            # token, redis_lock raised NotAcquired (silently swallowed),
            # and the key stayed locked until its expiry.
            self.lock = self.get_lock()

        def acquire(self, wait=True):
            return self.lock.acquire(wait)

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                pass

    return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now