##// END OF EJS Templates
libs: synced with ce codebase
super-admin -
r1018:68a8ca72 default
parent child Browse files
Show More
@@ -1,390 +1,390 b''
1 import sys
1 import sys
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7
7
8 from redis import StrictRedis
8 from redis import StrictRedis
9
9
10 __version__ = '3.7.0'
10 __version__ = '3.7.0'
11
11
12 loggers = {
12 loggers = {
13 k: getLogger("vcsserver." + ".".join((__name__, k)))
13 k: getLogger("vcsserver." + ".".join((__name__, k)))
14 for k in [
14 for k in [
15 "acquire",
15 "acquire",
16 "refresh.thread.start",
16 "refresh.thread.start",
17 "refresh.thread.stop",
17 "refresh.thread.stop",
18 "refresh.thread.exit",
18 "refresh.thread.exit",
19 "refresh.start",
19 "refresh.start",
20 "refresh.shutdown",
20 "refresh.shutdown",
21 "refresh.exit",
21 "refresh.exit",
22 "release",
22 "release",
23 ]
23 ]
24 }
24 }
25
25
26 PY3 = sys.version_info[0] == 3
26 PY3 = sys.version_info[0] == 3
27
27
28 if PY3:
28 if PY3:
29 text_type = str
29 text_type = str
30 binary_type = bytes
30 binary_type = bytes
31 else:
31 else:
32 text_type = unicode # noqa
32 text_type = unicode # noqa
33 binary_type = str
33 binary_type = str
34
34
35
35
36 # Check if the id match. If not, return an error code.
36 # Check if the id match. If not, return an error code.
37 UNLOCK_SCRIPT = b"""
37 UNLOCK_SCRIPT = b"""
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
38 if redis.call("get", KEYS[1]) ~= ARGV[1] then
39 return 1
39 return 1
40 else
40 else
41 redis.call("del", KEYS[2])
41 redis.call("del", KEYS[2])
42 redis.call("lpush", KEYS[2], 1)
42 redis.call("lpush", KEYS[2], 1)
43 redis.call("pexpire", KEYS[2], ARGV[2])
43 redis.call("pexpire", KEYS[2], ARGV[2])
44 redis.call("del", KEYS[1])
44 redis.call("del", KEYS[1])
45 return 0
45 return 0
46 end
46 end
47 """
47 """
48
48
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
49 # Covers both cases when key doesn't exist and doesn't equal to lock's id
50 EXTEND_SCRIPT = b"""
50 EXTEND_SCRIPT = b"""
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
51 if redis.call("get", KEYS[1]) ~= ARGV[1] then
52 return 1
52 return 1
53 elseif redis.call("ttl", KEYS[1]) < 0 then
53 elseif redis.call("ttl", KEYS[1]) < 0 then
54 return 2
54 return 2
55 else
55 else
56 redis.call("expire", KEYS[1], ARGV[2])
56 redis.call("expire", KEYS[1], ARGV[2])
57 return 0
57 return 0
58 end
58 end
59 """
59 """
60
60
61 RESET_SCRIPT = b"""
61 RESET_SCRIPT = b"""
62 redis.call('del', KEYS[2])
62 redis.call('del', KEYS[2])
63 redis.call('lpush', KEYS[2], 1)
63 redis.call('lpush', KEYS[2], 1)
64 redis.call('pexpire', KEYS[2], ARGV[2])
64 redis.call('pexpire', KEYS[2], ARGV[2])
65 return redis.call('del', KEYS[1])
65 return redis.call('del', KEYS[1])
66 """
66 """
67
67
68 RESET_ALL_SCRIPT = b"""
68 RESET_ALL_SCRIPT = b"""
69 local locks = redis.call('keys', 'lock:*')
69 local locks = redis.call('keys', 'lock:*')
70 local signal
70 local signal
71 for _, lock in pairs(locks) do
71 for _, lock in pairs(locks) do
72 signal = 'lock-signal:' .. string.sub(lock, 6)
72 signal = 'lock-signal:' .. string.sub(lock, 6)
73 redis.call('del', signal)
73 redis.call('del', signal)
74 redis.call('lpush', signal, 1)
74 redis.call('lpush', signal, 1)
75 redis.call('expire', signal, 1)
75 redis.call('expire', signal, 1)
76 redis.call('del', lock)
76 redis.call('del', lock)
77 end
77 end
78 return #locks
78 return #locks
79 """
79 """
80
80
81
81
82 class AlreadyAcquired(RuntimeError):
82 class AlreadyAcquired(RuntimeError):
83 pass
83 pass
84
84
85
85
86 class NotAcquired(RuntimeError):
86 class NotAcquired(RuntimeError):
87 pass
87 pass
88
88
89
89
90 class AlreadyStarted(RuntimeError):
90 class AlreadyStarted(RuntimeError):
91 pass
91 pass
92
92
93
93
94 class TimeoutNotUsable(RuntimeError):
94 class TimeoutNotUsable(RuntimeError):
95 pass
95 pass
96
96
97
97
98 class InvalidTimeout(RuntimeError):
98 class InvalidTimeout(RuntimeError):
99 pass
99 pass
100
100
101
101
102 class TimeoutTooLarge(RuntimeError):
102 class TimeoutTooLarge(RuntimeError):
103 pass
103 pass
104
104
105
105
106 class NotExpirable(RuntimeError):
106 class NotExpirable(RuntimeError):
107 pass
107 pass
108
108
109
109
110 class Lock(object):
110 class Lock(object):
111 """
111 """
112 A Lock context manager implemented via redis SETNX/BLPOP.
112 A Lock context manager implemented via redis SETNX/BLPOP.
113 """
113 """
114 unlock_script = None
114 unlock_script = None
115 extend_script = None
115 extend_script = None
116 reset_script = None
116 reset_script = None
117 reset_all_script = None
117 reset_all_script = None
118
118
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
120 """
120 """
121 :param redis_client:
121 :param redis_client:
122 An instance of :class:`~StrictRedis`.
122 An instance of :class:`~StrictRedis`.
123 :param name:
123 :param name:
124 The name (redis key) the lock should have.
124 The name (redis key) the lock should have.
125 :param expire:
125 :param expire:
126 The lock expiry time in seconds. If left at the default (None)
126 The lock expiry time in seconds. If left at the default (None)
127 the lock will not expire.
127 the lock will not expire.
128 :param id:
128 :param id:
129 The ID (redis value) the lock should have. A random value is
129 The ID (redis value) the lock should have. A random value is
130 generated when left at the default.
130 generated when left at the default.
131
131
132 Note that if you specify this then the lock is marked as "held". Acquires
132 Note that if you specify this then the lock is marked as "held". Acquires
133 won't be possible.
133 won't be possible.
134 :param auto_renewal:
134 :param auto_renewal:
135 If set to ``True``, Lock will automatically renew the lock so that it
135 If set to ``True``, Lock will automatically renew the lock so that it
136 doesn't expire for as long as the lock is held (acquire() called
136 doesn't expire for as long as the lock is held (acquire() called
137 or running in a context manager).
137 or running in a context manager).
138
138
139 Implementation note: Renewal will happen using a daemon thread with
139 Implementation note: Renewal will happen using a daemon thread with
140 an interval of ``expire*2/3``. If wishing to use a different renewal
140 an interval of ``expire*2/3``. If wishing to use a different renewal
141 time, subclass Lock, call ``super().__init__()`` then set
141 time, subclass Lock, call ``super().__init__()`` then set
142 ``self._lock_renewal_interval`` to your desired interval.
142 ``self._lock_renewal_interval`` to your desired interval.
143 :param strict:
143 :param strict:
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
145 :param signal_expire:
145 :param signal_expire:
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 """
147 """
148 if strict and not isinstance(redis_client, StrictRedis):
148 if strict and not isinstance(redis_client, StrictRedis):
149 raise ValueError("redis_client must be instance of StrictRedis. "
149 raise ValueError("redis_client must be instance of StrictRedis. "
150 "Use strict=False if you know what you're doing.")
150 "Use strict=False if you know what you're doing.")
151 if auto_renewal and expire is None:
151 if auto_renewal and expire is None:
152 raise ValueError("Expire may not be None when auto_renewal is set")
152 raise ValueError("Expire may not be None when auto_renewal is set")
153
153
154 self._client = redis_client
154 self._client = redis_client
155
155
156 if expire:
156 if expire:
157 expire = int(expire)
157 expire = int(expire)
158 if expire < 0:
158 if expire < 0:
159 raise ValueError("A negative expire is not acceptable.")
159 raise ValueError("A negative expire is not acceptable.")
160 else:
160 else:
161 expire = None
161 expire = None
162 self._expire = expire
162 self._expire = expire
163
163
164 self._signal_expire = signal_expire
164 self._signal_expire = signal_expire
165 if id is None:
165 if id is None:
166 self._id = b64encode(urandom(18)).decode('ascii')
166 self._id = b64encode(urandom(18)).decode('ascii')
167 elif isinstance(id, binary_type):
167 elif isinstance(id, binary_type):
168 try:
168 try:
169 self._id = id.decode('ascii')
169 self._id = id.decode('ascii')
170 except UnicodeDecodeError:
170 except UnicodeDecodeError:
171 self._id = b64encode(id).decode('ascii')
171 self._id = b64encode(id).decode('ascii')
172 elif isinstance(id, text_type):
172 elif isinstance(id, text_type):
173 self._id = id
173 self._id = id
174 else:
174 else:
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
176 self._name = 'lock:' + name
176 self._name = 'lock:' + name
177 self._signal = 'lock-signal:' + name
177 self._signal = 'lock-signal:' + name
178 self._lock_renewal_interval = (float(expire) * 2 / 3
178 self._lock_renewal_interval = (float(expire) * 2 / 3
179 if auto_renewal
179 if auto_renewal
180 else None)
180 else None)
181 self._lock_renewal_thread = None
181 self._lock_renewal_thread = None
182
182
183 self.register_scripts(redis_client)
183 self.register_scripts(redis_client)
184
184
185 @classmethod
185 @classmethod
186 def register_scripts(cls, redis_client):
186 def register_scripts(cls, redis_client):
187 global reset_all_script
187 global reset_all_script
188 if reset_all_script is None:
188 if reset_all_script is None:
189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
189 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
190 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
191 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
192 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
193 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
194
194
195 @property
195 @property
196 def _held(self):
196 def _held(self):
197 return self.id == self.get_owner_id()
197 return self.id == self.get_owner_id()
198
198
199 def reset(self):
199 def reset(self):
200 """
200 """
201 Forcibly deletes the lock. Use this with care.
201 Forcibly deletes the lock. Use this with care.
202 """
202 """
203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
203 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
204
204
205 @property
205 @property
206 def id(self):
206 def id(self):
207 return self._id
207 return self._id
208
208
209 def get_owner_id(self):
209 def get_owner_id(self):
210 owner_id = self._client.get(self._name)
210 owner_id = self._client.get(self._name)
211 if isinstance(owner_id, binary_type):
211 if isinstance(owner_id, binary_type):
212 owner_id = owner_id.decode('ascii', 'replace')
212 owner_id = owner_id.decode('ascii', 'replace')
213 return owner_id
213 return owner_id
214
214
215 def acquire(self, blocking=True, timeout=None):
215 def acquire(self, blocking=True, timeout=None):
216 """
216 """
217 :param blocking:
217 :param blocking:
218 Boolean value specifying whether lock should be blocking or not.
218 Boolean value specifying whether lock should be blocking or not.
219 :param timeout:
219 :param timeout:
220 An integer value specifying the maximum number of seconds to block.
220 An integer value specifying the maximum number of seconds to block.
221 """
221 """
222 logger = loggers["acquire"]
222 logger = loggers["acquire"]
223
223
224 logger.debug("Getting acquire on %r ...", self._name)
224 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
225
225
226 if self._held:
226 if self._held:
227 owner_id = self.get_owner_id()
227 owner_id = self.get_owner_id()
228 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
228 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
229
229
230 if not blocking and timeout is not None:
230 if not blocking and timeout is not None:
231 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
231 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
232
232
233 if timeout:
233 if timeout:
234 timeout = int(timeout)
234 timeout = int(timeout)
235 if timeout < 0:
235 if timeout < 0:
236 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
236 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
237
237
238 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
238 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
239 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
239 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
240
240
241 busy = True
241 busy = True
242 blpop_timeout = timeout or self._expire or 0
242 blpop_timeout = timeout or self._expire or 0
243 timed_out = False
243 timed_out = False
244 while busy:
244 while busy:
245 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
245 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
246 if busy:
246 if busy:
247 if timed_out:
247 if timed_out:
248 return False
248 return False
249 elif blocking:
249 elif blocking:
250 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
250 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
251 else:
251 else:
252 logger.warning("Failed to get %r.", self._name)
252 logger.warning("Failed to get %r.", self._name)
253 return False
253 return False
254
254
255 logger.info("Got lock for %r.", self._name)
255 logger.debug("Got lock for %r.", self._name)
256 if self._lock_renewal_interval is not None:
256 if self._lock_renewal_interval is not None:
257 self._start_lock_renewer()
257 self._start_lock_renewer()
258 return True
258 return True
259
259
260 def extend(self, expire=None):
260 def extend(self, expire=None):
261 """Extends expiration time of the lock.
261 """Extends expiration time of the lock.
262
262
263 :param expire:
263 :param expire:
264 New expiration time. If ``None`` - `expire` provided during
264 New expiration time. If ``None`` - `expire` provided during
265 lock initialization will be taken.
265 lock initialization will be taken.
266 """
266 """
267 if expire:
267 if expire:
268 expire = int(expire)
268 expire = int(expire)
269 if expire < 0:
269 if expire < 0:
270 raise ValueError("A negative expire is not acceptable.")
270 raise ValueError("A negative expire is not acceptable.")
271 elif self._expire is not None:
271 elif self._expire is not None:
272 expire = self._expire
272 expire = self._expire
273 else:
273 else:
274 raise TypeError(
274 raise TypeError(
275 "To extend a lock 'expire' must be provided as an "
275 "To extend a lock 'expire' must be provided as an "
276 "argument to extend() method or at initialization time."
276 "argument to extend() method or at initialization time."
277 )
277 )
278
278
279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
280 if error == 1:
280 if error == 1:
281 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
281 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
282 elif error == 2:
282 elif error == 2:
283 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
283 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
284 elif error:
284 elif error:
285 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
285 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
286
286
287 @staticmethod
287 @staticmethod
288 def _lock_renewer(lockref, interval, stop):
288 def _lock_renewer(lockref, interval, stop):
289 """
289 """
290 Renew the lock key in redis every `interval` seconds for as long
290 Renew the lock key in redis every `interval` seconds for as long
291 as `self._lock_renewal_thread.should_exit` is False.
291 as `self._lock_renewal_thread.should_exit` is False.
292 """
292 """
293 while not stop.wait(timeout=interval):
293 while not stop.wait(timeout=interval):
294 loggers["refresh.thread.start"].debug("Refreshing lock")
294 loggers["refresh.thread.start"].debug("Refreshing lock")
295 lock = lockref()
295 lock = lockref()
296 if lock is None:
296 if lock is None:
297 loggers["refresh.thread.stop"].debug(
297 loggers["refresh.thread.stop"].debug(
298 "The lock no longer exists, stopping lock refreshing"
298 "The lock no longer exists, stopping lock refreshing"
299 )
299 )
300 break
300 break
301 lock.extend(expire=lock._expire)
301 lock.extend(expire=lock._expire)
302 del lock
302 del lock
303 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
303 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
304
304
305 def _start_lock_renewer(self):
305 def _start_lock_renewer(self):
306 """
306 """
307 Starts the lock refresher thread.
307 Starts the lock refresher thread.
308 """
308 """
309 if self._lock_renewal_thread is not None:
309 if self._lock_renewal_thread is not None:
310 raise AlreadyStarted("Lock refresh thread already started")
310 raise AlreadyStarted("Lock refresh thread already started")
311
311
312 loggers["refresh.start"].debug(
312 loggers["refresh.start"].debug(
313 "Starting thread to refresh lock every %s seconds",
313 "Starting thread to refresh lock every %s seconds",
314 self._lock_renewal_interval
314 self._lock_renewal_interval
315 )
315 )
316 self._lock_renewal_stop = threading.Event()
316 self._lock_renewal_stop = threading.Event()
317 self._lock_renewal_thread = threading.Thread(
317 self._lock_renewal_thread = threading.Thread(
318 group=None,
318 group=None,
319 target=self._lock_renewer,
319 target=self._lock_renewer,
320 kwargs={'lockref': weakref.ref(self),
320 kwargs={'lockref': weakref.ref(self),
321 'interval': self._lock_renewal_interval,
321 'interval': self._lock_renewal_interval,
322 'stop': self._lock_renewal_stop}
322 'stop': self._lock_renewal_stop}
323 )
323 )
324 self._lock_renewal_thread.setDaemon(True)
324 self._lock_renewal_thread.setDaemon(True)
325 self._lock_renewal_thread.start()
325 self._lock_renewal_thread.start()
326
326
327 def _stop_lock_renewer(self):
327 def _stop_lock_renewer(self):
328 """
328 """
329 Stop the lock renewer.
329 Stop the lock renewer.
330
330
331 This signals the renewal thread and waits for its exit.
331 This signals the renewal thread and waits for its exit.
332 """
332 """
333 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
333 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
334 return
334 return
335 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
335 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
336 self._lock_renewal_stop.set()
336 self._lock_renewal_stop.set()
337 self._lock_renewal_thread.join()
337 self._lock_renewal_thread.join()
338 self._lock_renewal_thread = None
338 self._lock_renewal_thread = None
339 loggers["refresh.exit"].debug("Lock refresher has stopped")
339 loggers["refresh.exit"].debug("Lock refresher has stopped")
340
340
341 def __enter__(self):
341 def __enter__(self):
342 acquired = self.acquire(blocking=True)
342 acquired = self.acquire(blocking=True)
343 assert acquired, "Lock wasn't acquired, but blocking=True"
343 assert acquired, "Lock wasn't acquired, but blocking=True"
344 return self
344 return self
345
345
346 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
346 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
347 self.release()
347 self.release()
348
348
349 def release(self):
349 def release(self):
350 """Releases the lock, that was acquired with the same object.
350 """Releases the lock, that was acquired with the same object.
351
351
352 .. note::
352 .. note::
353
353
354 If you want to release a lock that you acquired in a different place you have two choices:
354 If you want to release a lock that you acquired in a different place you have two choices:
355
355
356 * Use ``Lock("name", id=id_from_other_place).release()``
356 * Use ``Lock("name", id=id_from_other_place).release()``
357 * Use ``Lock("name").reset()``
357 * Use ``Lock("name").reset()``
358 """
358 """
359 if self._lock_renewal_thread is not None:
359 if self._lock_renewal_thread is not None:
360 self._stop_lock_renewer()
360 self._stop_lock_renewer()
361 loggers["release"].debug("Releasing %r.", self._name)
361 loggers["release"].debug("Releasing %r.", self._name)
362 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
362 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
363 if error == 1:
363 if error == 1:
364 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
364 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
365 elif error:
365 elif error:
366 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
366 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
367
367
368 def locked(self):
368 def locked(self):
369 """
369 """
370 Return true if the lock is acquired.
370 Return true if the lock is acquired.
371
371
372 Checks that lock with same name already exists. This method returns true, even if
372 Checks that lock with same name already exists. This method returns true, even if
373 lock have another id.
373 lock have another id.
374 """
374 """
375 return self._client.exists(self._name) == 1
375 return self._client.exists(self._name) == 1
376
376
377
377
378 reset_all_script = None
378 reset_all_script = None
379
379
380
380
381 def reset_all(redis_client):
381 def reset_all(redis_client):
382 """
382 """
383 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
383 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
384
384
385 :param redis_client:
385 :param redis_client:
386 An instance of :class:`~StrictRedis`.
386 An instance of :class:`~StrictRedis`.
387 """
387 """
388 Lock.register_scripts(redis_client)
388 Lock.register_scripts(redis_client)
389
389
390 reset_all_script(client=redis_client) # noqa
390 reset_all_script(client=redis_client) # noqa
@@ -1,147 +1,156 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
18 buckets_dict = {
19
19
20 }
20 }
21
21
22
22
23 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
24 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26
26
27
27
28 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
29 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
30 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
31 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
32
32
33
33
34 class StatsClientBase(object):
34 class StatsClientBase(object):
35 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
36
36
37 def close(self):
37 def close(self):
38 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
39 raise NotImplementedError()
39 raise NotImplementedError()
40
40
41 def _send(self):
41 def _send(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 def pipeline(self):
44 def pipeline(self):
45 raise NotImplementedError()
45 raise NotImplementedError()
46
46
47 def timer(self, stat, rate=1, tags=None):
47 def timer(self, stat, rate=1, tags=None):
48 """
49 statsd = StatsdClient()
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
52 for i in xrange(0, 100000):
53 i ** 2
54 # you can access time here...
55 elapsed_ms = tmr.ms
56 """
48 return Timer(self, stat, rate, tags)
57 return Timer(self, stat, rate, tags)
49
58
50 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
51 """
60 """
52 Send new timing information.
61 Send new timing information.
53
62
54 `delta` can be either a number of milliseconds or a timedelta.
63 `delta` can be either a number of milliseconds or a timedelta.
55 """
64 """
56 if isinstance(delta, timedelta):
65 if isinstance(delta, timedelta):
57 # Convert timedelta to number of milliseconds.
66 # Convert timedelta to number of milliseconds.
58 delta = delta.total_seconds() * 1000.
67 delta = delta.total_seconds() * 1000.
59 if use_decimals:
68 if use_decimals:
60 fmt = '%0.6f|ms'
69 fmt = '%0.6f|ms'
61 else:
70 else:
62 fmt = '%s|ms'
71 fmt = '%s|ms'
63 self._send_stat(stat, fmt % delta, rate, tags)
72 self._send_stat(stat, fmt % delta, rate, tags)
64
73
65 def incr(self, stat, count=1, rate=1, tags=None):
74 def incr(self, stat, count=1, rate=1, tags=None):
66 """Increment a stat by `count`."""
75 """Increment a stat by `count`."""
67 self._send_stat(stat, '%s|c' % count, rate, tags)
76 self._send_stat(stat, '%s|c' % count, rate, tags)
68
77
69 def decr(self, stat, count=1, rate=1, tags=None):
78 def decr(self, stat, count=1, rate=1, tags=None):
70 """Decrement a stat by `count`."""
79 """Decrement a stat by `count`."""
71 self.incr(stat, -count, rate, tags)
80 self.incr(stat, -count, rate, tags)
72
81
73 def gauge(self, stat, value, rate=1, delta=False, tags=None):
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
74 """Set a gauge value."""
83 """Set a gauge value."""
75 if value < 0 and not delta:
84 if value < 0 and not delta:
76 if rate < 1:
85 if rate < 1:
77 if random.random() > rate:
86 if random.random() > rate:
78 return
87 return
79 with self.pipeline() as pipe:
88 with self.pipeline() as pipe:
80 pipe._send_stat(stat, '0|g', 1)
89 pipe._send_stat(stat, '0|g', 1)
81 pipe._send_stat(stat, '%s|g' % value, 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
82 else:
91 else:
83 prefix = '+' if delta and value >= 0 else ''
92 prefix = '+' if delta and value >= 0 else ''
84 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
85
94
86 def set(self, stat, value, rate=1):
95 def set(self, stat, value, rate=1):
87 """Set a set value."""
96 """Set a set value."""
88 self._send_stat(stat, '%s|s' % value, rate)
97 self._send_stat(stat, '%s|s' % value, rate)
89
98
90 def histogram(self, stat, value, rate=1, tags=None):
99 def histogram(self, stat, value, rate=1, tags=None):
91 """Set a histogram"""
100 """Set a histogram"""
92 self._send_stat(stat, '%s|h' % value, rate, tags)
101 self._send_stat(stat, '%s|h' % value, rate, tags)
93
102
94 def _send_stat(self, stat, value, rate, tags=None):
103 def _send_stat(self, stat, value, rate, tags=None):
95 self._after(self._prepare(stat, value, rate, tags))
104 self._after(self._prepare(stat, value, rate, tags))
96
105
97 def _prepare(self, stat, value, rate, tags=None):
106 def _prepare(self, stat, value, rate, tags=None):
98 global buckets_dict
107 global buckets_dict
99 buckets_dict[stat] = 1
108 buckets_dict[stat] = 1
100
109
101 if rate < 1:
110 if rate < 1:
102 if random.random() > rate:
111 if random.random() > rate:
103 return
112 return
104 value = '%s|@%s' % (value, rate)
113 value = '%s|@%s' % (value, rate)
105
114
106 if self._prefix:
115 if self._prefix:
107 stat = '%s.%s' % (self._prefix, stat)
116 stat = '%s.%s' % (self._prefix, stat)
108
117
109 res = '%s:%s%s' % (
118 res = '%s:%s%s' % (
110 stat,
119 stat,
111 value,
120 value,
112 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
113 )
122 )
114 return res
123 return res
115
124
116 def _after(self, data):
125 def _after(self, data):
117 if data:
126 if data:
118 self._send(data)
127 self._send(data)
119
128
120
129
121 class PipelineBase(StatsClientBase):
130 class PipelineBase(StatsClientBase):
122
131
123 def __init__(self, client):
132 def __init__(self, client):
124 self._client = client
133 self._client = client
125 self._prefix = client._prefix
134 self._prefix = client._prefix
126 self._stats = deque()
135 self._stats = deque()
127
136
128 def _send(self):
137 def _send(self):
129 raise NotImplementedError()
138 raise NotImplementedError()
130
139
131 def _after(self, data):
140 def _after(self, data):
132 if data is not None:
141 if data is not None:
133 self._stats.append(data)
142 self._stats.append(data)
134
143
135 def __enter__(self):
144 def __enter__(self):
136 return self
145 return self
137
146
138 def __exit__(self, typ, value, tb):
147 def __exit__(self, typ, value, tb):
139 self.send()
148 self.send()
140
149
141 def send(self):
150 def send(self):
142 if not self._stats:
151 if not self._stats:
143 return
152 return
144 self._send()
153 self._send()
145
154
146 def pipeline(self):
155 def pipeline(self):
147 return self.__class__(self)
156 return self.__class__(self)
General Comments 0
You need to be logged in to leave comments. Login now