##// END OF EJS Templates
py3: synced _vendor with vcsserver py3 changes
super-admin -
r4909:b5f74a71 default
parent child Browse files
Show More
@@ -1,236 +1,236 b''
1 1 '''
2 2 This library is provided to allow standard python logging
3 3 to output log data as JSON formatted strings
4 4 '''
5 5 import logging
6 6 import json
7 7 import re
8 8 from datetime import date, datetime, time, tzinfo, timedelta
9 9 import traceback
10 10 import importlib
11 11
12 12 from inspect import istraceback
13 13
14 14 from collections import OrderedDict
15 15 from rhodecode.lib.logging_formatter import _inject_req_id, ExceptionAwareFormatter
16 16
17 17 ZERO = timedelta(0)
18 18 HOUR = timedelta(hours=1)
19 19
20 20
class UTC(tzinfo):
    """Concrete tzinfo for UTC: fixed zero offset, no daylight saving."""

    def utcoffset(self, dt):
        # UTC is always exactly zero offset from itself.
        return ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return ZERO

utc = UTC()
34 34
35 35
# Attributes every logging.LogRecord carries natively; they are skipped when
# copying "extra" values into the JSON payload.
# http://docs.python.org/library/logging.html#logrecord-attributes
RESERVED_ATTRS = (
    'args', 'asctime', 'created', 'exc_info', 'exc_text',
    'filename', 'funcName', 'levelname', 'levelno', 'lineno',
    'module', 'msecs', 'message', 'msg', 'name', 'pathname',
    'process', 'processName', 'relativeCreated', 'stack_info',
    'thread', 'threadName',
)
43 43
44 44
def merge_record_extra(record, target, reserved):
    """
    Merges extra attributes from LogRecord object into target dictionary

    :param record: logging.LogRecord
    :param target: dict to update
    :param reserved: dict or list with reserved keys to skip
    """
    for key, value in record.__dict__.items():
        if key in reserved:
            continue
        # the hasattr guard allows non-string (e.g. numeric) keys through
        if hasattr(key, "startswith") and key.startswith('_'):
            continue
        target[key] = value
    return target
60 60
61 61
class JsonEncoder(json.JSONEncoder):
    """
    A custom encoder extending the default JSONEncoder.

    Serializes dates/times via isoformat, tracebacks as formatted text, and
    exceptions/classes via str(); anything else falls back to str() rather
    than raising, so a log record can never fail to serialize.
    """

    def default(self, obj):
        if isinstance(obj, (date, datetime, time)):
            return self.format_datetime_obj(obj)

        if istraceback(obj):
            return ''.join(traceback.format_tb(obj)).strip()

        # isinstance covers Exception subclasses; isinstance(obj, type)
        # also catches classes built with custom metaclasses, which the
        # previous `type(obj) == type` check missed.
        if isinstance(obj, Exception) or isinstance(obj, type):
            return str(obj)

        try:
            return super().default(obj)
        except TypeError:
            # last resort: stringify rather than break log emission
            try:
                return str(obj)
            except Exception:
                return None

    def format_datetime_obj(self, obj):
        """Render date/datetime/time values in ISO-8601 form."""
        return obj.isoformat()
91 91
92 92
class JsonFormatter(ExceptionAwareFormatter):
    """
    A custom formatter to format logging records as json strings.
    Extra values will be formatted as str() if not supported by
    json default encoder
    """

    def __init__(self, *args, **kwargs):
        """
        :param json_default: a function for encoding non-standard objects
            as outlined in http://docs.python.org/2/library/json.html
        :param json_encoder: optional custom encoder
        :param json_serializer: a :meth:`json.dumps`-compatible callable
            that will be used to serialize the log record.
        :param json_indent: indent parameter for json.dumps
        :param json_ensure_ascii: ensure_ascii parameter for json.dumps
        :param prefix: an optional string prefix added at the beginning of
            the formatted string
        :param reserved_attrs: an optional list of fields that will be skipped when
            outputting json log record. Defaults to all log record attributes:
            http://docs.python.org/library/logging.html#logrecord-attributes
        :param timestamp: an optional string/boolean field to add a timestamp when
            outputting the json log record. If string is passed, timestamp will be added
            to log record using string as key. If True boolean is passed, timestamp key
            will be "timestamp". Defaults to True.
        """
        self.json_default = self._str_to_fn(kwargs.pop("json_default", None))
        self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None))
        self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps))
        self.json_indent = kwargs.pop("json_indent", None)
        self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True)
        self.prefix = kwargs.pop("prefix", "")
        reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
        # identity mapping: gives O(1) membership checks in add_fields()
        self.reserved_attrs = {attr: attr for attr in reserved_attrs}
        self.timestamp = kwargs.pop("timestamp", True)

        # Intentionally bypass ExceptionAwareFormatter.__init__ and call the
        # stdlib Formatter directly; all custom kwargs were consumed above.
        logging.Formatter.__init__(self, *args, **kwargs)
        if not self.json_encoder and not self.json_default:
            self.json_encoder = JsonEncoder

        self._required_fields = self.parse()
        self._skip_fields = {field: field for field in self._required_fields}
        self._skip_fields.update(self.reserved_attrs)

    def _str_to_fn(self, fn_as_str):
        """
        If the argument is not a string, return whatever was passed in.
        Parses a string such as package.module.function, imports the module
        and returns the function.

        :param fn_as_str: The string to parse. If not a string, return it.
        """
        if not isinstance(fn_as_str, str):
            return fn_as_str

        path, _, function = fn_as_str.rpartition('.')
        module = importlib.import_module(path)
        return getattr(module, function)

    def parse(self):
        """
        Parses format string looking for substitutions

        This method is responsible for returning a list of fields (as strings)
        to include in all log messages.
        """
        standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
        return standard_formatters.findall(self._fmt)

    def add_fields(self, log_record, record, message_dict):
        """
        Override this method to implement custom logic for adding fields.
        """
        for field in self._required_fields:
            log_record[field] = record.__dict__.get(field)
        log_record.update(message_dict)
        merge_record_extra(record, log_record, reserved=self._skip_fields)

        if self.timestamp:
            # a string value names the key; a bare True means "timestamp"
            key = self.timestamp if isinstance(self.timestamp, str) else 'timestamp'
            log_record[key] = datetime.fromtimestamp(record.created, tz=utc)

    def process_log_record(self, log_record):
        """
        Override this method to implement custom logic
        on the possibly ordered dictionary.
        """
        return log_record

    def jsonify_log_record(self, log_record):
        """Returns a json string of the log record."""
        return self.json_serializer(log_record,
                                    default=self.json_default,
                                    cls=self.json_encoder,
                                    indent=self.json_indent,
                                    ensure_ascii=self.json_ensure_ascii)

    def serialize_log_record(self, log_record):
        """Returns the final representation of the log record."""
        return "%s%s" % (self.prefix, self.jsonify_log_record(log_record))

    def format(self, record):
        """Formats a log record and serializes to json"""
        message_dict = {}
        if isinstance(record.msg, dict):
            # dict messages are merged verbatim into the json payload
            message_dict = record.msg
            record.message = None
        else:
            record.message = record.getMessage()
        # only format time if needed
        if "asctime" in self._required_fields:
            record.asctime = self.formatTime(record, self.datefmt)

        # Display formatted exception, but allow overriding it in the
        # user-supplied dict.
        if record.exc_info and not message_dict.get('exc_info'):
            message_dict['exc_info'] = self.formatException(record.exc_info)
        if not message_dict.get('exc_info') and record.exc_text:
            message_dict['exc_info'] = record.exc_text
        # Display formatted record of stack frames
        # default format is a string returned from :func:`traceback.print_stack`
        try:
            if record.stack_info and not message_dict.get('stack_info'):
                message_dict['stack_info'] = self.formatStack(record.stack_info)
        except AttributeError:
            # Python2.7 doesn't have stack_info.
            pass

        # OrderedDict is imported unconditionally at module level, so the old
        # try/except NameError fallback was dead code.
        log_record = OrderedDict()

        _inject_req_id(record, with_prefix=False)
        self.add_fields(log_record, record, message_dict)
        log_record = self.process_log_record(log_record)

        return self.serialize_log_record(log_record)
@@ -1,390 +1,384 b''
1 1 import sys
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7
8 8 from redis import StrictRedis
9 9
10 10 __version__ = '3.7.0'
11 11
12 12 loggers = {
13 13 k: getLogger("rhodecode." + ".".join((__name__, k)))
14 14 for k in [
15 15 "acquire",
16 16 "refresh.thread.start",
17 17 "refresh.thread.stop",
18 18 "refresh.thread.exit",
19 19 "refresh.start",
20 20 "refresh.shutdown",
21 21 "refresh.exit",
22 22 "release",
23 23 ]
24 24 }
25 25
# Python 3 only: aliases kept so older call sites keep working.
text_type = str
binary_type = bytes
34 28
35 29
# UNLOCK: only the holder (id in ARGV[1]) may unlock; returns 1 on mismatch.
# On success the signal list KEYS[2] is re-armed for blocked waiters, given a
# short TTL, and both keys are removed/reset.
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("pexpire", KEYS[2], ARGV[2])
        redis.call("del", KEYS[1])
        return 0
    end
"""

# EXTEND: 1 = key missing or owned by someone else, 2 = key has no TTL,
# 0 = expiry successfully pushed out to ARGV[2] seconds.
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[2])
        return 0
    end
"""

# RESET: unconditionally wake waiters and drop the lock key.
RESET_SCRIPT = b"""
    redis.call('del', KEYS[2])
    redis.call('lpush', KEYS[2], 1)
    redis.call('pexpire', KEYS[2], ARGV[2])
    return redis.call('del', KEYS[1])
"""

# RESET_ALL: scan every 'lock:*' key, wake its waiters and delete it.
# Returns the number of locks that were cleared.
RESET_ALL_SCRIPT = b"""
    local locks = redis.call('keys', 'lock:*')
    local signal
    for _, lock in pairs(locks) do
        signal = 'lock-signal:' .. string.sub(lock, 6)
        redis.call('del', signal)
        redis.call('lpush', signal, 1)
        redis.call('expire', signal, 1)
        redis.call('del', lock)
    end
    return #locks
"""
80 74
81 75
class AlreadyAcquired(RuntimeError):
    """acquire() called on a Lock instance that already holds the lock."""


class NotAcquired(RuntimeError):
    """Release/extend attempted on a lock that is not held or has expired."""


class AlreadyStarted(RuntimeError):
    """The renewal thread was started twice."""


class TimeoutNotUsable(RuntimeError):
    """A timeout was given together with blocking=False."""


class InvalidTimeout(RuntimeError):
    """The timeout value is out of range."""


class TimeoutTooLarge(RuntimeError):
    """The timeout exceeds the lock's expire time."""


class NotExpirable(RuntimeError):
    """extend() called on a lock that has no expiration set."""
108 102
109 103
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """
    # per-class registered script handles (set by register_scripts)
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            # expire=0 is treated the same as "no expiry"
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        if id is None:
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, binary_type):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                # non-ascii bytes ids are stored base64-encoded
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, text_type):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.register_scripts(redis_client)

    @classmethod
    def register_scripts(cls, redis_client):
        """Register the Lua scripts with redis; also primes the module-level
        reset_all_script used by :func:`reset_all`."""
        global reset_all_script
        if reset_all_script is None:
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
        cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
        cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
        cls.reset_script = redis_client.register_script(RESET_SCRIPT)
        cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)

    @property
    def _held(self):
        # we hold the lock iff the stored owner id is our id
        return self.id == self.get_owner_id()

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))

    @property
    def id(self):
        return self._id

    def get_owner_id(self):
        """Return the id currently stored under the lock key (or None)."""
        owner_id = self._client.get(self._name)
        if isinstance(owner_id, binary_type):
            owner_id = owner_id.decode('ascii', 'replace')
        return owner_id

    def acquire(self, blocking=True, timeout=None):
        """
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        :raises AlreadyAcquired: if this instance already holds the lock.
        :raises TimeoutNotUsable: if timeout is given with blocking=False.
        :raises InvalidTimeout: if timeout is negative.
        :raises TimeoutTooLarge: if timeout exceeds the expire time.
        :returns: True when acquired, False otherwise.
        """
        logger = loggers["acquire"]

        logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)

        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)

            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET NX is the atomic "take lock if free" primitive
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # wait for a release signal (or expiry); remember whether
                    # we ran out of time so the next failed SET bails out
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to get %r.", self._name)
                    return False

        logger.debug("Got lock for %r.", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        """
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        elif self._expire is not None:
            expire = self._expire
        else:
            raise TypeError(
                "To extend a lock 'expire' must be provided as an "
                "argument to extend() method or at initialization time."
            )

        error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error == 2:
            raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
        elif error:
            raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

    @staticmethod
    def _lock_renewer(lockref, interval, stop):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.
        """
        while not stop.wait(timeout=interval):
            loggers["refresh.thread.start"].debug("Refreshing lock")
            lock = lockref()
            if lock is None:
                # the Lock object was garbage collected; nothing left to renew
                loggers["refresh.thread.stop"].debug(
                    "The lock no longer exists, stopping lock refreshing"
                )
                break
            lock.extend(expire=lock._expire)
            # drop the strong reference so the weakref can die between rounds
            del lock
        loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        loggers["refresh.start"].debug(
            "Starting thread to refresh lock every %s seconds",
            self._lock_renewal_interval
        )
        self._lock_renewal_stop = threading.Event()
        self._lock_renewal_thread = threading.Thread(
            group=None,
            target=self._lock_renewer,
            # weakref so the renewer cannot keep the Lock alive forever
            kwargs={'lockref': weakref.ref(self),
                    'interval': self._lock_renewal_interval,
                    'stop': self._lock_renewal_stop}
        )
        # `.daemon = True` replaces the deprecated setDaemon(True)
        self._lock_renewal_thread.daemon = True
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
            return
        loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
        self._lock_renewal_stop.set()
        self._lock_renewal_thread.join()
        self._lock_renewal_thread = None
        loggers["refresh.exit"].debug("Lock refresher has stopped")

    def __enter__(self):
        acquired = self.acquire(blocking=True)
        assert acquired, "Lock wasn't acquired, but blocking=True"
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.release()

    def release(self):
        """Releases the lock, that was acquired with the same object.

        .. note::

            If you want to release a lock that you acquired in a different place you have two choices:

            * Use ``Lock("name", id=id_from_other_place).release()``
            * Use ``Lock("name").reset()``
        """
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        loggers["release"].debug("Releasing %r.", self._name)
        error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
        if error == 1:
            raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
        elif error:
            # bugfix: this runs the UNLOCK script, the old message said EXTEND
            raise RuntimeError("Unsupported error code %s from UNLOCK script." % error)

    def locked(self):
        """
        Return true if the lock is acquired.

        Checks that lock with same name already exists. This method returns true, even if
        lock have another id.
        """
        return self._client.exists(self._name) == 1
376 370
377 371
# Lazily populated by Lock.register_scripts(); holds the registered
# RESET_ALL script handle shared by all Lock instances.
reset_all_script = None


def reset_all(redis_client):
    """
    Forcibly deletes all locks if its remains (like a crash reason). Use this with care.

    :param redis_client:
        An instance of :class:`~StrictRedis`.
    """
    # ensure the module-level script handle exists before invoking it
    Lock.register_scripts(redis_client)

    reset_all_script(client=redis_client)  # noqa
@@ -1,52 +1,52 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2 2
3 3 import logging
4 4
5 5 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
6 6 from .udp import StatsClient # noqa
7 7
# Default statsd connection parameters, overridable via statsd.* settings.
HOST = 'localhost'
PORT = 8125
IPV6 = False
PREFIX = None
MAXUDPSIZE = 512

log = logging.getLogger('rhodecode.statsd')
15 15
16 16
def statsd_config(config, prefix='statsd.'):
    """Extract the keys starting with ``prefix`` from ``config``, stripping
    the prefix from each returned key."""
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
    }
23 23
24 24
def client_from_config(configuration, prefix='statsd.', **kwargs):
    """Build a StatsClient from ``statsd.*`` app settings.

    Returns ``None`` when statsd is disabled or the client cannot be created.
    """
    from pyramid.settings import asbool

    settings = statsd_config(configuration, prefix)
    if not asbool(settings.pop('enabled', False)):
        log.debug('statsd client not enabled by statsd.enabled = flag, skipping...')
        return

    host = settings.pop('statsd_host', HOST)
    port = settings.pop('statsd_port', PORT)
    statsd_prefix = settings.pop('statsd_prefix', PREFIX)
    maxudpsize = settings.pop('statsd_maxudpsize', MAXUDPSIZE)
    ipv6 = asbool(settings.pop('statsd_ipv6', IPV6))
    log.debug('configured statsd client %s:%s', host, port)

    try:
        return StatsClient(
            host=host, port=port, prefix=statsd_prefix,
            maxudpsize=maxudpsize, ipv6=ipv6)
    except Exception:
        # best effort: a broken statsd server must not break the app
        log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
        return None
49 49
50 50
def get_statsd_client(request):
    """Pyramid request helper: build a statsd client from registry settings."""
    settings = request.registry.settings
    return client_from_config(settings)
@@ -1,156 +1,156 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2 2
3 3 import re
4 4 import random
5 5 from collections import deque
6 6 from datetime import timedelta
7 7 from repoze.lru import lru_cache
8 8
9 9 from .timer import Timer
10 10
# Any character outside this whitelist is replaced before a tag is sent.
TAG_INVALID_CHARS_RE = re.compile(
    r"[^\w\d_\-:/\.]",
    #re.UNICODE
)
TAG_INVALID_CHARS_SUBS = "_"

# we save and expose methods called by statsd for discovery
buckets_dict = {}
21 21
22 22
@lru_cache(maxsize=500)
def _normalize_tags_with_cache(tag_list):
    """Sanitize a hashable tuple of tags; results are memoized across calls."""
    sanitize = TAG_INVALID_CHARS_RE.sub
    return [sanitize(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 26
27 27
def normalize_tags(tag_list):
    """Normalize a list of tags.

    The input is converted to a tuple first: lru_cache can only memoize
    hashable arguments, and lists are not hashable.
    """
    return _normalize_tags_with_cache(tuple(tag_list))
32 32
33 33
class StatsClientBase(object):
    """A Base class for various statsd clients."""

    def close(self):
        """Used to close and clean up any underlying resources."""
        raise NotImplementedError()

    def _send(self):
        raise NotImplementedError()

    def pipeline(self):
        raise NotImplementedError()

    def timer(self, stat, rate=1, tags=None, auto_send=True):
        """
        statsd = StatsdClient.statsd
        with statsd.timer('bucket_name', auto_send=True) as tmr:
            # This block will be timed.
            for i in range(0, 100000):
                i ** 2
            # you can access time here...
            elapsed_ms = tmr.ms
        """
        return Timer(self, stat, rate, tags, auto_send=auto_send)

    def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
        """
        Send new timing information.

        `delta` can be either a number of milliseconds or a timedelta.
        """
        if isinstance(delta, timedelta):
            # Convert timedelta to number of milliseconds.
            delta = delta.total_seconds() * 1000.
        fmt = '%0.6f|ms' if use_decimals else '%s|ms'
        self._send_stat(stat, fmt % delta, rate, tags)

    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment a stat by `count`."""
        self._send_stat(stat, '%s|c' % count, rate, tags)

    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement a stat by `count`."""
        self.incr(stat, -count, rate, tags)

    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Set a gauge value."""
        if value < 0 and not delta:
            # absolute negative gauges need a reset-to-zero first, sent
            # atomically through a pipeline
            if rate < 1 and random.random() > rate:
                return
            with self.pipeline() as pipe:
                pipe._send_stat(stat, '0|g', 1)
                pipe._send_stat(stat, '%s|g' % value, 1)
        else:
            sign = '+' if delta and value >= 0 else ''
            self._send_stat(stat, '%s%s|g' % (sign, value), rate, tags)

    def set(self, stat, value, rate=1):
        """Set a set value."""
        self._send_stat(stat, '%s|s' % value, rate)

    def histogram(self, stat, value, rate=1, tags=None):
        """Set a histogram"""
        self._send_stat(stat, '%s|h' % value, rate, tags)

    def _send_stat(self, stat, value, rate, tags=None):
        self._after(self._prepare(stat, value, rate, tags))

    def _prepare(self, stat, value, rate, tags=None):
        """Format one wire-protocol line; returns None when sampled out."""
        global buckets_dict
        buckets_dict[stat] = 1

        if rate < 1:
            if random.random() > rate:
                return
            value = '%s|@%s' % (value, rate)

        if self._prefix:
            stat = '%s.%s' % (self._prefix, stat)

        return '%s:%s%s' % (
            stat,
            value,
            ("|#" + ",".join(normalize_tags(tags))) if tags else "",
        )

    def _after(self, data):
        if data:
            self._send(data)


class PipelineBase(StatsClientBase):
    """Buffers stats and sends them together when the context exits."""

    def __init__(self, client):
        self._client = client
        self._prefix = client._prefix
        self._stats = deque()

    def _send(self):
        raise NotImplementedError()

    def _after(self, data):
        # queue instead of sending immediately; None means "sampled out"
        if data is not None:
            self._stats.append(data)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        self.send()

    def send(self):
        if not self._stats:
            return
        self._send()

    def pipeline(self):
        # nested pipelines buffer into their own queue
        return self.__class__(self)
@@ -1,75 +1,75 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2 2
3 3 import socket
4 4
5 5 from .base import StatsClientBase, PipelineBase
6 6
7 7
class StreamPipeline(PipelineBase):
    """Pipeline for stream clients: flushes all buffered stats in one write."""

    def _send(self):
        payload = '\n'.join(self._stats)
        self._client._after(payload)
        self._stats.clear()
12 12
13 13
class StreamClientBase(StatsClientBase):
    """Shared plumbing for connection-oriented (TCP / unix socket) clients."""

    def connect(self):
        raise NotImplementedError()

    def close(self):
        """Close the underlying socket (if any) and forget it."""
        if self._sock and hasattr(self._sock, 'close'):
            self._sock.close()
        self._sock = None

    def reconnect(self):
        """Tear down and re-establish the connection."""
        self.close()
        self.connect()

    def pipeline(self):
        return StreamPipeline(self)

    def _send(self, data):
        """Send data to statsd."""
        if not self._sock:
            # connect lazily on first send
            self.connect()
        self._do_send(data)

    def _do_send(self, data):
        # statsd's stream protocol is newline-terminated ascii
        self._sock.sendall(data.encode('ascii') + b'\n')
38 38
39 39
class TCPStatsClient(StreamClientBase):
    """TCP version of StatsClient."""

    def __init__(self, host='localhost', port=8125, prefix=None,
                 timeout=None, ipv6=False):
        """Create a new client; no connection is made until first send."""
        self._host = host
        self._port = port
        self._ipv6 = ipv6
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Resolve the target address and open a TCP connection to it."""
        fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
        family, _, _, _, addr = socket.getaddrinfo(
            self._host, self._port, fam, socket.SOCK_STREAM)[0]
        self._sock = socket.socket(family, socket.SOCK_STREAM)
        self._sock.settimeout(self._timeout)
        self._sock.connect(addr)
60 60
61 61
class UnixSocketStatsClient(StreamClientBase):
    """Unix domain socket version of StatsClient."""

    def __init__(self, socket_path, prefix=None, timeout=None):
        """Create a new client bound to a filesystem socket path."""
        self._socket_path = socket_path
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Open a stream connection to the unix socket."""
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sock.settimeout(self._timeout)
        self._sock.connect(self._socket_path)
@@ -1,75 +1,68 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import functools
4
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
9 except ImportError:
10 # fall back to using time
11 from time import time as time_now
4 from time import perf_counter as time_now
12 5
13 6
def safe_wraps(wrapper, *args, **kwargs):
    """Safely wraps partial functions.

    functools.partial objects lack __name__/__doc__, so unwrap down to the
    underlying callable before applying functools.wraps.
    """
    while isinstance(wrapper, functools.partial):
        wrapper = wrapper.func
    return functools.wraps(wrapper, *args, **kwargs)
19 12
20 13
class Timer(object):
    """A context manager/decorator for statsd.timing()."""

    def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
        self.client = client
        self.stat = stat
        self.rate = rate
        self.tags = tags
        self.ms = None
        self._sent = False
        self._start_time = None
        self.use_decimals = use_decimals
        self.auto_send = auto_send

    def __call__(self, f):
        """Thread-safe timing function decorator."""
        @safe_wraps(f)
        def _wrapped(*args, **kwargs):
            began = time_now()
            try:
                return f(*args, **kwargs)
            finally:
                # local elapsed time keeps the decorator thread-safe: no
                # shared state is mutated except the _sent flag
                elapsed_ms = 1000.0 * (time_now() - began)
                self.client.timing(self.stat, elapsed_ms, self.rate, self.tags, self.use_decimals)
                self._sent = True
        return _wrapped

    def __enter__(self):
        return self.start()

    def __exit__(self, typ, value, tb):
        self.stop(send=self.auto_send)

    def start(self):
        self.ms = None
        self._sent = False
        self._start_time = time_now()
        return self

    def stop(self, send=True):
        if self._start_time is None:
            raise RuntimeError('Timer has not started.')
        elapsed = time_now() - self._start_time
        self.ms = 1000.0 * elapsed  # Convert to milliseconds.
        if send:
            self.send()
        return self

    def send(self):
        if self.ms is None:
            raise RuntimeError('No data recorded.')
        if self._sent:
            raise RuntimeError('Already sent data.')
        self._sent = True
        self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
@@ -1,55 +1,55 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2 2
3 3 import socket
4 4
5 5 from .base import StatsClientBase, PipelineBase
6 6
7 7
class Pipeline(PipelineBase):
    """UDP pipeline: packs queued stats into datagrams up to maxudpsize."""

    def __init__(self, client):
        super(Pipeline, self).__init__(client)
        self._maxudpsize = client._maxudpsize

    def _send(self):
        packet = self._stats.popleft()
        while self._stats:
            # Use popleft to preserve the order of the stats.
            nxt = self._stats.popleft()
            if len(nxt) + len(packet) + 1 >= self._maxudpsize:
                # current datagram is full: flush it and start a new one
                self._client._after(packet)
                packet = nxt
            else:
                packet += '\n' + nxt
        self._client._after(packet)
25 25
26 26
class StatsClient(StatsClientBase):
    """A client for statsd (UDP transport)."""

    def __init__(self, host='localhost', port=8125, prefix=None,
                 maxudpsize=512, ipv6=False):
        """Create a new client; resolves the target address immediately."""
        fam = socket.AF_INET6 if ipv6 else socket.AF_INET
        family, _, _, _, addr = socket.getaddrinfo(
            host, port, fam, socket.SOCK_DGRAM)[0]
        self._addr = addr
        self._sock = socket.socket(family, socket.SOCK_DGRAM)
        self._prefix = prefix
        self._maxudpsize = maxudpsize

    def _send(self, data):
        """Send data to statsd."""
        try:
            self._sock.sendto(data.encode('ascii'), self._addr)
        except (socket.error, RuntimeError):
            # No time for love, Dr. Jones!
            pass

    def close(self):
        """Close the UDP socket (if any) and forget it."""
        if self._sock and hasattr(self._sock, 'close'):
            self._sock.close()
        self._sock = None

    def pipeline(self):
        return Pipeline(self)
General Comments 0
You need to be logged in to leave comments. Login now