##// END OF EJS Templates
py3: fixes for python3
super-admin -
r1045:7571f5a6 python3
parent child Browse files
Show More
@@ -1,390 +1,384 b''
1 import sys
1 import sys
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7
7
8 from redis import StrictRedis
8 from redis import StrictRedis
9
9
10 __version__ = '3.7.0'
10 __version__ = '3.7.0'
11
11
12 loggers = {
12 loggers = {
13 k: getLogger("vcsserver." + ".".join((__name__, k)))
13 k: getLogger("vcsserver." + ".".join((__name__, k)))
14 for k in [
14 for k in [
15 "acquire",
15 "acquire",
16 "refresh.thread.start",
16 "refresh.thread.start",
17 "refresh.thread.stop",
17 "refresh.thread.stop",
18 "refresh.thread.exit",
18 "refresh.thread.exit",
19 "refresh.start",
19 "refresh.start",
20 "refresh.shutdown",
20 "refresh.shutdown",
21 "refresh.exit",
21 "refresh.exit",
22 "release",
22 "release",
23 ]
23 ]
24 }
24 }
25
25
26 PY3 = sys.version_info[0] == 3
27
28 if PY3:
29 text_type = str
26 text_type = str
30 binary_type = bytes
27 binary_type = bytes
31 else:
32 text_type = unicode # noqa
33 binary_type = str
34
28
35
29
36 # Check if the id match. If not, return an error code.
30 # Check if the id match. If not, return an error code.
37 UNLOCK_SCRIPT = b"""
31 UNLOCK_SCRIPT = b"""
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
32 if redis.call("get", KEYS[1]) ~= ARGV[1] then
39 return 1
33 return 1
40 else
34 else
41 redis.call("del", KEYS[2])
35 redis.call("del", KEYS[2])
42 redis.call("lpush", KEYS[2], 1)
36 redis.call("lpush", KEYS[2], 1)
43 redis.call("pexpire", KEYS[2], ARGV[2])
37 redis.call("pexpire", KEYS[2], ARGV[2])
44 redis.call("del", KEYS[1])
38 redis.call("del", KEYS[1])
45 return 0
39 return 0
46 end
40 end
47 """
41 """
48
42
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
43 # Covers both cases when key doesn't exist and doesn't equal to lock's id
50 EXTEND_SCRIPT = b"""
44 EXTEND_SCRIPT = b"""
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
45 if redis.call("get", KEYS[1]) ~= ARGV[1] then
52 return 1
46 return 1
53 elseif redis.call("ttl", KEYS[1]) < 0 then
47 elseif redis.call("ttl", KEYS[1]) < 0 then
54 return 2
48 return 2
55 else
49 else
56 redis.call("expire", KEYS[1], ARGV[2])
50 redis.call("expire", KEYS[1], ARGV[2])
57 return 0
51 return 0
58 end
52 end
59 """
53 """
60
54
61 RESET_SCRIPT = b"""
55 RESET_SCRIPT = b"""
62 redis.call('del', KEYS[2])
56 redis.call('del', KEYS[2])
63 redis.call('lpush', KEYS[2], 1)
57 redis.call('lpush', KEYS[2], 1)
64 redis.call('pexpire', KEYS[2], ARGV[2])
58 redis.call('pexpire', KEYS[2], ARGV[2])
65 return redis.call('del', KEYS[1])
59 return redis.call('del', KEYS[1])
66 """
60 """
67
61
68 RESET_ALL_SCRIPT = b"""
62 RESET_ALL_SCRIPT = b"""
69 local locks = redis.call('keys', 'lock:*')
63 local locks = redis.call('keys', 'lock:*')
70 local signal
64 local signal
71 for _, lock in pairs(locks) do
65 for _, lock in pairs(locks) do
72 signal = 'lock-signal:' .. string.sub(lock, 6)
66 signal = 'lock-signal:' .. string.sub(lock, 6)
73 redis.call('del', signal)
67 redis.call('del', signal)
74 redis.call('lpush', signal, 1)
68 redis.call('lpush', signal, 1)
75 redis.call('expire', signal, 1)
69 redis.call('expire', signal, 1)
76 redis.call('del', lock)
70 redis.call('del', lock)
77 end
71 end
78 return #locks
72 return #locks
79 """
73 """
80
74
81
75
class AlreadyAcquired(RuntimeError):
    """Raised by acquire() when this Lock instance already holds the lock."""
84
78
85
79
class NotAcquired(RuntimeError):
    """Raised when releasing/extending a lock that is not acquired or already expired."""
88
82
89
83
class AlreadyStarted(RuntimeError):
    """Raised when the lock-refresh thread is started a second time."""
92
86
93
87
class TimeoutNotUsable(RuntimeError):
    """Raised when a timeout is supplied together with blocking=False."""
96
90
97
91
class InvalidTimeout(RuntimeError):
    """Raised when acquire() is given a negative timeout."""
100
94
101
95
class TimeoutTooLarge(RuntimeError):
    """Raised when the acquire() timeout exceeds the lock's expire time."""
104
98
105
99
class NotExpirable(RuntimeError):
    """Raised by extend() when the lock has no assigned expiration time."""
108
102
109
103
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """
    # Lua Script objects, registered per-class by register_scripts().
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        if id is None:
            # 18 random bytes base64-encode to 24 ASCII chars — unique per holder.
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, bytes):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, str):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        # Renew at 2/3 of the expiry so a renewal always lands before the key dies.
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.register_scripts(redis_client)

    @classmethod
    def register_scripts(cls, redis_client):
        """Register the Lua scripts with *redis_client*; safe to call repeatedly."""
        global reset_all_script
        # The module-level script is registered once; the class attributes are
        # (re-)registered every call.
        if reset_all_script is None:
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
        cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
        cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
        cls.reset_script = redis_client.register_script(RESET_SCRIPT)
        cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)

    @property
    def _held(self):
        # The lock is "held" when the value stored at the key is our own id.
        return self.id == self.get_owner_id()

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))

    @property
    def id(self):
        return self._id

    def get_owner_id(self):
        """Return the owner id stored at the lock key, decoded to str when bytes."""
        owner_id = self._client.get(self._name)
        if isinstance(owner_id, bytes):
            owner_id = owner_id.decode('ascii', 'replace')
        return owner_id

    def acquire(self, blocking=True, timeout=None):
        """
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        """
        logger = loggers["acquire"]

        logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)

        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)

            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET NX either takes the lock or tells us somebody else holds it.
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # Wait for the holder to push to the signal list (or time out).
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to get %r.", self._name)
                    return False

        logger.debug("Got lock for %r.", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        """
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        elif self._expire is not None:
            expire = self._expire
        else:
            raise TypeError(
                "To extend a lock 'expire' must be provided as an "
                "argument to extend() method or at initialization time."
            )

        error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error == 2:
            raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
        elif error:
            raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

    @staticmethod
    def _lock_renewer(lockref, interval, stop):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.
        """
        while not stop.wait(timeout=interval):
            loggers["refresh.thread.start"].debug("Refreshing lock")
            # Only a weakref is held so a garbage-collected Lock stops renewal.
            lock = lockref()
            if lock is None:
                loggers["refresh.thread.stop"].debug(
                    "The lock no longer exists, stopping lock refreshing"
                )
                break
            lock.extend(expire=lock._expire)
            del lock
        loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.

        :raises AlreadyStarted: if a renewal thread is already running.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        loggers["refresh.start"].debug(
            "Starting thread to refresh lock every %s seconds",
            self._lock_renewal_interval
        )
        self._lock_renewal_stop = threading.Event()
        self._lock_renewal_thread = threading.Thread(
            group=None,
            target=self._lock_renewer,
            # daemon=True (py3) replaces the deprecated setDaemon(True) so the
            # renewer never blocks interpreter shutdown.
            daemon=True,
            kwargs={'lockref': weakref.ref(self),
                    'interval': self._lock_renewal_interval,
                    'stop': self._lock_renewal_stop}
        )
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
            return
        loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
        self._lock_renewal_stop.set()
        self._lock_renewal_thread.join()
        self._lock_renewal_thread = None
        loggers["refresh.exit"].debug("Lock refresher has stopped")

    def __enter__(self):
        acquired = self.acquire(blocking=True)
        assert acquired, "Lock wasn't acquired, but blocking=True"
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.release()

    def release(self):
        """Releases the lock, that was acquired with the same object.

        .. note::

            If you want to release a lock that you acquired in a different place you have two choices:

            * Use ``Lock("name", id=id_from_other_place).release()``
            * Use ``Lock("name").reset()``
        """
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        loggers["release"].debug("Releasing %r.", self._name)
        error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error:
            # Fixed: this path runs the UNLOCK script, not EXTEND.
            raise RuntimeError("Unsupported error code %s from UNLOCK script." % error)

    def locked(self):
        """
        Return true if the lock is acquired.

        Checks that lock with same name already exists. This method returns true, even if
        lock have another id.
        """
        return self._client.exists(self._name) == 1
376
370
377
371
# Populated lazily by Lock.register_scripts(); holds the registered
# RESET_ALL Lua script once any Lock (or reset_all) touches a client.
reset_all_script = None


def reset_all(redis_client):
    """
    Forcibly deletes all locks if its remains (like a crash reason). Use this with care.

    :param redis_client:
        An instance of :class:`~StrictRedis`.
    """
    # Make sure the module-level reset_all_script has been registered first.
    Lock.register_scripts(redis_client)

    reset_all_script(client=redis_client)  # noqa
@@ -1,75 +1,68 b''
1
1
2
2
3 import functools
3 import functools
4
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
4 from time import perf_counter as time_now
9 except ImportError:
10 # fall back to using time
11 from time import time as time_now
12
5
13
6
def safe_wraps(wrapper, *args, **kwargs):
    """Safely wraps partial functions.

    functools.wraps() chokes on functools.partial objects (they lack
    ``__name__`` etc.), so unwrap nested partials down to the real callable
    before delegating.
    """
    target = wrapper
    while isinstance(target, functools.partial):
        target = target.func
    return functools.wraps(target, *args, **kwargs)
19
12
20
13
class Timer(object):
    """A context manager/decorator for statsd.timing()."""

    def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
        # statsd-style client that must expose a timing() method
        self.client = client
        self.stat = stat
        self.rate = rate
        self.tags = tags
        # Elapsed milliseconds; populated by stop(), None until then.
        self.ms = None
        self._sent = False
        self._start_time = None
        self.use_decimals = use_decimals
        # When True, __exit__ sends the metric automatically.
        self.auto_send = auto_send

    def __call__(self, f):
        """Thread-safe timing function decorator."""
        @safe_wraps(f)
        def _wrapped(*args, **kwargs):
            began = time_now()
            try:
                return f(*args, **kwargs)
            finally:
                # Report even when f raises; the exception still propagates.
                self.client.timing(self.stat, 1000.0 * (time_now() - began), self.rate, self.tags, self.use_decimals)
                self._sent = True
        return _wrapped

    def __enter__(self):
        return self.start()

    def __exit__(self, typ, value, tb):
        self.stop(send=self.auto_send)

    def start(self):
        """Reset state and record the start timestamp; returns self."""
        self.ms = None
        self._sent = False
        self._start_time = time_now()
        return self

    def stop(self, send=True):
        """Record elapsed milliseconds; optionally send immediately.

        :raises RuntimeError: if start() was never called.
        """
        if self._start_time is None:
            raise RuntimeError('Timer has not started.')
        # Convert the elapsed seconds to milliseconds.
        self.ms = 1000.0 * (time_now() - self._start_time)
        if send:
            self.send()
        return self

    def send(self):
        """Emit the recorded timing exactly once.

        :raises RuntimeError: if nothing was recorded or it was already sent.
        """
        if self.ms is None:
            raise RuntimeError('No data recorded.')
        if self._sent:
            raise RuntimeError('Already sent data.')
        self._sent = True
        self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
@@ -1,261 +1,207 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import os
18 import os
19 import time
19 import time
20 import logging
20 import logging
21 import functools
21 import functools
22 import decorator
22
23
23 from dogpile.cache import CacheRegion
24 from dogpile.cache import CacheRegion
24
25
25 from vcsserver.utils import safe_str, sha1
26 from vcsserver.utils import safe_str, sha1
26 from vcsserver.lib.rc_cache import region_meta
27 from vcsserver.lib.rc_cache import region_meta
27
28
28 log = logging.getLogger(__name__)
29 log = logging.getLogger(__name__)
29
30
30
31
31 class RhodeCodeCacheRegion(CacheRegion):
32 class RhodeCodeCacheRegion(CacheRegion):
32
33
33 def conditional_cache_on_arguments(
34 def conditional_cache_on_arguments(
34 self, namespace=None,
35 self, namespace=None,
35 expiration_time=None,
36 expiration_time=None,
36 should_cache_fn=None,
37 should_cache_fn=None,
37 to_str=str,
38 to_str=str,
38 function_key_generator=None,
39 function_key_generator=None,
39 condition=True):
40 condition=True):
40 """
41 """
41 Custom conditional decorator, that will not touch any dogpile internals if
42 Custom conditional decorator, that will not touch any dogpile internals if
42 condition isn't meet. This works a bit different than should_cache_fn
43 condition isn't meet. This works a bit different than should_cache_fn
43 And it's faster in cases we don't ever want to compute cached values
44 And it's faster in cases we don't ever want to compute cached values
44 """
45 """
45 expiration_time_is_callable = callable(expiration_time)
46 expiration_time_is_callable = callable(expiration_time)
46
47
47 if function_key_generator is None:
48 if function_key_generator is None:
48 function_key_generator = self.function_key_generator
49 function_key_generator = self.function_key_generator
49
50
50 # workaround for py2 and cython problems, this block should be removed
51 # once we've migrated to py3
52 if 'cython' == 'cython':
53 def decorator(fn):
54 if to_str is str:
55 # backwards compatible
56 key_generator = function_key_generator(namespace, fn)
57 else:
58 key_generator = function_key_generator(namespace, fn, to_str=to_str)
59
60 @functools.wraps(fn)
61 def decorate(*arg, **kw):
62 key = key_generator(*arg, **kw)
63
64 @functools.wraps(fn)
65 def creator():
66 return fn(*arg, **kw)
67
68 if not condition:
69 return creator()
70
71 timeout = expiration_time() if expiration_time_is_callable \
72 else expiration_time
73
74 return self.get_or_create(key, creator, timeout, should_cache_fn)
75
76 def invalidate(*arg, **kw):
77 key = key_generator(*arg, **kw)
78 self.delete(key)
79
80 def set_(value, *arg, **kw):
81 key = key_generator(*arg, **kw)
82 self.set(key, value)
83
84 def get(*arg, **kw):
85 key = key_generator(*arg, **kw)
86 return self.get(key)
87
88 def refresh(*arg, **kw):
89 key = key_generator(*arg, **kw)
90 value = fn(*arg, **kw)
91 self.set(key, value)
92 return value
93
94 decorate.set = set_
95 decorate.invalidate = invalidate
96 decorate.refresh = refresh
97 decorate.get = get
98 decorate.original = fn
99 decorate.key_generator = key_generator
100 decorate.__wrapped__ = fn
101
102 return decorate
103 return decorator
104
105 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
51 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
106
52
107 if not condition:
53 if not condition:
108 log.debug('Calling un-cached method:%s', user_func.__name__)
54 log.debug('Calling un-cached method:%s', user_func.__name__)
109 start = time.time()
55 start = time.time()
110 result = user_func(*arg, **kw)
56 result = user_func(*arg, **kw)
111 total = time.time() - start
57 total = time.time() - start
112 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
58 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
113 return result
59 return result
114
60
115 key = key_generator(*arg, **kw)
61 key = key_generator(*arg, **kw)
116
62
117 timeout = expiration_time() if expiration_time_is_callable \
63 timeout = expiration_time() if expiration_time_is_callable \
118 else expiration_time
64 else expiration_time
119
65
120 log.debug('Calling cached method:`%s`', user_func.__name__)
66 log.debug('Calling cached method:`%s`', user_func.__name__)
121 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
67 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
122
68
123 def cache_decorator(user_func):
69 def cache_decorator(user_func):
124 if to_str is str:
70 if to_str is str:
125 # backwards compatible
71 # backwards compatible
126 key_generator = function_key_generator(namespace, user_func)
72 key_generator = function_key_generator(namespace, user_func)
127 else:
73 else:
128 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
74 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
129
75
130 def refresh(*arg, **kw):
76 def refresh(*arg, **kw):
131 """
77 """
132 Like invalidate, but regenerates the value instead
78 Like invalidate, but regenerates the value instead
133 """
79 """
134 key = key_generator(*arg, **kw)
80 key = key_generator(*arg, **kw)
135 value = user_func(*arg, **kw)
81 value = user_func(*arg, **kw)
136 self.set(key, value)
82 self.set(key, value)
137 return value
83 return value
138
84
139 def invalidate(*arg, **kw):
85 def invalidate(*arg, **kw):
140 key = key_generator(*arg, **kw)
86 key = key_generator(*arg, **kw)
141 self.delete(key)
87 self.delete(key)
142
88
143 def set_(value, *arg, **kw):
89 def set_(value, *arg, **kw):
144 key = key_generator(*arg, **kw)
90 key = key_generator(*arg, **kw)
145 self.set(key, value)
91 self.set(key, value)
146
92
147 def get(*arg, **kw):
93 def get(*arg, **kw):
148 key = key_generator(*arg, **kw)
94 key = key_generator(*arg, **kw)
149 return self.get(key)
95 return self.get(key)
150
96
151 user_func.set = set_
97 user_func.set = set_
152 user_func.invalidate = invalidate
98 user_func.invalidate = invalidate
153 user_func.get = get
99 user_func.get = get
154 user_func.refresh = refresh
100 user_func.refresh = refresh
155 user_func.key_generator = key_generator
101 user_func.key_generator = key_generator
156 user_func.original = user_func
102 user_func.original = user_func
157
103
158 # Use `decorate` to preserve the signature of :param:`user_func`.
104 # Use `decorate` to preserve the signature of :param:`user_func`.
159 return decorator.decorate(user_func, functools.partial(
105 return decorator.decorate(user_func, functools.partial(
160 get_or_create_for_user_func, key_generator))
106 get_or_create_for_user_func, key_generator))
161
107
162 return cache_decorator
108 return cache_decorator
163
109
164
110
165 def make_region(*arg, **kw):
111 def make_region(*arg, **kw):
166 return RhodeCodeCacheRegion(*arg, **kw)
112 return RhodeCodeCacheRegion(*arg, **kw)
167
113
168
114
169 def get_default_cache_settings(settings, prefixes=None):
115 def get_default_cache_settings(settings, prefixes=None):
170 prefixes = prefixes or []
116 prefixes = prefixes or []
171 cache_settings = {}
117 cache_settings = {}
172 for key in settings.keys():
118 for key in settings.keys():
173 for prefix in prefixes:
119 for prefix in prefixes:
174 if key.startswith(prefix):
120 if key.startswith(prefix):
175 name = key.split(prefix)[1].strip()
121 name = key.split(prefix)[1].strip()
176 val = settings[key]
122 val = settings[key]
177 if isinstance(val, str):
123 if isinstance(val, str):
178 val = val.strip()
124 val = val.strip()
179 cache_settings[name] = val
125 cache_settings[name] = val
180 return cache_settings
126 return cache_settings
181
127
182
128
183 def compute_key_from_params(*args):
129 def compute_key_from_params(*args):
184 """
130 """
185 Helper to compute key from given params to be used in cache manager
131 Helper to compute key from given params to be used in cache manager
186 """
132 """
187 return sha1("_".join(map(safe_str, args)))
133 return sha1("_".join(map(safe_str, args)))
188
134
189
135
190 def backend_key_generator(backend):
136 def backend_key_generator(backend):
191 """
137 """
192 Special wrapper that also sends over the backend to the key generator
138 Special wrapper that also sends over the backend to the key generator
193 """
139 """
194 def wrapper(namespace, fn):
140 def wrapper(namespace, fn):
195 return key_generator(backend, namespace, fn)
141 return key_generator(backend, namespace, fn)
196 return wrapper
142 return wrapper
197
143
198
144
199 def key_generator(backend, namespace, fn):
145 def key_generator(backend, namespace, fn):
200 fname = fn.__name__
146 fname = fn.__name__
201
147
202 def generate_key(*args):
148 def generate_key(*args):
203 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
149 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
204 namespace_pref = namespace or 'default_namespace'
150 namespace_pref = namespace or 'default_namespace'
205 arg_key = compute_key_from_params(*args)
151 arg_key = compute_key_from_params(*args)
206 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
152 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
207
153
208 return final_key
154 return final_key
209
155
210 return generate_key
156 return generate_key
211
157
212
158
213 def get_or_create_region(region_name, region_namespace=None):
159 def get_or_create_region(region_name, region_namespace=None):
214 from vcsserver.lib.rc_cache.backends import FileNamespaceBackend
160 from vcsserver.lib.rc_cache.backends import FileNamespaceBackend
215 region_obj = region_meta.dogpile_cache_regions.get(region_name)
161 region_obj = region_meta.dogpile_cache_regions.get(region_name)
216 if not region_obj:
162 if not region_obj:
217 raise EnvironmentError(
163 raise EnvironmentError(
218 'Region `{}` not in configured: {}.'.format(
164 'Region `{}` not in configured: {}.'.format(
219 region_name, region_meta.dogpile_cache_regions.keys()))
165 region_name, region_meta.dogpile_cache_regions.keys()))
220
166
221 region_uid_name = '{}:{}'.format(region_name, region_namespace)
167 region_uid_name = '{}:{}'.format(region_name, region_namespace)
222 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
168 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
223 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
169 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
224 if region_exist:
170 if region_exist:
225 log.debug('Using already configured region: %s', region_namespace)
171 log.debug('Using already configured region: %s', region_namespace)
226 return region_exist
172 return region_exist
227 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
173 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
228 expiration_time = region_obj.expiration_time
174 expiration_time = region_obj.expiration_time
229
175
230 if not os.path.isdir(cache_dir):
176 if not os.path.isdir(cache_dir):
231 os.makedirs(cache_dir)
177 os.makedirs(cache_dir)
232 new_region = make_region(
178 new_region = make_region(
233 name=region_uid_name,
179 name=region_uid_name,
234 function_key_generator=backend_key_generator(region_obj.actual_backend)
180 function_key_generator=backend_key_generator(region_obj.actual_backend)
235 )
181 )
236 namespace_filename = os.path.join(
182 namespace_filename = os.path.join(
237 cache_dir, "{}.cache.dbm".format(region_namespace))
183 cache_dir, "{}.cache.dbm".format(region_namespace))
238 # special type that allows 1db per namespace
184 # special type that allows 1db per namespace
239 new_region.configure(
185 new_region.configure(
240 backend='dogpile.cache.rc.file_namespace',
186 backend='dogpile.cache.rc.file_namespace',
241 expiration_time=expiration_time,
187 expiration_time=expiration_time,
242 arguments={"filename": namespace_filename}
188 arguments={"filename": namespace_filename}
243 )
189 )
244
190
245 # create and save in region caches
191 # create and save in region caches
246 log.debug('configuring new region: %s', region_uid_name)
192 log.debug('configuring new region: %s', region_uid_name)
247 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
193 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
248
194
249 return region_obj
195 return region_obj
250
196
251
197
252 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
198 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
253 region = get_or_create_region(cache_region, cache_namespace_uid)
199 region = get_or_create_region(cache_region, cache_namespace_uid)
254 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
200 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
255 num_delete_keys = len(cache_keys)
201 num_delete_keys = len(cache_keys)
256 if invalidate:
202 if invalidate:
257 region.invalidate(hard=False)
203 region.invalidate(hard=False)
258 else:
204 else:
259 if num_delete_keys:
205 if num_delete_keys:
260 region.delete_multi(cache_keys)
206 region.delete_multi(cache_keys)
261 return num_delete_keys
207 return num_delete_keys
General Comments 0
You need to be logged in to leave comments. Login now