##// END OF EJS Templates
libs: synced with ce codebase
super-admin -
r1018:68a8ca72 default
parent child Browse files
Show More
@@ -1,390 +1,390 b''
1 1 import sys
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7
8 8 from redis import StrictRedis
9 9
10 10 __version__ = '3.7.0'
11 11
12 12 loggers = {
13 13 k: getLogger("vcsserver." + ".".join((__name__, k)))
14 14 for k in [
15 15 "acquire",
16 16 "refresh.thread.start",
17 17 "refresh.thread.stop",
18 18 "refresh.thread.exit",
19 19 "refresh.start",
20 20 "refresh.shutdown",
21 21 "refresh.exit",
22 22 "release",
23 23 ]
24 24 }
25 25
26 26 PY3 = sys.version_info[0] == 3
27 27
28 28 if PY3:
29 29 text_type = str
30 30 binary_type = bytes
31 31 else:
32 32 text_type = unicode # noqa
33 33 binary_type = str
34 34
35 35
36 36 # Check if the id match. If not, return an error code.
37 37 UNLOCK_SCRIPT = b"""
38 38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
39 39 return 1
40 40 else
41 41 redis.call("del", KEYS[2])
42 42 redis.call("lpush", KEYS[2], 1)
43 43 redis.call("pexpire", KEYS[2], ARGV[2])
44 44 redis.call("del", KEYS[1])
45 45 return 0
46 46 end
47 47 """
48 48
49 49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
50 50 EXTEND_SCRIPT = b"""
51 51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
52 52 return 1
53 53 elseif redis.call("ttl", KEYS[1]) < 0 then
54 54 return 2
55 55 else
56 56 redis.call("expire", KEYS[1], ARGV[2])
57 57 return 0
58 58 end
59 59 """
60 60
61 61 RESET_SCRIPT = b"""
62 62 redis.call('del', KEYS[2])
63 63 redis.call('lpush', KEYS[2], 1)
64 64 redis.call('pexpire', KEYS[2], ARGV[2])
65 65 return redis.call('del', KEYS[1])
66 66 """
67 67
68 68 RESET_ALL_SCRIPT = b"""
69 69 local locks = redis.call('keys', 'lock:*')
70 70 local signal
71 71 for _, lock in pairs(locks) do
72 72 signal = 'lock-signal:' .. string.sub(lock, 6)
73 73 redis.call('del', signal)
74 74 redis.call('lpush', signal, 1)
75 75 redis.call('expire', signal, 1)
76 76 redis.call('del', lock)
77 77 end
78 78 return #locks
79 79 """
80 80
81 81
82 82 class AlreadyAcquired(RuntimeError):
83 83 pass
84 84
85 85
86 86 class NotAcquired(RuntimeError):
87 87 pass
88 88
89 89
90 90 class AlreadyStarted(RuntimeError):
91 91 pass
92 92
93 93
94 94 class TimeoutNotUsable(RuntimeError):
95 95 pass
96 96
97 97
98 98 class InvalidTimeout(RuntimeError):
99 99 pass
100 100
101 101
102 102 class TimeoutTooLarge(RuntimeError):
103 103 pass
104 104
105 105
106 106 class NotExpirable(RuntimeError):
107 107 pass
108 108
109 109
110 110 class Lock(object):
111 111 """
112 112 A Lock context manager implemented via redis SETNX/BLPOP.
113 113 """
114 114 unlock_script = None
115 115 extend_script = None
116 116 reset_script = None
117 117 reset_all_script = None
118 118
119 119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
120 120 """
121 121 :param redis_client:
122 122 An instance of :class:`~StrictRedis`.
123 123 :param name:
124 124 The name (redis key) the lock should have.
125 125 :param expire:
126 126 The lock expiry time in seconds. If left at the default (None)
127 127 the lock will not expire.
128 128 :param id:
129 129 The ID (redis value) the lock should have. A random value is
130 130 generated when left at the default.
131 131
132 132 Note that if you specify this then the lock is marked as "held". Acquires
133 133 won't be possible.
134 134 :param auto_renewal:
135 135 If set to ``True``, Lock will automatically renew the lock so that it
136 136 doesn't expire for as long as the lock is held (acquire() called
137 137 or running in a context manager).
138 138
139 139 Implementation note: Renewal will happen using a daemon thread with
140 140 an interval of ``expire*2/3``. If wishing to use a different renewal
141 141 time, subclass Lock, call ``super().__init__()`` then set
142 142 ``self._lock_renewal_interval`` to your desired interval.
143 143 :param strict:
144 144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
145 145 :param signal_expire:
146 146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 147 """
148 148 if strict and not isinstance(redis_client, StrictRedis):
149 149 raise ValueError("redis_client must be instance of StrictRedis. "
150 150 "Use strict=False if you know what you're doing.")
151 151 if auto_renewal and expire is None:
152 152 raise ValueError("Expire may not be None when auto_renewal is set")
153 153
154 154 self._client = redis_client
155 155
156 156 if expire:
157 157 expire = int(expire)
158 158 if expire < 0:
159 159 raise ValueError("A negative expire is not acceptable.")
160 160 else:
161 161 expire = None
162 162 self._expire = expire
163 163
164 164 self._signal_expire = signal_expire
165 165 if id is None:
166 166 self._id = b64encode(urandom(18)).decode('ascii')
167 167 elif isinstance(id, binary_type):
168 168 try:
169 169 self._id = id.decode('ascii')
170 170 except UnicodeDecodeError:
171 171 self._id = b64encode(id).decode('ascii')
172 172 elif isinstance(id, text_type):
173 173 self._id = id
174 174 else:
175 175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
176 176 self._name = 'lock:' + name
177 177 self._signal = 'lock-signal:' + name
178 178 self._lock_renewal_interval = (float(expire) * 2 / 3
179 179 if auto_renewal
180 180 else None)
181 181 self._lock_renewal_thread = None
182 182
183 183 self.register_scripts(redis_client)
184 184
185 185 @classmethod
186 186 def register_scripts(cls, redis_client):
187 187 global reset_all_script
188 188 if reset_all_script is None:
189 189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
190 190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
191 191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
192 192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
193 193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
194 194
195 195 @property
196 196 def _held(self):
197 197 return self.id == self.get_owner_id()
198 198
199 199 def reset(self):
200 200 """
201 201 Forcibly deletes the lock. Use this with care.
202 202 """
203 203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
204 204
205 205 @property
206 206 def id(self):
207 207 return self._id
208 208
209 209 def get_owner_id(self):
210 210 owner_id = self._client.get(self._name)
211 211 if isinstance(owner_id, binary_type):
212 212 owner_id = owner_id.decode('ascii', 'replace')
213 213 return owner_id
214 214
215 215 def acquire(self, blocking=True, timeout=None):
216 216 """
217 217 :param blocking:
218 218 Boolean value specifying whether lock should be blocking or not.
219 219 :param timeout:
220 220 An integer value specifying the maximum number of seconds to block.
221 221 """
222 222 logger = loggers["acquire"]
223 223
224 logger.debug("Getting acquire on %r ...", self._name)
224 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
225 225
226 226 if self._held:
227 227 owner_id = self.get_owner_id()
228 228 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
229 229
230 230 if not blocking and timeout is not None:
231 231 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
232 232
233 233 if timeout:
234 234 timeout = int(timeout)
235 235 if timeout < 0:
236 236 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
237 237
238 238 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
239 239 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
240 240
241 241 busy = True
242 242 blpop_timeout = timeout or self._expire or 0
243 243 timed_out = False
244 244 while busy:
245 245 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
246 246 if busy:
247 247 if timed_out:
248 248 return False
249 249 elif blocking:
250 250 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
251 251 else:
252 252 logger.warning("Failed to get %r.", self._name)
253 253 return False
254 254
255 logger.info("Got lock for %r.", self._name)
255 logger.debug("Got lock for %r.", self._name)
256 256 if self._lock_renewal_interval is not None:
257 257 self._start_lock_renewer()
258 258 return True
259 259
260 260 def extend(self, expire=None):
261 261 """Extends expiration time of the lock.
262 262
263 263 :param expire:
264 264 New expiration time. If ``None`` - `expire` provided during
265 265 lock initialization will be taken.
266 266 """
267 267 if expire:
268 268 expire = int(expire)
269 269 if expire < 0:
270 270 raise ValueError("A negative expire is not acceptable.")
271 271 elif self._expire is not None:
272 272 expire = self._expire
273 273 else:
274 274 raise TypeError(
275 275 "To extend a lock 'expire' must be provided as an "
276 276 "argument to extend() method or at initialization time."
277 277 )
278 278
279 279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
280 280 if error == 1:
281 281 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
282 282 elif error == 2:
283 283 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
284 284 elif error:
285 285 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
286 286
287 287 @staticmethod
288 288 def _lock_renewer(lockref, interval, stop):
289 289 """
290 290 Renew the lock key in redis every `interval` seconds for as long
291 291 as `self._lock_renewal_thread.should_exit` is False.
292 292 """
293 293 while not stop.wait(timeout=interval):
294 294 loggers["refresh.thread.start"].debug("Refreshing lock")
295 295 lock = lockref()
296 296 if lock is None:
297 297 loggers["refresh.thread.stop"].debug(
298 298 "The lock no longer exists, stopping lock refreshing"
299 299 )
300 300 break
301 301 lock.extend(expire=lock._expire)
302 302 del lock
303 303 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
304 304
305 305 def _start_lock_renewer(self):
306 306 """
307 307 Starts the lock refresher thread.
308 308 """
309 309 if self._lock_renewal_thread is not None:
310 310 raise AlreadyStarted("Lock refresh thread already started")
311 311
312 312 loggers["refresh.start"].debug(
313 313 "Starting thread to refresh lock every %s seconds",
314 314 self._lock_renewal_interval
315 315 )
316 316 self._lock_renewal_stop = threading.Event()
317 317 self._lock_renewal_thread = threading.Thread(
318 318 group=None,
319 319 target=self._lock_renewer,
320 320 kwargs={'lockref': weakref.ref(self),
321 321 'interval': self._lock_renewal_interval,
322 322 'stop': self._lock_renewal_stop}
323 323 )
324 324 self._lock_renewal_thread.setDaemon(True)
325 325 self._lock_renewal_thread.start()
326 326
327 327 def _stop_lock_renewer(self):
328 328 """
329 329 Stop the lock renewer.
330 330
331 331 This signals the renewal thread and waits for its exit.
332 332 """
333 333 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
334 334 return
335 335 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
336 336 self._lock_renewal_stop.set()
337 337 self._lock_renewal_thread.join()
338 338 self._lock_renewal_thread = None
339 339 loggers["refresh.exit"].debug("Lock refresher has stopped")
340 340
341 341 def __enter__(self):
342 342 acquired = self.acquire(blocking=True)
343 343 assert acquired, "Lock wasn't acquired, but blocking=True"
344 344 return self
345 345
346 346 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
347 347 self.release()
348 348
349 349 def release(self):
350 350 """Releases the lock, that was acquired with the same object.
351 351
352 352 .. note::
353 353
354 354 If you want to release a lock that you acquired in a different place you have two choices:
355 355
356 356 * Use ``Lock("name", id=id_from_other_place).release()``
357 357 * Use ``Lock("name").reset()``
358 358 """
359 359 if self._lock_renewal_thread is not None:
360 360 self._stop_lock_renewer()
361 361 loggers["release"].debug("Releasing %r.", self._name)
362 362 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
363 363 if error == 1:
364 364 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
365 365 elif error:
366 366 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
367 367
368 368 def locked(self):
369 369 """
370 370 Return true if the lock is acquired.
371 371
372 372 Checks that lock with same name already exists. This method returns true, even if
373 373 lock have another id.
374 374 """
375 375 return self._client.exists(self._name) == 1
376 376
377 377
378 378 reset_all_script = None
379 379
380 380
381 381 def reset_all(redis_client):
382 382 """
383 383 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
384 384
385 385 :param redis_client:
386 386 An instance of :class:`~StrictRedis`.
387 387 """
388 388 Lock.register_scripts(redis_client)
389 389
390 390 reset_all_script(client=redis_client) # noqa
@@ -1,147 +1,156 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
11 11 TAG_INVALID_CHARS_RE = re.compile(
12 12 r"[^\w\d_\-:/\.]",
13 13 #re.UNICODE
14 14 )
15 15 TAG_INVALID_CHARS_SUBS = "_"
16 16
17 17 # we save and expose methods called by statsd for discovery
18 18 buckets_dict = {
19 19
20 20 }
21 21
22 22
23 23 @lru_cache(maxsize=500)
24 24 def _normalize_tags_with_cache(tag_list):
25 25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 26
27 27
28 28 def normalize_tags(tag_list):
29 29 # We have to turn our input tag list into a non-mutable tuple for it to
30 30 # be hashable (and thus usable) by the @lru_cache decorator.
31 31 return _normalize_tags_with_cache(tuple(tag_list))
32 32
33 33
34 34 class StatsClientBase(object):
35 35 """A Base class for various statsd clients."""
36 36
37 37 def close(self):
38 38 """Used to close and clean up any underlying resources."""
39 39 raise NotImplementedError()
40 40
41 41 def _send(self):
42 42 raise NotImplementedError()
43 43
44 44 def pipeline(self):
45 45 raise NotImplementedError()
46 46
47 47 def timer(self, stat, rate=1, tags=None):
48 """
49 statsd = StatsdClient()
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
52 for i in xrange(0, 100000):
53 i ** 2
54 # you can access time here...
55 elapsed_ms = tmr.ms
56 """
48 57 return Timer(self, stat, rate, tags)
49 58
50 59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
51 60 """
52 61 Send new timing information.
53 62
54 63 `delta` can be either a number of milliseconds or a timedelta.
55 64 """
56 65 if isinstance(delta, timedelta):
57 66 # Convert timedelta to number of milliseconds.
58 67 delta = delta.total_seconds() * 1000.
59 68 if use_decimals:
60 69 fmt = '%0.6f|ms'
61 70 else:
62 71 fmt = '%s|ms'
63 72 self._send_stat(stat, fmt % delta, rate, tags)
64 73
65 74 def incr(self, stat, count=1, rate=1, tags=None):
66 75 """Increment a stat by `count`."""
67 76 self._send_stat(stat, '%s|c' % count, rate, tags)
68 77
69 78 def decr(self, stat, count=1, rate=1, tags=None):
70 79 """Decrement a stat by `count`."""
71 80 self.incr(stat, -count, rate, tags)
72 81
73 82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
74 83 """Set a gauge value."""
75 84 if value < 0 and not delta:
76 85 if rate < 1:
77 86 if random.random() > rate:
78 87 return
79 88 with self.pipeline() as pipe:
80 89 pipe._send_stat(stat, '0|g', 1)
81 90 pipe._send_stat(stat, '%s|g' % value, 1)
82 91 else:
83 92 prefix = '+' if delta and value >= 0 else ''
84 93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
85 94
86 95 def set(self, stat, value, rate=1):
87 96 """Set a set value."""
88 97 self._send_stat(stat, '%s|s' % value, rate)
89 98
90 99 def histogram(self, stat, value, rate=1, tags=None):
91 100 """Set a histogram"""
92 101 self._send_stat(stat, '%s|h' % value, rate, tags)
93 102
94 103 def _send_stat(self, stat, value, rate, tags=None):
95 104 self._after(self._prepare(stat, value, rate, tags))
96 105
97 106 def _prepare(self, stat, value, rate, tags=None):
98 107 global buckets_dict
99 108 buckets_dict[stat] = 1
100 109
101 110 if rate < 1:
102 111 if random.random() > rate:
103 112 return
104 113 value = '%s|@%s' % (value, rate)
105 114
106 115 if self._prefix:
107 116 stat = '%s.%s' % (self._prefix, stat)
108 117
109 118 res = '%s:%s%s' % (
110 119 stat,
111 120 value,
112 121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
113 122 )
114 123 return res
115 124
116 125 def _after(self, data):
117 126 if data:
118 127 self._send(data)
119 128
120 129
121 130 class PipelineBase(StatsClientBase):
122 131
123 132 def __init__(self, client):
124 133 self._client = client
125 134 self._prefix = client._prefix
126 135 self._stats = deque()
127 136
128 137 def _send(self):
129 138 raise NotImplementedError()
130 139
131 140 def _after(self, data):
132 141 if data is not None:
133 142 self._stats.append(data)
134 143
135 144 def __enter__(self):
136 145 return self
137 146
138 147 def __exit__(self, typ, value, tb):
139 148 self.send()
140 149
141 150 def send(self):
142 151 if not self._stats:
143 152 return
144 153 self._send()
145 154
146 155 def pipeline(self):
147 156 return self.__class__(self)
General Comments 0
You need to be logged in to leave comments. Login now