##// END OF EJS Templates
redis-lock: added strict/logging improvements.
super-admin -
r945:7e130ea6 stable
parent child Browse files
Show More
@@ -1,389 +1,389 b''
1 import sys
1 import sys
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7
7
8 from redis import StrictRedis
8 from redis import StrictRedis
9
9
10 __version__ = '3.7.0'
10 __version__ = '3.7.0'
11
11
12 loggers = {
12 loggers = {
13 k: getLogger(".".join((__name__, k)))
13 k: getLogger("vcsserver" + ".".join((__name__, k)))
14 for k in [
14 for k in [
15 "acquire",
15 "acquire",
16 "refresh.thread.start",
16 "refresh.thread.start",
17 "refresh.thread.stop",
17 "refresh.thread.stop",
18 "refresh.thread.exit",
18 "refresh.thread.exit",
19 "refresh.start",
19 "refresh.start",
20 "refresh.shutdown",
20 "refresh.shutdown",
21 "refresh.exit",
21 "refresh.exit",
22 "release",
22 "release",
23 ]
23 ]
24 }
24 }
25
25
26 PY3 = sys.version_info[0] == 3
26 PY3 = sys.version_info[0] == 3
27
27
28 if PY3:
28 if PY3:
29 text_type = str
29 text_type = str
30 binary_type = bytes
30 binary_type = bytes
31 else:
31 else:
32 text_type = unicode # noqa
32 text_type = unicode # noqa
33 binary_type = str
33 binary_type = str
34
34
35
35
36 # Check if the id match. If not, return an error code.
36 # Check if the id match. If not, return an error code.
37 UNLOCK_SCRIPT = b"""
37 UNLOCK_SCRIPT = b"""
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
39 return 1
39 return 1
40 else
40 else
41 redis.call("del", KEYS[2])
41 redis.call("del", KEYS[2])
42 redis.call("lpush", KEYS[2], 1)
42 redis.call("lpush", KEYS[2], 1)
43 redis.call("pexpire", KEYS[2], ARGV[2])
43 redis.call("pexpire", KEYS[2], ARGV[2])
44 redis.call("del", KEYS[1])
44 redis.call("del", KEYS[1])
45 return 0
45 return 0
46 end
46 end
47 """
47 """
48
48
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
50 EXTEND_SCRIPT = b"""
50 EXTEND_SCRIPT = b"""
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
52 return 1
52 return 1
53 elseif redis.call("ttl", KEYS[1]) < 0 then
53 elseif redis.call("ttl", KEYS[1]) < 0 then
54 return 2
54 return 2
55 else
55 else
56 redis.call("expire", KEYS[1], ARGV[2])
56 redis.call("expire", KEYS[1], ARGV[2])
57 return 0
57 return 0
58 end
58 end
59 """
59 """
60
60
61 RESET_SCRIPT = b"""
61 RESET_SCRIPT = b"""
62 redis.call('del', KEYS[2])
62 redis.call('del', KEYS[2])
63 redis.call('lpush', KEYS[2], 1)
63 redis.call('lpush', KEYS[2], 1)
64 redis.call('pexpire', KEYS[2], ARGV[2])
64 redis.call('pexpire', KEYS[2], ARGV[2])
65 return redis.call('del', KEYS[1])
65 return redis.call('del', KEYS[1])
66 """
66 """
67
67
68 RESET_ALL_SCRIPT = b"""
68 RESET_ALL_SCRIPT = b"""
69 local locks = redis.call('keys', 'lock:*')
69 local locks = redis.call('keys', 'lock:*')
70 local signal
70 local signal
71 for _, lock in pairs(locks) do
71 for _, lock in pairs(locks) do
72 signal = 'lock-signal:' .. string.sub(lock, 6)
72 signal = 'lock-signal:' .. string.sub(lock, 6)
73 redis.call('del', signal)
73 redis.call('del', signal)
74 redis.call('lpush', signal, 1)
74 redis.call('lpush', signal, 1)
75 redis.call('expire', signal, 1)
75 redis.call('expire', signal, 1)
76 redis.call('del', lock)
76 redis.call('del', lock)
77 end
77 end
78 return #locks
78 return #locks
79 """
79 """
80
80
81
81
82 class AlreadyAcquired(RuntimeError):
82 class AlreadyAcquired(RuntimeError):
83 pass
83 pass
84
84
85
85
86 class NotAcquired(RuntimeError):
86 class NotAcquired(RuntimeError):
87 pass
87 pass
88
88
89
89
90 class AlreadyStarted(RuntimeError):
90 class AlreadyStarted(RuntimeError):
91 pass
91 pass
92
92
93
93
94 class TimeoutNotUsable(RuntimeError):
94 class TimeoutNotUsable(RuntimeError):
95 pass
95 pass
96
96
97
97
98 class InvalidTimeout(RuntimeError):
98 class InvalidTimeout(RuntimeError):
99 pass
99 pass
100
100
101
101
102 class TimeoutTooLarge(RuntimeError):
102 class TimeoutTooLarge(RuntimeError):
103 pass
103 pass
104
104
105
105
106 class NotExpirable(RuntimeError):
106 class NotExpirable(RuntimeError):
107 pass
107 pass
108
108
109
109
110 class Lock(object):
110 class Lock(object):
111 """
111 """
112 A Lock context manager implemented via redis SETNX/BLPOP.
112 A Lock context manager implemented via redis SETNX/BLPOP.
113 """
113 """
114 unlock_script = None
114 unlock_script = None
115 extend_script = None
115 extend_script = None
116 reset_script = None
116 reset_script = None
117 reset_all_script = None
117 reset_all_script = None
118
118
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
120 """
120 """
121 :param redis_client:
121 :param redis_client:
122 An instance of :class:`~StrictRedis`.
122 An instance of :class:`~StrictRedis`.
123 :param name:
123 :param name:
124 The name (redis key) the lock should have.
124 The name (redis key) the lock should have.
125 :param expire:
125 :param expire:
126 The lock expiry time in seconds. If left at the default (None)
126 The lock expiry time in seconds. If left at the default (None)
127 the lock will not expire.
127 the lock will not expire.
128 :param id:
128 :param id:
129 The ID (redis value) the lock should have. A random value is
129 The ID (redis value) the lock should have. A random value is
130 generated when left at the default.
130 generated when left at the default.
131
131
132 Note that if you specify this then the lock is marked as "held". Acquires
132 Note that if you specify this then the lock is marked as "held". Acquires
133 won't be possible.
133 won't be possible.
134 :param auto_renewal:
134 :param auto_renewal:
135 If set to ``True``, Lock will automatically renew the lock so that it
135 If set to ``True``, Lock will automatically renew the lock so that it
136 doesn't expire for as long as the lock is held (acquire() called
136 doesn't expire for as long as the lock is held (acquire() called
137 or running in a context manager).
137 or running in a context manager).
138
138
139 Implementation note: Renewal will happen using a daemon thread with
139 Implementation note: Renewal will happen using a daemon thread with
140 an interval of ``expire*2/3``. If wishing to use a different renewal
140 an interval of ``expire*2/3``. If wishing to use a different renewal
141 time, subclass Lock, call ``super().__init__()`` then set
141 time, subclass Lock, call ``super().__init__()`` then set
142 ``self._lock_renewal_interval`` to your desired interval.
142 ``self._lock_renewal_interval`` to your desired interval.
143 :param strict:
143 :param strict:
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
145 :param signal_expire:
145 :param signal_expire:
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 """
147 """
148 if strict and not isinstance(redis_client, StrictRedis):
148 if strict and not isinstance(redis_client, StrictRedis):
149 raise ValueError("redis_client must be instance of StrictRedis. "
149 raise ValueError("redis_client must be instance of StrictRedis. "
150 "Use strict=False if you know what you're doing.")
150 "Use strict=False if you know what you're doing.")
151 if auto_renewal and expire is None:
151 if auto_renewal and expire is None:
152 raise ValueError("Expire may not be None when auto_renewal is set")
152 raise ValueError("Expire may not be None when auto_renewal is set")
153
153
154 self._client = redis_client
154 self._client = redis_client
155
155
156 if expire:
156 if expire:
157 expire = int(expire)
157 expire = int(expire)
158 if expire < 0:
158 if expire < 0:
159 raise ValueError("A negative expire is not acceptable.")
159 raise ValueError("A negative expire is not acceptable.")
160 else:
160 else:
161 expire = None
161 expire = None
162 self._expire = expire
162 self._expire = expire
163
163
164 self._signal_expire = signal_expire
164 self._signal_expire = signal_expire
165 if id is None:
165 if id is None:
166 self._id = b64encode(urandom(18)).decode('ascii')
166 self._id = b64encode(urandom(18)).decode('ascii')
167 elif isinstance(id, binary_type):
167 elif isinstance(id, binary_type):
168 try:
168 try:
169 self._id = id.decode('ascii')
169 self._id = id.decode('ascii')
170 except UnicodeDecodeError:
170 except UnicodeDecodeError:
171 self._id = b64encode(id).decode('ascii')
171 self._id = b64encode(id).decode('ascii')
172 elif isinstance(id, text_type):
172 elif isinstance(id, text_type):
173 self._id = id
173 self._id = id
174 else:
174 else:
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
176 self._name = 'lock:' + name
176 self._name = 'lock:' + name
177 self._signal = 'lock-signal:' + name
177 self._signal = 'lock-signal:' + name
178 self._lock_renewal_interval = (float(expire) * 2 / 3
178 self._lock_renewal_interval = (float(expire) * 2 / 3
179 if auto_renewal
179 if auto_renewal
180 else None)
180 else None)
181 self._lock_renewal_thread = None
181 self._lock_renewal_thread = None
182
182
183 self.register_scripts(redis_client)
183 self.register_scripts(redis_client)
184
184
185 @classmethod
185 @classmethod
186 def register_scripts(cls, redis_client):
186 def register_scripts(cls, redis_client):
187 global reset_all_script
187 global reset_all_script
188 if reset_all_script is None:
188 if reset_all_script is None:
189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
194
194
195 @property
195 @property
196 def _held(self):
196 def _held(self):
197 return self.id == self.get_owner_id()
197 return self.id == self.get_owner_id()
198
198
199 def reset(self):
199 def reset(self):
200 """
200 """
201 Forcibly deletes the lock. Use this with care.
201 Forcibly deletes the lock. Use this with care.
202 """
202 """
203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
204
204
205 @property
205 @property
206 def id(self):
206 def id(self):
207 return self._id
207 return self._id
208
208
209 def get_owner_id(self):
209 def get_owner_id(self):
210 owner_id = self._client.get(self._name)
210 owner_id = self._client.get(self._name)
211 if isinstance(owner_id, binary_type):
211 if isinstance(owner_id, binary_type):
212 owner_id = owner_id.decode('ascii', 'replace')
212 owner_id = owner_id.decode('ascii', 'replace')
213 return owner_id
213 return owner_id
214
214
215 def acquire(self, blocking=True, timeout=None):
215 def acquire(self, blocking=True, timeout=None):
216 """
216 """
217 :param blocking:
217 :param blocking:
218 Boolean value specifying whether lock should be blocking or not.
218 Boolean value specifying whether lock should be blocking or not.
219 :param timeout:
219 :param timeout:
220 An integer value specifying the maximum number of seconds to block.
220 An integer value specifying the maximum number of seconds to block.
221 """
221 """
222 logger = loggers["acquire"]
222 logger = loggers["acquire"]
223
223
224 logger.debug("Getting %r ...", self._name)
224 logger.debug("Getting %r ...", self._name)
225
225
226 if self._held:
226 if self._held:
227 raise AlreadyAcquired("Already acquired from this Lock instance.")
227 raise AlreadyAcquired("Already acquired from this Lock instance.")
228
228
229 if not blocking and timeout is not None:
229 if not blocking and timeout is not None:
230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
230 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
231
231
232 if timeout:
232 if timeout:
233 timeout = int(timeout)
233 timeout = int(timeout)
234 if timeout < 0:
234 if timeout < 0:
235 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
235 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
236
236
237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
237 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
238 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
238 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
239
239
240 busy = True
240 busy = True
241 blpop_timeout = timeout or self._expire or 0
241 blpop_timeout = timeout or self._expire or 0
242 timed_out = False
242 timed_out = False
243 while busy:
243 while busy:
244 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
244 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
245 if busy:
245 if busy:
246 if timed_out:
246 if timed_out:
247 return False
247 return False
248 elif blocking:
248 elif blocking:
249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
249 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
250 else:
250 else:
251 logger.warning("Failed to get %r.", self._name)
251 logger.warning("Failed to get %r.", self._name)
252 return False
252 return False
253
253
254 logger.info("Got lock for %r.", self._name)
254 logger.info("Got lock for %r.", self._name)
255 if self._lock_renewal_interval is not None:
255 if self._lock_renewal_interval is not None:
256 self._start_lock_renewer()
256 self._start_lock_renewer()
257 return True
257 return True
258
258
259 def extend(self, expire=None):
259 def extend(self, expire=None):
260 """Extends expiration time of the lock.
260 """Extends expiration time of the lock.
261
261
262 :param expire:
262 :param expire:
263 New expiration time. If ``None`` - `expire` provided during
263 New expiration time. If ``None`` - `expire` provided during
264 lock initialization will be taken.
264 lock initialization will be taken.
265 """
265 """
266 if expire:
266 if expire:
267 expire = int(expire)
267 expire = int(expire)
268 if expire < 0:
268 if expire < 0:
269 raise ValueError("A negative expire is not acceptable.")
269 raise ValueError("A negative expire is not acceptable.")
270 elif self._expire is not None:
270 elif self._expire is not None:
271 expire = self._expire
271 expire = self._expire
272 else:
272 else:
273 raise TypeError(
273 raise TypeError(
274 "To extend a lock 'expire' must be provided as an "
274 "To extend a lock 'expire' must be provided as an "
275 "argument to extend() method or at initialization time."
275 "argument to extend() method or at initialization time."
276 )
276 )
277
277
278 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
278 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
279 if error == 1:
279 if error == 1:
280 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
280 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 elif error == 2:
281 elif error == 2:
282 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
282 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 elif error:
283 elif error:
284 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
284 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285
285
286 @staticmethod
286 @staticmethod
287 def _lock_renewer(lockref, interval, stop):
287 def _lock_renewer(lockref, interval, stop):
288 """
288 """
289 Renew the lock key in redis every `interval` seconds for as long
289 Renew the lock key in redis every `interval` seconds for as long
290 as `self._lock_renewal_thread.should_exit` is False.
290 as `self._lock_renewal_thread.should_exit` is False.
291 """
291 """
292 while not stop.wait(timeout=interval):
292 while not stop.wait(timeout=interval):
293 loggers["refresh.thread.start"].debug("Refreshing lock")
293 loggers["refresh.thread.start"].debug("Refreshing lock")
294 lock = lockref()
294 lock = lockref()
295 if lock is None:
295 if lock is None:
296 loggers["refresh.thread.stop"].debug(
296 loggers["refresh.thread.stop"].debug(
297 "The lock no longer exists, stopping lock refreshing"
297 "The lock no longer exists, stopping lock refreshing"
298 )
298 )
299 break
299 break
300 lock.extend(expire=lock._expire)
300 lock.extend(expire=lock._expire)
301 del lock
301 del lock
302 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
302 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303
303
304 def _start_lock_renewer(self):
304 def _start_lock_renewer(self):
305 """
305 """
306 Starts the lock refresher thread.
306 Starts the lock refresher thread.
307 """
307 """
308 if self._lock_renewal_thread is not None:
308 if self._lock_renewal_thread is not None:
309 raise AlreadyStarted("Lock refresh thread already started")
309 raise AlreadyStarted("Lock refresh thread already started")
310
310
311 loggers["refresh.start"].debug(
311 loggers["refresh.start"].debug(
312 "Starting thread to refresh lock every %s seconds",
312 "Starting thread to refresh lock every %s seconds",
313 self._lock_renewal_interval
313 self._lock_renewal_interval
314 )
314 )
315 self._lock_renewal_stop = threading.Event()
315 self._lock_renewal_stop = threading.Event()
316 self._lock_renewal_thread = threading.Thread(
316 self._lock_renewal_thread = threading.Thread(
317 group=None,
317 group=None,
318 target=self._lock_renewer,
318 target=self._lock_renewer,
319 kwargs={'lockref': weakref.ref(self),
319 kwargs={'lockref': weakref.ref(self),
320 'interval': self._lock_renewal_interval,
320 'interval': self._lock_renewal_interval,
321 'stop': self._lock_renewal_stop}
321 'stop': self._lock_renewal_stop}
322 )
322 )
323 self._lock_renewal_thread.setDaemon(True)
323 self._lock_renewal_thread.setDaemon(True)
324 self._lock_renewal_thread.start()
324 self._lock_renewal_thread.start()
325
325
326 def _stop_lock_renewer(self):
326 def _stop_lock_renewer(self):
327 """
327 """
328 Stop the lock renewer.
328 Stop the lock renewer.
329
329
330 This signals the renewal thread and waits for its exit.
330 This signals the renewal thread and waits for its exit.
331 """
331 """
332 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
332 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
333 return
333 return
334 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
334 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
335 self._lock_renewal_stop.set()
335 self._lock_renewal_stop.set()
336 self._lock_renewal_thread.join()
336 self._lock_renewal_thread.join()
337 self._lock_renewal_thread = None
337 self._lock_renewal_thread = None
338 loggers["refresh.exit"].debug("Lock refresher has stopped")
338 loggers["refresh.exit"].debug("Lock refresher has stopped")
339
339
340 def __enter__(self):
340 def __enter__(self):
341 acquired = self.acquire(blocking=True)
341 acquired = self.acquire(blocking=True)
342 assert acquired, "Lock wasn't acquired, but blocking=True"
342 assert acquired, "Lock wasn't acquired, but blocking=True"
343 return self
343 return self
344
344
345 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
345 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
346 self.release()
346 self.release()
347
347
348 def release(self):
348 def release(self):
349 """Releases the lock, that was acquired with the same object.
349 """Releases the lock, that was acquired with the same object.
350
350
351 .. note::
351 .. note::
352
352
353 If you want to release a lock that you acquired in a different place you have two choices:
353 If you want to release a lock that you acquired in a different place you have two choices:
354
354
355 * Use ``Lock("name", id=id_from_other_place).release()``
355 * Use ``Lock("name", id=id_from_other_place).release()``
356 * Use ``Lock("name").reset()``
356 * Use ``Lock("name").reset()``
357 """
357 """
358 if self._lock_renewal_thread is not None:
358 if self._lock_renewal_thread is not None:
359 self._stop_lock_renewer()
359 self._stop_lock_renewer()
360 loggers["release"].debug("Releasing %r.", self._name)
360 loggers["release"].debug("Releasing %r.", self._name)
361 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
361 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
362 if error == 1:
362 if error == 1:
363 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
363 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
364 elif error:
364 elif error:
365 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
365 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
366
366
367 def locked(self):
367 def locked(self):
368 """
368 """
369 Return true if the lock is acquired.
369 Return true if the lock is acquired.
370
370
371 Checks that lock with same name already exists. This method returns true, even if
371 Checks that lock with same name already exists. This method returns true, even if
372 lock have another id.
372 lock have another id.
373 """
373 """
374 return self._client.exists(self._name) == 1
374 return self._client.exists(self._name) == 1
375
375
376
376
377 reset_all_script = None
377 reset_all_script = None
378
378
379
379
380 def reset_all(redis_client):
380 def reset_all(redis_client):
381 """
381 """
382 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
382 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
383
383
384 :param redis_client:
384 :param redis_client:
385 An instance of :class:`~StrictRedis`.
385 An instance of :class:`~StrictRedis`.
386 """
386 """
387 Lock.register_scripts(redis_client)
387 Lock.register_scripts(redis_client)
388
388
389 reset_all_script(client=redis_client) # noqa
389 reset_all_script(client=redis_client) # noqa
@@ -1,259 +1,260 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import time
18 import time
19 import errno
19 import errno
20 import logging
20 import logging
21
21
22 import msgpack
22 import msgpack
23 import redis
23 import redis
24
24
25 from dogpile.cache.api import CachedValue
25 from dogpile.cache.api import CachedValue
26 from dogpile.cache.backends import memory as memory_backend
26 from dogpile.cache.backends import memory as memory_backend
27 from dogpile.cache.backends import file as file_backend
27 from dogpile.cache.backends import file as file_backend
28 from dogpile.cache.backends import redis as redis_backend
28 from dogpile.cache.backends import redis as redis_backend
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
29 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
30 from dogpile.cache.util import memoized_property
30 from dogpile.cache.util import memoized_property
31
31
32 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
32 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
33
33
34
34
35 _default_max_size = 1024
35 _default_max_size = 1024
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
40 class LRUMemoryBackend(memory_backend.MemoryBackend):
40 class LRUMemoryBackend(memory_backend.MemoryBackend):
41 key_prefix = 'lru_mem_backend'
41 key_prefix = 'lru_mem_backend'
42 pickle_values = False
42 pickle_values = False
43
43
44 def __init__(self, arguments):
44 def __init__(self, arguments):
45 max_size = arguments.pop('max_size', _default_max_size)
45 max_size = arguments.pop('max_size', _default_max_size)
46
46
47 LRUDictClass = LRUDict
47 LRUDictClass = LRUDict
48 if arguments.pop('log_key_count', None):
48 if arguments.pop('log_key_count', None):
49 LRUDictClass = LRUDictDebug
49 LRUDictClass = LRUDictDebug
50
50
51 arguments['cache_dict'] = LRUDictClass(max_size)
51 arguments['cache_dict'] = LRUDictClass(max_size)
52 super(LRUMemoryBackend, self).__init__(arguments)
52 super(LRUMemoryBackend, self).__init__(arguments)
53
53
54 def delete(self, key):
54 def delete(self, key):
55 try:
55 try:
56 del self._cache[key]
56 del self._cache[key]
57 except KeyError:
57 except KeyError:
58 # we don't care if key isn't there at deletion
58 # we don't care if key isn't there at deletion
59 pass
59 pass
60
60
61 def delete_multi(self, keys):
61 def delete_multi(self, keys):
62 for key in keys:
62 for key in keys:
63 self.delete(key)
63 self.delete(key)
64
64
65
65
66 class PickleSerializer(object):
66 class PickleSerializer(object):
67
67
68 def _dumps(self, value, safe=False):
68 def _dumps(self, value, safe=False):
69 try:
69 try:
70 return compat.pickle.dumps(value)
70 return compat.pickle.dumps(value)
71 except Exception:
71 except Exception:
72 if safe:
72 if safe:
73 return NO_VALUE
73 return NO_VALUE
74 else:
74 else:
75 raise
75 raise
76
76
77 def _loads(self, value, safe=True):
77 def _loads(self, value, safe=True):
78 try:
78 try:
79 return compat.pickle.loads(value)
79 return compat.pickle.loads(value)
80 except Exception:
80 except Exception:
81 if safe:
81 if safe:
82 return NO_VALUE
82 return NO_VALUE
83 else:
83 else:
84 raise
84 raise
85
85
86
86
87 class MsgPackSerializer(object):
87 class MsgPackSerializer(object):
88
88
89 def _dumps(self, value, safe=False):
89 def _dumps(self, value, safe=False):
90 try:
90 try:
91 return msgpack.packb(value)
91 return msgpack.packb(value)
92 except Exception:
92 except Exception:
93 if safe:
93 if safe:
94 return NO_VALUE
94 return NO_VALUE
95 else:
95 else:
96 raise
96 raise
97
97
98 def _loads(self, value, safe=True):
98 def _loads(self, value, safe=True):
99 """
99 """
100 pickle maintained the `CachedValue` wrapper of the tuple
100 pickle maintained the `CachedValue` wrapper of the tuple
101 msgpack does not, so it must be added back in.
101 msgpack does not, so it must be added back in.
102 """
102 """
103 try:
103 try:
104 value = msgpack.unpackb(value, use_list=False)
104 value = msgpack.unpackb(value, use_list=False)
105 return CachedValue(*value)
105 return CachedValue(*value)
106 except Exception:
106 except Exception:
107 if safe:
107 if safe:
108 return NO_VALUE
108 return NO_VALUE
109 else:
109 else:
110 raise
110 raise
111
111
112
112
113 import fcntl
113 import fcntl
114 flock_org = fcntl.flock
114 flock_org = fcntl.flock
115
115
116
116
117 class CustomLockFactory(FileLock):
117 class CustomLockFactory(FileLock):
118
118
119 pass
119 pass
120
120
121
121
122 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
122 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
123 key_prefix = 'file_backend'
123 key_prefix = 'file_backend'
124
124
125 def __init__(self, arguments):
125 def __init__(self, arguments):
126 arguments['lock_factory'] = CustomLockFactory
126 arguments['lock_factory'] = CustomLockFactory
127 super(FileNamespaceBackend, self).__init__(arguments)
127 super(FileNamespaceBackend, self).__init__(arguments)
128
128
129 def __repr__(self):
129 def __repr__(self):
130 return '{} `{}`'.format(self.__class__, self.filename)
130 return '{} `{}`'.format(self.__class__, self.filename)
131
131
132 def list_keys(self, prefix=''):
132 def list_keys(self, prefix=''):
133 prefix = '{}:{}'.format(self.key_prefix, prefix)
133 prefix = '{}:{}'.format(self.key_prefix, prefix)
134
134
135 def cond(v):
135 def cond(v):
136 if not prefix:
136 if not prefix:
137 return True
137 return True
138
138
139 if v.startswith(prefix):
139 if v.startswith(prefix):
140 return True
140 return True
141 return False
141 return False
142
142
143 with self._dbm_file(True) as dbm:
143 with self._dbm_file(True) as dbm:
144
144
145 return filter(cond, dbm.keys())
145 return filter(cond, dbm.keys())
146
146
147 def get_store(self):
147 def get_store(self):
148 return self.filename
148 return self.filename
149
149
150 def get(self, key):
150 def get(self, key):
151 with self._dbm_file(False) as dbm:
151 with self._dbm_file(False) as dbm:
152 if hasattr(dbm, 'get'):
152 if hasattr(dbm, 'get'):
153 value = dbm.get(key, NO_VALUE)
153 value = dbm.get(key, NO_VALUE)
154 else:
154 else:
155 # gdbm objects lack a .get method
155 # gdbm objects lack a .get method
156 try:
156 try:
157 value = dbm[key]
157 value = dbm[key]
158 except KeyError:
158 except KeyError:
159 value = NO_VALUE
159 value = NO_VALUE
160 if value is not NO_VALUE:
160 if value is not NO_VALUE:
161 value = self._loads(value)
161 value = self._loads(value)
162 return value
162 return value
163
163
164 def set(self, key, value):
164 def set(self, key, value):
165 with self._dbm_file(True) as dbm:
165 with self._dbm_file(True) as dbm:
166 dbm[key] = self._dumps(value)
166 dbm[key] = self._dumps(value)
167
167
168 def set_multi(self, mapping):
168 def set_multi(self, mapping):
169 with self._dbm_file(True) as dbm:
169 with self._dbm_file(True) as dbm:
170 for key, value in mapping.items():
170 for key, value in mapping.items():
171 dbm[key] = self._dumps(value)
171 dbm[key] = self._dumps(value)
172
172
173
173
174 class BaseRedisBackend(redis_backend.RedisBackend):
174 class BaseRedisBackend(redis_backend.RedisBackend):
175
175
176 def _create_client(self):
176 def _create_client(self):
177 args = {}
177 args = {}
178
178
179 if self.url is not None:
179 if self.url is not None:
180 args.update(url=self.url)
180 args.update(url=self.url)
181
181
182 else:
182 else:
183 args.update(
183 args.update(
184 host=self.host, password=self.password,
184 host=self.host, password=self.password,
185 port=self.port, db=self.db
185 port=self.port, db=self.db
186 )
186 )
187
187
188 connection_pool = redis.ConnectionPool(**args)
188 connection_pool = redis.ConnectionPool(**args)
189
189
190 return redis.StrictRedis(connection_pool=connection_pool)
190 return redis.StrictRedis(connection_pool=connection_pool)
191
191
192 def list_keys(self, prefix=''):
192 def list_keys(self, prefix=''):
193 prefix = '{}:{}*'.format(self.key_prefix, prefix)
193 prefix = '{}:{}*'.format(self.key_prefix, prefix)
194 return self.client.keys(prefix)
194 return self.client.keys(prefix)
195
195
196 def get_store(self):
196 def get_store(self):
197 return self.client.connection_pool
197 return self.client.connection_pool
198
198
199 def get(self, key):
199 def get(self, key):
200 value = self.client.get(key)
200 value = self.client.get(key)
201 if value is None:
201 if value is None:
202 return NO_VALUE
202 return NO_VALUE
203 return self._loads(value)
203 return self._loads(value)
204
204
205 def get_multi(self, keys):
205 def get_multi(self, keys):
206 if not keys:
206 if not keys:
207 return []
207 return []
208 values = self.client.mget(keys)
208 values = self.client.mget(keys)
209 loads = self._loads
209 loads = self._loads
210 return [
210 return [
211 loads(v) if v is not None else NO_VALUE
211 loads(v) if v is not None else NO_VALUE
212 for v in values]
212 for v in values]
213
213
214 def set(self, key, value):
214 def set(self, key, value):
215 if self.redis_expiration_time:
215 if self.redis_expiration_time:
216 self.client.setex(key, self.redis_expiration_time,
216 self.client.setex(key, self.redis_expiration_time,
217 self._dumps(value))
217 self._dumps(value))
218 else:
218 else:
219 self.client.set(key, self._dumps(value))
219 self.client.set(key, self._dumps(value))
220
220
221 def set_multi(self, mapping):
221 def set_multi(self, mapping):
222 dumps = self._dumps
222 dumps = self._dumps
223 mapping = dict(
223 mapping = dict(
224 (k, dumps(v))
224 (k, dumps(v))
225 for k, v in mapping.items()
225 for k, v in mapping.items()
226 )
226 )
227
227
228 if not self.redis_expiration_time:
228 if not self.redis_expiration_time:
229 self.client.mset(mapping)
229 self.client.mset(mapping)
230 else:
230 else:
231 pipe = self.client.pipeline()
231 pipe = self.client.pipeline()
232 for key, value in mapping.items():
232 for key, value in mapping.items():
233 pipe.setex(key, self.redis_expiration_time, value)
233 pipe.setex(key, self.redis_expiration_time, value)
234 pipe.execute()
234 pipe.execute()
235
235
236 def get_mutex(self, key):
236 def get_mutex(self, key):
237 if self.distributed_lock:
237 if self.distributed_lock:
238 import redis_lock
238 import redis_lock
239 lock_key = redis_backend.u('_lock_{0}').format(key)
239 lock_key = redis_backend.u('_lock_{0}').format(key)
240 log.debug('Trying to acquire Redis lock for key %s', lock_key)
240 log.debug('Trying to acquire Redis lock for key %s', lock_key)
241 lock = redis_lock.Lock(
241 lock = redis_lock.Lock(
242 redis_client=self.client,
242 redis_client=self.client,
243 name=lock_key,
243 name=lock_key,
244 expire=self.lock_timeout,
244 expire=self.lock_timeout,
245 auto_renewal=False,
245 auto_renewal=False,
246 strict=True,
246 )
247 )
247 return lock
248 return lock
248 else:
249 else:
249 return None
250 return None
250
251
251
252
252 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
253 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
253 key_prefix = 'redis_pickle_backend'
254 key_prefix = 'redis_pickle_backend'
254 pass
255 pass
255
256
256
257
257 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
258 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
258 key_prefix = 'redis_msgpack_backend'
259 key_prefix = 'redis_msgpack_backend'
259 pass
260 pass
General Comments 0
You need to be logged in to leave comments. Login now