##// END OF EJS Templates
redis: added better distributed lock implementation.
super-admin -
r4705:dce8d447 stable
parent child Browse files
Show More
@@ -0,0 +1,389 b''
1 import sys
2 import threading
3 import weakref
4 from base64 import b64encode
5 from logging import getLogger
6 from os import urandom
7
8 from redis import StrictRedis
9
10 __version__ = '3.7.0'
11
12 loggers = {
13 k: getLogger(".".join((__name__, k)))
14 for k in [
15 "acquire",
16 "refresh.thread.start",
17 "refresh.thread.stop",
18 "refresh.thread.exit",
19 "refresh.start",
20 "refresh.shutdown",
21 "refresh.exit",
22 "release",
23 ]
24 }
25
26 PY3 = sys.version_info[0] == 3
27
28 if PY3:
29 text_type = str
30 binary_type = bytes
31 else:
32 text_type = unicode # noqa
33 binary_type = str
34
35
36 # Check if the id match. If not, return an error code.
37 UNLOCK_SCRIPT = b"""
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
39 return 1
40 else
41 redis.call("del", KEYS[2])
42 redis.call("lpush", KEYS[2], 1)
43 redis.call("pexpire", KEYS[2], ARGV[2])
44 redis.call("del", KEYS[1])
45 return 0
46 end
47 """
48
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
50 EXTEND_SCRIPT = b"""
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
52 return 1
53 elseif redis.call("ttl", KEYS[1]) < 0 then
54 return 2
55 else
56 redis.call("expire", KEYS[1], ARGV[2])
57 return 0
58 end
59 """
60
61 RESET_SCRIPT = b"""
62 redis.call('del', KEYS[2])
63 redis.call('lpush', KEYS[2], 1)
64 redis.call('pexpire', KEYS[2], ARGV[2])
65 return redis.call('del', KEYS[1])
66 """
67
68 RESET_ALL_SCRIPT = b"""
69 local locks = redis.call('keys', 'lock:*')
70 local signal
71 for _, lock in pairs(locks) do
72 signal = 'lock-signal:' .. string.sub(lock, 6)
73 redis.call('del', signal)
74 redis.call('lpush', signal, 1)
75 redis.call('expire', signal, 1)
76 redis.call('del', lock)
77 end
78 return #locks
79 """
80
81
82 class AlreadyAcquired(RuntimeError):
83 pass
84
85
86 class NotAcquired(RuntimeError):
87 pass
88
89
90 class AlreadyStarted(RuntimeError):
91 pass
92
93
94 class TimeoutNotUsable(RuntimeError):
95 pass
96
97
98 class InvalidTimeout(RuntimeError):
99 pass
100
101
102 class TimeoutTooLarge(RuntimeError):
103 pass
104
105
106 class NotExpirable(RuntimeError):
107 pass
108
109
110 class Lock(object):
111 """
112 A Lock context manager implemented via redis SETNX/BLPOP.
113 """
114 unlock_script = None
115 extend_script = None
116 reset_script = None
117 reset_all_script = None
118
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
120 """
121 :param redis_client:
122 An instance of :class:`~StrictRedis`.
123 :param name:
124 The name (redis key) the lock should have.
125 :param expire:
126 The lock expiry time in seconds. If left at the default (None)
127 the lock will not expire.
128 :param id:
129 The ID (redis value) the lock should have. A random value is
130 generated when left at the default.
131
132 Note that if you specify this then the lock is marked as "held". Acquires
133 won't be possible.
134 :param auto_renewal:
135 If set to ``True``, Lock will automatically renew the lock so that it
136 doesn't expire for as long as the lock is held (acquire() called
137 or running in a context manager).
138
139 Implementation note: Renewal will happen using a daemon thread with
140 an interval of ``expire*2/3``. If wishing to use a different renewal
141 time, subclass Lock, call ``super().__init__()`` then set
142 ``self._lock_renewal_interval`` to your desired interval.
143 :param strict:
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
145 :param signal_expire:
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 """
148 if strict and not isinstance(redis_client, StrictRedis):
149 raise ValueError("redis_client must be instance of StrictRedis. "
150 "Use strict=False if you know what you're doing.")
151 if auto_renewal and expire is None:
152 raise ValueError("Expire may not be None when auto_renewal is set")
153
154 self._client = redis_client
155
156 if expire:
157 expire = int(expire)
158 if expire < 0:
159 raise ValueError("A negative expire is not acceptable.")
160 else:
161 expire = None
162 self._expire = expire
163
164 self._signal_expire = signal_expire
165 if id is None:
166 self._id = b64encode(urandom(18)).decode('ascii')
167 elif isinstance(id, binary_type):
168 try:
169 self._id = id.decode('ascii')
170 except UnicodeDecodeError:
171 self._id = b64encode(id).decode('ascii')
172 elif isinstance(id, text_type):
173 self._id = id
174 else:
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
176 self._name = 'lock:' + name
177 self._signal = 'lock-signal:' + name
178 self._lock_renewal_interval = (float(expire) * 2 / 3
179 if auto_renewal
180 else None)
181 self._lock_renewal_thread = None
182
183 self.register_scripts(redis_client)
184
185 @classmethod
186 def register_scripts(cls, redis_client):
187 global reset_all_script
188 if reset_all_script is None:
189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
194
195 @property
196 def _held(self):
197 return self.id == self.get_owner_id()
198
199 def reset(self):
200 """
201 Forcibly deletes the lock. Use this with care.
202 """
203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
204
205 @property
206 def id(self):
207 return self._id
208
209 def get_owner_id(self):
210 owner_id = self._client.get(self._name)
211 if isinstance(owner_id, binary_type):
212 owner_id = owner_id.decode('ascii', 'replace')
213 return owner_id
214
215 def acquire(self, blocking=True, timeout=None):
216 """
217 :param blocking:
218 Boolean value specifying whether lock should be blocking or not.
219 :param timeout:
220 An integer value specifying the maximum number of seconds to block.
221 """
222 logger = loggers["acquire"]
223
224 logger.debug("Getting %r ...", self._name)
225
226 if self._held:
227 raise AlreadyAcquired("Already acquired from this Lock instance.")
228
229 if not blocking and timeout is not None:
230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
231
232 if timeout:
233 timeout = int(timeout)
234 if timeout < 0:
235 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
236
237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
238 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
239
240 busy = True
241 blpop_timeout = timeout or self._expire or 0
242 timed_out = False
243 while busy:
244 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
245 if busy:
246 if timed_out:
247 return False
248 elif blocking:
249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
250 else:
251 logger.warning("Failed to get %r.", self._name)
252 return False
253
254 logger.info("Got lock for %r.", self._name)
255 if self._lock_renewal_interval is not None:
256 self._start_lock_renewer()
257 return True
258
259 def extend(self, expire=None):
260 """Extends expiration time of the lock.
261
262 :param expire:
263 New expiration time. If ``None`` - `expire` provided during
264 lock initialization will be taken.
265 """
266 if expire:
267 expire = int(expire)
268 if expire < 0:
269 raise ValueError("A negative expire is not acceptable.")
270 elif self._expire is not None:
271 expire = self._expire
272 else:
273 raise TypeError(
274 "To extend a lock 'expire' must be provided as an "
275 "argument to extend() method or at initialization time."
276 )
277
278 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
279 if error == 1:
280 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 elif error == 2:
282 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 elif error:
284 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285
286 @staticmethod
287 def _lock_renewer(lockref, interval, stop):
288 """
289 Renew the lock key in redis every `interval` seconds for as long
290 as `self._lock_renewal_thread.should_exit` is False.
291 """
292 while not stop.wait(timeout=interval):
293 loggers["refresh.thread.start"].debug("Refreshing lock")
294 lock = lockref()
295 if lock is None:
296 loggers["refresh.thread.stop"].debug(
297 "The lock no longer exists, stopping lock refreshing"
298 )
299 break
300 lock.extend(expire=lock._expire)
301 del lock
302 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303
304 def _start_lock_renewer(self):
305 """
306 Starts the lock refresher thread.
307 """
308 if self._lock_renewal_thread is not None:
309 raise AlreadyStarted("Lock refresh thread already started")
310
311 loggers["refresh.start"].debug(
312 "Starting thread to refresh lock every %s seconds",
313 self._lock_renewal_interval
314 )
315 self._lock_renewal_stop = threading.Event()
316 self._lock_renewal_thread = threading.Thread(
317 group=None,
318 target=self._lock_renewer,
319 kwargs={'lockref': weakref.ref(self),
320 'interval': self._lock_renewal_interval,
321 'stop': self._lock_renewal_stop}
322 )
323 self._lock_renewal_thread.setDaemon(True)
324 self._lock_renewal_thread.start()
325
326 def _stop_lock_renewer(self):
327 """
328 Stop the lock renewer.
329
330 This signals the renewal thread and waits for its exit.
331 """
332 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
333 return
334 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
335 self._lock_renewal_stop.set()
336 self._lock_renewal_thread.join()
337 self._lock_renewal_thread = None
338 loggers["refresh.exit"].debug("Lock refresher has stopped")
339
340 def __enter__(self):
341 acquired = self.acquire(blocking=True)
342 assert acquired, "Lock wasn't acquired, but blocking=True"
343 return self
344
345 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
346 self.release()
347
348 def release(self):
349 """Releases the lock, that was acquired with the same object.
350
351 .. note::
352
353 If you want to release a lock that you acquired in a different place you have two choices:
354
355 * Use ``Lock("name", id=id_from_other_place).release()``
356 * Use ``Lock("name").reset()``
357 """
358 if self._lock_renewal_thread is not None:
359 self._stop_lock_renewer()
360 loggers["release"].debug("Releasing %r.", self._name)
361 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
362 if error == 1:
363 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
364 elif error:
365 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
366
367 def locked(self):
368 """
369 Return true if the lock is acquired.
370
371 Checks that lock with same name already exists. This method returns true, even if
372 lock have another id.
373 """
374 return self._client.exists(self._name) == 1
375
376
377 reset_all_script = None
378
379
380 def reset_all(redis_client):
381 """
382 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
383
384 :param redis_client:
385 An instance of :class:`~StrictRedis`.
386 """
387 Lock.register_scripts(redis_client)
388
389 reset_all_script(client=redis_client) # noqa
@@ -1,305 +1,311 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
37 37
38 38
39 39 _default_max_size = 1024
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class LRUMemoryBackend(memory_backend.MemoryBackend):
45 45 key_prefix = 'lru_mem_backend'
46 46 pickle_values = False
47 47
48 48 def __init__(self, arguments):
49 49 max_size = arguments.pop('max_size', _default_max_size)
50 50
51 51 LRUDictClass = LRUDict
52 52 if arguments.pop('log_key_count', None):
53 53 LRUDictClass = LRUDictDebug
54 54
55 55 arguments['cache_dict'] = LRUDictClass(max_size)
56 56 super(LRUMemoryBackend, self).__init__(arguments)
57 57
58 58 def delete(self, key):
59 59 try:
60 60 del self._cache[key]
61 61 except KeyError:
62 62 # we don't care if key isn't there at deletion
63 63 pass
64 64
65 65 def delete_multi(self, keys):
66 66 for key in keys:
67 67 self.delete(key)
68 68
69 69
70 70 class PickleSerializer(object):
71 71
72 72 def _dumps(self, value, safe=False):
73 73 try:
74 74 return compat.pickle.dumps(value)
75 75 except Exception:
76 76 if safe:
77 77 return NO_VALUE
78 78 else:
79 79 raise
80 80
81 81 def _loads(self, value, safe=True):
82 82 try:
83 83 return compat.pickle.loads(value)
84 84 except Exception:
85 85 if safe:
86 86 return NO_VALUE
87 87 else:
88 88 raise
89 89
90 90
91 91 class MsgPackSerializer(object):
92 92
93 93 def _dumps(self, value, safe=False):
94 94 try:
95 95 return msgpack.packb(value)
96 96 except Exception:
97 97 if safe:
98 98 return NO_VALUE
99 99 else:
100 100 raise
101 101
102 102 def _loads(self, value, safe=True):
103 103 """
104 104 pickle maintained the `CachedValue` wrapper of the tuple
105 105 msgpack does not, so it must be added back in.
106 106 """
107 107 try:
108 108 value = msgpack.unpackb(value, use_list=False)
109 109 return CachedValue(*value)
110 110 except Exception:
111 111 if safe:
112 112 return NO_VALUE
113 113 else:
114 114 raise
115 115
116 116
117 117 import fcntl
118 118 flock_org = fcntl.flock
119 119
120 120
121 121 class CustomLockFactory(FileLock):
122 122
123 123 @memoized_property
124 124 def _module(self):
125 125
126 126 def gevent_flock(fd, operation):
127 127 """
128 128 Gevent compatible flock
129 129 """
130 130 # set non-blocking, this will cause an exception if we cannot acquire a lock
131 131 operation |= fcntl.LOCK_NB
132 132 start_lock_time = time.time()
133 133 timeout = 60 * 15 # 15min
134 134 while True:
135 135 try:
136 136 flock_org(fd, operation)
137 137 # lock has been acquired
138 138 break
139 139 except (OSError, IOError) as e:
140 140 # raise on other errors than Resource temporarily unavailable
141 141 if e.errno != errno.EAGAIN:
142 142 raise
143 143 elif (time.time() - start_lock_time) > timeout:
144 144 # waited to much time on a lock, better fail than loop for ever
145 145 log.error('Failed to acquire lock on `%s` after waiting %ss',
146 146 self.filename, timeout)
147 147 raise
148 148 wait_timeout = 0.03
149 149 log.debug('Failed to acquire lock on `%s`, retry in %ss',
150 150 self.filename, wait_timeout)
151 151 gevent.sleep(wait_timeout)
152 152
153 153 fcntl.flock = gevent_flock
154 154 return fcntl
155 155
156 156
157 157 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
158 158 key_prefix = 'file_backend'
159 159
160 160 def __init__(self, arguments):
161 161 arguments['lock_factory'] = CustomLockFactory
162 162 db_file = arguments.get('filename')
163 163
164 164 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
165 165 try:
166 166 super(FileNamespaceBackend, self).__init__(arguments)
167 167 except Exception:
168 168 log.error('Failed to initialize db at: %s', db_file)
169 169 raise
170 170
171 171 def __repr__(self):
172 172 return '{} `{}`'.format(self.__class__, self.filename)
173 173
174 174 def list_keys(self, prefix=''):
175 175 prefix = '{}:{}'.format(self.key_prefix, prefix)
176 176
177 177 def cond(v):
178 178 if not prefix:
179 179 return True
180 180
181 181 if v.startswith(prefix):
182 182 return True
183 183 return False
184 184
185 185 with self._dbm_file(True) as dbm:
186 186 try:
187 187 return filter(cond, dbm.keys())
188 188 except Exception:
189 189 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
190 190 raise
191 191
192 192 def get_store(self):
193 193 return self.filename
194 194
195 195 def _dbm_get(self, key):
196 196 with self._dbm_file(False) as dbm:
197 197 if hasattr(dbm, 'get'):
198 198 value = dbm.get(key, NO_VALUE)
199 199 else:
200 200 # gdbm objects lack a .get method
201 201 try:
202 202 value = dbm[key]
203 203 except KeyError:
204 204 value = NO_VALUE
205 205 if value is not NO_VALUE:
206 206 value = self._loads(value)
207 207 return value
208 208
209 209 def get(self, key):
210 210 try:
211 211 return self._dbm_get(key)
212 212 except Exception:
213 213 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
214 214 raise
215 215
216 216 def set(self, key, value):
217 217 with self._dbm_file(True) as dbm:
218 218 dbm[key] = self._dumps(value)
219 219
220 220 def set_multi(self, mapping):
221 221 with self._dbm_file(True) as dbm:
222 222 for key, value in mapping.items():
223 223 dbm[key] = self._dumps(value)
224 224
225 225
226 226 class BaseRedisBackend(redis_backend.RedisBackend):
227 227
228 228 def _create_client(self):
229 229 args = {}
230 230
231 231 if self.url is not None:
232 232 args.update(url=self.url)
233 233
234 234 else:
235 235 args.update(
236 236 host=self.host, password=self.password,
237 237 port=self.port, db=self.db
238 238 )
239 239
240 240 connection_pool = redis.ConnectionPool(**args)
241 241
242 242 return redis.StrictRedis(connection_pool=connection_pool)
243 243
244 244 def list_keys(self, prefix=''):
245 245 prefix = '{}:{}*'.format(self.key_prefix, prefix)
246 246 return self.client.keys(prefix)
247 247
248 248 def get_store(self):
249 249 return self.client.connection_pool
250 250
251 251 def get(self, key):
252 252 value = self.client.get(key)
253 253 if value is None:
254 254 return NO_VALUE
255 255 return self._loads(value)
256 256
257 257 def get_multi(self, keys):
258 258 if not keys:
259 259 return []
260 260 values = self.client.mget(keys)
261 261 loads = self._loads
262 262 return [
263 263 loads(v) if v is not None else NO_VALUE
264 264 for v in values]
265 265
266 266 def set(self, key, value):
267 267 if self.redis_expiration_time:
268 268 self.client.setex(key, self.redis_expiration_time,
269 269 self._dumps(value))
270 270 else:
271 271 self.client.set(key, self._dumps(value))
272 272
273 273 def set_multi(self, mapping):
274 274 dumps = self._dumps
275 275 mapping = dict(
276 276 (k, dumps(v))
277 277 for k, v in mapping.items()
278 278 )
279 279
280 280 if not self.redis_expiration_time:
281 281 self.client.mset(mapping)
282 282 else:
283 283 pipe = self.client.pipeline()
284 284 for key, value in mapping.items():
285 285 pipe.setex(key, self.redis_expiration_time, value)
286 286 pipe.execute()
287 287
288 288 def get_mutex(self, key):
289 u = redis_backend.u
290 289 if self.distributed_lock:
291 lock_key = u('_lock_{0}').format(key)
290 import redis_lock
291 lock_key = redis_backend.u('_lock_{0}').format(key)
292 292 log.debug('Trying to acquire Redis lock for key %s', lock_key)
293 return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
293 lock = redis_lock.Lock(
294 redis_client=self.client,
295 name=lock_key,
296 expire=self.lock_timeout,
297 auto_renewal=False,
298 )
299 return lock
294 300 else:
295 301 return None
296 302
297 303
298 304 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
299 305 key_prefix = 'redis_pickle_backend'
300 306 pass
301 307
302 308
303 309 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
304 310 key_prefix = 'redis_msgpack_backend'
305 311 pass
General Comments 0
You need to be logged in to leave comments. Login now