##// END OF EJS Templates
core: refactor vendored libs to make syncing with vcsserver easier
super-admin -
r5431:2e534b16 default
parent child Browse files
Show More
@@ -1,237 +1,237 b''
1 1 '''
2 2 This library is provided to allow standard python logging
3 3 to output log data as JSON formatted strings
4 4 '''
5 5 import logging
6 6 import re
7 7 from datetime import date, datetime, time, tzinfo, timedelta
8 8 import traceback
9 9 import importlib
10 10
11 11 from inspect import istraceback
12 12
13 13 from collections import OrderedDict
14 from rhodecode.lib.logging_formatter import _inject_req_id, ExceptionAwareFormatter
15 from rhodecode.lib.ext_json import sjson as json
16 14
15 from ...logging_formatter import _inject_req_id, ExceptionAwareFormatter
16 from ...ext_json import sjson as json
17 17
ZERO = timedelta(0)
HOUR = timedelta(hours=1)


class UTC(tzinfo):
    """Fixed-offset tzinfo for Coordinated Universal Time.

    Offset and DST are always zero; kept as a hand-rolled class to stay
    byte-compatible with the vendored upstream library.
    """

    def utcoffset(self, dt):
        # UTC never deviates from itself.
        return ZERO

    def dst(self, dt):
        # UTC has no daylight-saving transitions.
        return ZERO

    def tzname(self, dt):
        return "UTC"

utc = UTC()
35 35
36 36
# skip natural LogRecord attributes
# http://docs.python.org/library/logging.html#logrecord-attributes
# Keys listed here are excluded by merge_record_extra() when copying
# LogRecord attributes into the JSON payload; anything else attached to a
# record (e.g. via ``extra=``) is treated as user data and serialized.
RESERVED_ATTRS = (
    'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
    'funcName', 'levelname', 'levelno', 'lineno', 'module',
    'msecs', 'message', 'msg', 'name', 'pathname', 'process',
    'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName')
44 44
45 45
def merge_record_extra(record, target, reserved):
    """
    Copy non-reserved extra attributes from a LogRecord into ``target``.

    :param record: logging.LogRecord
    :param target: dict to update (mutated in place and also returned)
    :param reserved: dict or list with reserved keys to skip
    """
    for key, value in record.__dict__.items():
        if key in reserved:
            continue
        # keys may be non-strings (e.g. numeric), so probe for startswith
        # instead of assuming str; "_"-prefixed keys are treated as private
        if hasattr(key, "startswith") and key.startswith('_'):
            continue
        target[key] = value
    return target
61 61
62 62
class JsonEncoder(json.JSONEncoder):
    """
    JSON encoder that handles the extra object types commonly found on log
    records: dates/times, traceback objects, exceptions and classes.
    Anything else falls back to ``str()`` (or ``None`` if even that fails)
    so that serializing a log record never raises.
    """

    def default(self, obj):
        if isinstance(obj, (date, datetime, time)):
            return self.format_datetime_obj(obj)

        if istraceback(obj):
            return ''.join(traceback.format_tb(obj)).strip()

        if isinstance(obj, Exception) or type(obj) == Exception or type(obj) == type:
            return str(obj)

        try:
            return super().default(obj)
        except TypeError:
            # last resort: stringify, never let log formatting blow up
            try:
                return str(obj)
            except Exception:
                return None

    def format_datetime_obj(self, obj):
        return obj.isoformat()
92 92
93 93
class JsonFormatter(ExceptionAwareFormatter):
    """
    A custom formatter to format logging records as json strings.
    Extra values will be formatted as str() if not supported by
    json default encoder
    """

    def __init__(self, *args, **kwargs):
        """
        :param json_default: a function for encoding non-standard objects
            as outlined in http://docs.python.org/2/library/json.html
        :param json_encoder: optional custom encoder
        :param json_serializer: a :meth:`json.dumps`-compatible callable
            that will be used to serialize the log record.
        :param json_indent: an optional :meth:`json.dumps`-compatible numeric value
            that will be used to customize the indent of the output json.
        :param prefix: an optional string prefix added at the beginning of
            the formatted string
        :param json_ensure_ascii: ensure_ascii parameter for json.dumps
        :param reserved_attrs: an optional list of fields that will be skipped when
            outputting json log record. Defaults to all log record attributes:
            http://docs.python.org/library/logging.html#logrecord-attributes
        :param timestamp: an optional string/boolean field to add a timestamp when
            outputting the json log record. If string is passed, timestamp will be added
            to log record using string as key. If True boolean is passed, timestamp key
            will be "timestamp". Defaults to True in this vendored copy.
        """
        # pop all json-specific kwargs first so the remainder can be passed
        # verbatim to logging.Formatter below
        self.json_default = self._str_to_fn(kwargs.pop("json_default", None))
        self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None))
        self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps))
        self.json_indent = kwargs.pop("json_indent", None)
        self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True)
        self.prefix = kwargs.pop("prefix", "")
        reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
        self.reserved_attrs = dict(list(zip(reserved_attrs, reserved_attrs)))
        self.timestamp = kwargs.pop("timestamp", True)

        # NOTE(review): deliberately bypasses the direct parent's __init__
        # (see the commented-out call) and initializes logging.Formatter
        # directly — confirm ExceptionAwareFormatter needs no init here.
        # super(JsonFormatter, self).__init__(*args, **kwargs)
        logging.Formatter.__init__(self, *args, **kwargs)
        if not self.json_encoder and not self.json_default:
            # fall back to the module's JsonEncoder when nothing custom given
            self.json_encoder = JsonEncoder

        self._required_fields = self.parse()
        self._skip_fields = dict(list(zip(self._required_fields,
                                          self._required_fields)))
        self._skip_fields.update(self.reserved_attrs)

    def _str_to_fn(self, fn_as_str):
        """
        If the argument is not a string, return whatever was passed in.
        Parses a string such as package.module.function, imports the module
        and returns the function.

        :param fn_as_str: The string to parse. If not a string, return it.
        """
        if not isinstance(fn_as_str, str):
            return fn_as_str

        path, _, function = fn_as_str.rpartition('.')
        module = importlib.import_module(path)
        return getattr(module, function)

    def parse(self):
        """
        Parses format string looking for substitutions

        This method is responsible for returning a list of fields (as strings)
        to include in all log messages.
        """
        # matches the "(name)" part of %-style format directives, e.g.
        # "%(asctime)s" -> "asctime"
        standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
        return standard_formatters.findall(self._fmt)

    def add_fields(self, log_record, record, message_dict):
        """
        Override this method to implement custom logic for adding fields.
        """
        for field in self._required_fields:
            log_record[field] = record.__dict__.get(field)
        log_record.update(message_dict)
        merge_record_extra(record, log_record, reserved=self._skip_fields)

        if self.timestamp:
            # a string value is used as the key; True selects "timestamp"
            key = self.timestamp if type(self.timestamp) == str else 'timestamp'
            log_record[key] = datetime.fromtimestamp(record.created, tz=utc)

    def process_log_record(self, log_record):
        """
        Override this method to implement custom logic
        on the possibly ordered dictionary.
        """
        return log_record

    def jsonify_log_record(self, log_record):
        """Returns a json string of the log record."""
        return self.json_serializer(log_record,
                                    default=self.json_default,
                                    cls=self.json_encoder,
                                    indent=self.json_indent,
                                    ensure_ascii=self.json_ensure_ascii)

    def serialize_log_record(self, log_record):
        """Returns the final representation of the log record."""
        return "{}{}".format(self.prefix, self.jsonify_log_record(log_record))

    def format(self, record):
        """Formats a log record and serializes to json"""
        message_dict = {}
        # FIXME: logging.LogRecord.msg and logging.LogRecord.message in typeshed
        # are always type of str. We shouldn't need to override that.
        if isinstance(record.msg, dict):
            # dict messages are merged into the payload instead of rendered
            message_dict = record.msg
            record.message = None
        else:
            record.message = record.getMessage()
        # only format time if needed
        if "asctime" in self._required_fields:
            record.asctime = self.formatTime(record, self.datefmt)

        # Display formatted exception, but allow overriding it in the
        # user-supplied dict.
        if record.exc_info and not message_dict.get('exc_info'):
            message_dict['exc_info'] = self.formatException(record.exc_info)
        if not message_dict.get('exc_info') and record.exc_text:
            message_dict['exc_info'] = record.exc_text
        # Display formatted record of stack frames
        # default format is a string returned from :func:`traceback.print_stack`
        try:
            if record.stack_info and not message_dict.get('stack_info'):
                message_dict['stack_info'] = self.formatStack(record.stack_info)
        except AttributeError:
            # Python2.7 doesn't have stack_info.
            pass

        try:
            # OrderedDict keeps field order stable in the serialized output
            log_record = OrderedDict()
        except NameError:
            log_record = {}

        _inject_req_id(record, with_prefix=False)
        self.add_fields(log_record, record, message_dict)
        log_record = self.process_log_record(log_record)

        return self.serialize_log_record(log_record)
@@ -1,402 +1,402 b''
1 1
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7 from typing import Union
8 8
9 9 from redis import StrictRedis
10 10
__version__ = '4.0.0'

# One logger per lock life-cycle event, e.g. "rhodecode.<module>.acquire".
loggers = {
    k: getLogger("rhodecode." + ".".join((__name__, k)))
    for k in [
        "acquire",
        "refresh.thread.start",
        "refresh.thread.stop",
        "refresh.thread.exit",
        "refresh.start",
        "refresh.shutdown",
        "refresh.exit",
        "release",
    ]
}

# Python 2 compatibility aliases kept by the vendored code below.
text_type = str
binary_type = bytes


# Check if the id match. If not, return an error code.
# KEYS[1] = lock key, KEYS[2] = signal list, ARGV[1] = lock id,
# ARGV[2] = signal expiry (ms). Returns 0 on success, 1 when the lock
# is not owned by ARGV[1].
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("pexpire", KEYS[2], ARGV[2])
        redis.call("del", KEYS[1])
        return 0
    end
"""

# Covers both cases when key doesn't exist and doesn't equal to lock's id
# Returns 0 on success, 1 when not owned, 2 when the key has no TTL.
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[2])
        return 0
    end
"""

# Unconditionally delete one lock and wake a waiter via its signal list.
RESET_SCRIPT = b"""
    redis.call('del', KEYS[2])
    redis.call('lpush', KEYS[2], 1)
    redis.call('pexpire', KEYS[2], ARGV[2])
    return redis.call('del', KEYS[1])
"""

# Delete every "lock:*" key and signal its waiters; returns lock count.
RESET_ALL_SCRIPT = b"""
    local locks = redis.call('keys', 'lock:*')
    local signal
    for _, lock in pairs(locks) do
        signal = 'lock-signal:' .. string.sub(lock, 6)
        redis.call('del', signal)
        redis.call('lpush', signal, 1)
        redis.call('expire', signal, 1)
        redis.call('del', lock)
    end
    return #locks
"""
75 75
76 76
class AlreadyAcquired(RuntimeError):
    """Raised by acquire() when this Lock instance already holds the lock."""
    pass


class NotAcquired(RuntimeError):
    """Raised when releasing/extending a lock that is not held or expired."""
    pass


class AlreadyStarted(RuntimeError):
    """Raised when the lock-renewal thread is started twice."""
    pass


class TimeoutNotUsable(RuntimeError):
    """Raised when a timeout is given together with blocking=False."""
    pass


class InvalidTimeout(RuntimeError):
    """Raised when a negative acquire() timeout is supplied."""
    pass


class TimeoutTooLarge(RuntimeError):
    """Raised when the acquire() timeout exceeds the lock's expire time."""
    pass


class NotExpirable(RuntimeError):
    """Raised by extend() when the lock key has no expiration time set."""
    pass
103 103
104 104
class Lock:
    """
    A Lock context manager implemented via redis SETNX/BLPOP.

    The lock value is a unique id; ownership checks and releases happen
    atomically through the Lua scripts registered in register_scripts().
    Waiters block on BLPOP of a companion "lock-signal:<name>" list that
    the unlock script pushes to.
    """

    # Lua script handles, registered once per process (see register_scripts)
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None
    blocking = None

    _lock_renewal_interval: float
    _lock_renewal_thread: Union[threading.Thread, None]

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000, blocking=True):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
            Used in `__enter__` method.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        if id is None:
            # 18 random bytes -> 24 ascii chars of base64, unique per instance
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, binary_type):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, text_type):
            self._id = id
        else:
            raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.blocking = blocking

        self.register_scripts(redis_client)

    @classmethod
    def register_scripts(cls, redis_client):
        """Register the Lua scripts with redis (once per process).

        NOTE(review): the class-level script handles are guarded by the
        module-global ``reset_all_script``, so they are only registered for
        the first client ever passed in — confirm multi-client usage.
        """
        global reset_all_script
        if reset_all_script is None:
            cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
            cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
            cls.reset_script = redis_client.register_script(RESET_SCRIPT)
            cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)

    @property
    def _held(self):
        # held == the stored lock value matches our unique id
        return self.id == self.get_owner_id()

    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))

    @property
    def id(self):
        return self._id

    def get_owner_id(self):
        """Return the id currently stored under the lock key (or None)."""
        owner_id = self._client.get(self._name)
        if isinstance(owner_id, binary_type):
            owner_id = owner_id.decode('ascii', 'replace')
        return owner_id

    def acquire(self, blocking=True, timeout=None):
        """
        Try to acquire the lock; returns True on success, False otherwise.

        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        """
        logger = loggers["acquire"]

        logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)

        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                # message fixed: the guard only rejects strictly negative values
                raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than 0")

            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET NX atomically claims the key; busy means someone holds it
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # blpop returns None on timeout; only count it as a
                    # timeout when the caller actually supplied one
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to acquire Lock(%r).", self._name)
                    return False

        logger.debug("Acquired Lock(%r).", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True

    def extend(self, expire=None):
        """
        Extends expiration time of the lock.

        :param expire:
            New expiration time. If ``None`` - `expire` provided during
            lock initialization will be taken.
        """
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        elif self._expire is not None:
            expire = self._expire
        else:
            raise TypeError(
                "To extend a lock 'expire' must be provided as an "
                "argument to extend() method or at initialization time."
            )

        error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
        if error == 1:
            raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
        elif error == 2:
            raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
        elif error:
            raise RuntimeError(f"Unsupported error code {error} from EXTEND script")

    @staticmethod
    def _lock_renewer(name, lockref, interval, stop):
        """
        Renew the lock key in redis every `interval` seconds for as long
        as `self._lock_renewal_thread.should_exit` is False.
        """
        while not stop.wait(timeout=interval):
            loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
            # only a weakref is held so a garbage-collected Lock stops renewal
            lock: "Lock" = lockref()
            if lock is None:
                loggers["refresh.thread.stop"].debug(
                    "Stopping loop because Lock(%r) was garbage collected.", name
                )
                break
            lock.extend(expire=lock._expire)
            del lock
        loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)

    def _start_lock_renewer(self):
        """
        Starts the lock refresher thread.

        :raises AlreadyStarted: if a renewal thread is already running.
        """
        if self._lock_renewal_thread is not None:
            raise AlreadyStarted("Lock refresh thread already started")

        loggers["refresh.start"].debug(
            "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
            self._name, self._lock_renewal_interval
        )
        self._lock_renewal_stop = threading.Event()
        self._lock_renewal_thread = threading.Thread(
            group=None,
            target=self._lock_renewer,
            kwargs={
                'name': self._name,
                'lockref': weakref.ref(self),
                'interval': self._lock_renewal_interval,
                'stop': self._lock_renewal_stop,
            },
        )
        self._lock_renewal_thread.daemon = True
        self._lock_renewal_thread.start()

    def _stop_lock_renewer(self):
        """
        Stop the lock renewer.

        This signals the renewal thread and waits for its exit.
        """
        if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
            return
        loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
        self._lock_renewal_stop.set()
        self._lock_renewal_thread.join()
        self._lock_renewal_thread = None
        loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)

    def __enter__(self):
        acquired = self.acquire(blocking=self.blocking)
        if not acquired:
            if self.blocking:
                raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
            raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.release()

    def release(self):
        """Releases the lock, that was acquired with the same object.

        .. note::

            If you want to release a lock that you acquired in a different place you have two choices:

            * Use ``Lock("name", id=id_from_other_place).release()``
            * Use ``Lock("name").reset()``
        """
        if self._lock_renewal_thread is not None:
            self._stop_lock_renewer()
        loggers["release"].debug("Releasing Lock(%r).", self._name)
        error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
        if error == 1:
            raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
        elif error:
            # bug fix: this path runs the UNLOCK script, not EXTEND
            raise RuntimeError(f"Unsupported error code {error} from UNLOCK script.")

    def locked(self):
        """
        Return true if the lock is acquired.

        Checks that lock with same name already exists. This method returns true, even if
        lock have another id.
        """
        return self._client.exists(self._name) == 1
388 388
389 389
# Module-level handle to the RESET_ALL Lua script; populated lazily by
# Lock.register_scripts().
reset_all_script = None


def reset_all(redis_client):
    """
    Forcibly delete every lock left behind (e.g. after a crash). Use with care.

    :param redis_client:
        An instance of :class:`~StrictRedis`.
    """
    # ensure the Lua scripts (and the module-level handle) are registered
    Lock.register_scripts(redis_client)
    reset_all_script(client=redis_client)  # noqa
@@ -1,53 +1,51 b''
1
2
3 1 import logging
4 2
5 3 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
6 4 from .udp import StatsClient # noqa
7 5
# Default statsd connection settings; individual values can be overridden
# via ``statsd.*`` keys in the application config (see client_from_config).
HOST = 'localhost'
PORT = 8125
IPV6 = False
PREFIX = None
MAXUDPSIZE = 512

log = logging.getLogger('rhodecode.statsd')
15 13
16 14
def statsd_config(config, prefix='statsd.'):
    """Return the subset of *config* whose keys start with *prefix*,
    with the prefix stripped from each key."""
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
    }
23 21
24 22
def client_from_config(configuration, prefix='statsd.', **kwargs):
    """Build a UDP StatsClient from application configuration.

    Returns ``None`` when statsd is disabled or the client cannot be
    created (connection problems are logged, not raised).
    """
    from pyramid.settings import asbool

    settings = statsd_config(configuration, prefix)
    raw_flag = settings.get('enabled')
    if not asbool(settings.pop('enabled', False)):
        log.debug('statsd client not enabled by statsd.enabled = %s flag, skipping...', raw_flag)
        return None

    host = settings.pop('statsd_host', HOST)
    port = settings.pop('statsd_port', PORT)
    prefix = settings.pop('statsd_prefix', PREFIX)
    maxudpsize = settings.pop('statsd_maxudpsize', MAXUDPSIZE)
    ipv6 = asbool(settings.pop('statsd_ipv6', IPV6))
    log.debug('configured statsd client %s:%s', host, port)

    try:
        return StatsClient(
            host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
    except Exception:
        # best-effort: metrics must never take the application down
        log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
        return None
50 48
51 49
def get_statsd_client(request):
    """Pyramid request helper: build a statsd client from registry settings."""
    settings = request.registry.settings
    return client_from_config(settings)
@@ -1,156 +1,154 b''
1
2
3 1 import re
4 2 import random
5 3 from collections import deque
6 4 from datetime import timedelta
7 5 from repoze.lru import lru_cache
8 6
9 7 from .timer import Timer
10 8
11 9 TAG_INVALID_CHARS_RE = re.compile(
12 10 r"[^\w\d_\-:/\.]",
13 11 #re.UNICODE
14 12 )
15 13 TAG_INVALID_CHARS_SUBS = "_"
16 14
17 15 # we save and expose methods called by statsd for discovery
18 16 buckets_dict = {
19 17
20 18 }
21 19
22 20
23 21 @lru_cache(maxsize=500)
24 22 def _normalize_tags_with_cache(tag_list):
25 23 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26 24
27 25
28 26 def normalize_tags(tag_list):
29 27 # We have to turn our input tag list into a non-mutable tuple for it to
30 28 # be hashable (and thus usable) by the @lru_cache decorator.
31 29 return _normalize_tags_with_cache(tuple(tag_list))
32 30
33 31
class StatsClientBase:
    """A Base class for various statsd clients.

    Subclasses must provide ``_send``/``pipeline``/``close`` and a
    ``_prefix`` attribute; this class implements the statsd wire-protocol
    formatting (``name:value|type[|@rate][|#tags]``).
    """

    def close(self):
        """Used to close and clean up any underlying resources."""
        raise NotImplementedError()

    def _send(self):
        # transport-specific; implemented by UDP/TCP/unix-socket subclasses
        raise NotImplementedError()

    def pipeline(self):
        # returns a buffering pipeline; implemented by subclasses
        raise NotImplementedError()

    def timer(self, stat, rate=1, tags=None, auto_send=True):
        """
        statsd = StatsdClient.statsd
        with statsd.timer('bucket_name', auto_send=True) as tmr:
            # This block will be timed.
            for i in range(0, 100000):
                i ** 2
            # you can access time here...
            elapsed_ms = tmr.ms
        """
        return Timer(self, stat, rate, tags, auto_send=auto_send)

    def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
        """
        Send new timing information.

        `delta` can be either a number of milliseconds or a timedelta.
        """
        if isinstance(delta, timedelta):
            # Convert timedelta to number of milliseconds.
            delta = delta.total_seconds() * 1000.
        if use_decimals:
            fmt = '%0.6f|ms'
        else:
            fmt = '%s|ms'
        self._send_stat(stat, fmt % delta, rate, tags)

    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment a stat by `count`."""
        self._send_stat(stat, f'{count}|c', rate, tags)

    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement a stat by `count`."""
        self.incr(stat, -count, rate, tags)

    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Set a gauge value."""
        if value < 0 and not delta:
            # an absolute negative gauge must first be zeroed, otherwise the
            # server would interpret "-N|g" as a decrement; both updates go
            # out in one pipeline.
            # NOTE(review): this path sends with rate 1 and without tags —
            # confirm that dropping the tags here is intended.
            if rate < 1:
                if random.random() > rate:
                    return
            with self.pipeline() as pipe:
                pipe._send_stat(stat, '0|g', 1)
                pipe._send_stat(stat, f'{value}|g', 1)
        else:
            prefix = '+' if delta and value >= 0 else ''
            self._send_stat(stat, f'{prefix}{value}|g', rate, tags)

    def set(self, stat, value, rate=1):
        """Set a set value."""
        self._send_stat(stat, f'{value}|s', rate)

    def histogram(self, stat, value, rate=1, tags=None):
        """Set a histogram"""
        self._send_stat(stat, f'{value}|h', rate, tags)

    def _send_stat(self, stat, value, rate, tags=None):
        self._after(self._prepare(stat, value, rate, tags))

    def _prepare(self, stat, value, rate, tags=None):
        """Format one wire-protocol line; returns None when sampled out."""
        # record the bucket name so callers can discover what was emitted
        global buckets_dict
        buckets_dict[stat] = 1

        if rate < 1:
            if random.random() > rate:
                return
            value = f'{value}|@{rate}'

        if self._prefix:
            stat = f'{self._prefix}.{stat}'

        res = '%s:%s%s' % (
            stat,
            value,
            ("|#" + ",".join(normalize_tags(tags))) if tags else "",
        )
        return res

    def _after(self, data):
        # sampled-out stats arrive here as None and are silently dropped
        if data:
            self._send(data)
129 127
class PipelineBase(StatsClientBase):
    """Buffers prepared stat lines instead of sending them immediately.

    Subclasses implement ``_send()`` to flush ``self._stats``. Usable as a
    context manager: buffered stats are flushed on exit.
    """

    def __init__(self, client):
        self._client = client
        self._prefix = client._prefix
        self._stats = deque()

    def _send(self):
        # flushing is transport-specific
        raise NotImplementedError()

    def _after(self, data):
        # _prepare() returns None for sampled-out stats; don't buffer those
        if data is not None:
            self._stats.append(data)

    def __enter__(self):
        return self

    def __exit__(self, typ, value, tb):
        self.send()

    def send(self):
        """Flush buffered stats, if any."""
        if self._stats:
            self._send()

    def pipeline(self):
        # nested pipelines buffer independently of their parent
        return self.__class__(self)
@@ -1,75 +1,73 b''
1
2
3 1 import socket
4 2
5 3 from .base import StatsClientBase, PipelineBase
6 4
7 5
class StreamPipeline(PipelineBase):
    """Pipeline for stream clients: joins buffered stats with newlines and
    hands them to the parent client in a single write."""

    def _send(self):
        payload = '\n'.join(self._stats)
        self._client._after(payload)
        self._stats.clear()
12 10
13 11
class StreamClientBase(StatsClientBase):
    """Shared logic for connection-oriented (TCP / unix-socket) clients."""

    def connect(self):
        # establishing the connection is transport-specific
        raise NotImplementedError()

    def close(self):
        """Close the socket, if one is open, and forget it."""
        if self._sock and hasattr(self._sock, 'close'):
            self._sock.close()
        self._sock = None

    def reconnect(self):
        """Tear down and re-establish the connection."""
        self.close()
        self.connect()

    def pipeline(self):
        return StreamPipeline(self)

    def _send(self, data):
        """Send data to statsd."""
        # connect lazily on first use (or after close())
        if not self._sock:
            self.connect()
        self._do_send(data)

    def _do_send(self, data):
        self._sock.sendall(data.encode('ascii') + b'\n')
38 36
39 37
class TCPStatsClient(StreamClientBase):
    """TCP version of StatsClient."""

    def __init__(self, host='localhost', port=8125, prefix=None,
                 timeout=None, ipv6=False):
        """Create a new client; no connection is made until first send."""
        self._host = host
        self._port = port
        self._ipv6 = ipv6
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Resolve the target address and open a TCP connection."""
        hint = socket.AF_INET6 if self._ipv6 else socket.AF_INET
        family, _, _, _, addr = socket.getaddrinfo(
            self._host, self._port, hint, socket.SOCK_STREAM)[0]
        self._sock = socket.socket(family, socket.SOCK_STREAM)
        self._sock.settimeout(self._timeout)
        self._sock.connect(addr)
60 58
61 59
class UnixSocketStatsClient(StreamClientBase):
    """Unix domain socket version of StatsClient."""

    def __init__(self, socket_path, prefix=None, timeout=None):
        """Create a new client; no connection is made until first send."""
        self._socket_path = socket_path
        self._timeout = timeout
        self._prefix = prefix
        self._sock = None

    def connect(self):
        """Open a stream connection to the configured unix socket path."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sock = sock
        sock.settimeout(self._timeout)
        sock.connect(self._socket_path)
@@ -1,68 +1,66 b''
1
2
3 1 import functools
4 2 from time import perf_counter as time_now
5 3
6 4
def safe_wraps(wrapper, *args, **kwargs):
    """Safely wraps partial functions.

    Behaves like :func:`functools.wraps`, but first unwraps any chain of
    ``functools.partial`` objects so the metadata of the underlying
    function is the one that gets copied.
    """
    target = wrapper
    while isinstance(target, functools.partial):
        target = target.func
    return functools.wraps(target, *args, **kwargs)
12 10
13 11
class Timer:
    """A context manager/decorator for statsd.timing().

    Usable three ways: as a decorator on a callable, as a context
    manager (``with``), or manually via :meth:`start` / :meth:`stop` /
    :meth:`send`.
    """

    def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
        self.client = client
        self.stat = stat
        self.rate = rate
        self.tags = tags
        self.use_decimals = use_decimals
        # Whether __exit__ reports the timing automatically.
        self.auto_send = auto_send
        # Timing state: measured milliseconds, whether it was reported,
        # and the perf-counter value captured by start().
        self.ms = None
        self._sent = False
        self._start_time = None

    def __call__(self, f):
        """Thread-safe timing function decorator."""
        @safe_wraps(f)
        def wrapper(*call_args, **call_kwargs):
            began = time_now()
            try:
                return f(*call_args, **call_kwargs)
            finally:
                # Report even when the wrapped callable raises.
                elapsed_ms = 1000.0 * (time_now() - began)
                self.client.timing(self.stat, elapsed_ms, self.rate,
                                   self.tags, self.use_decimals)
                self._sent = True
        return wrapper

    def __enter__(self):
        return self.start()

    def __exit__(self, typ, value, tb):
        self.stop(send=self.auto_send)

    def start(self):
        """Begin (or restart) timing; returns self for chaining."""
        self.ms = None
        self._sent = False
        self._start_time = time_now()
        return self

    def stop(self, send=True):
        """Record elapsed milliseconds since start(); optionally report.

        :raises RuntimeError: if start() was never called.
        """
        if self._start_time is None:
            raise RuntimeError('Timer has not started.')
        # Convert the perf-counter delta to milliseconds.
        self.ms = 1000.0 * (time_now() - self._start_time)
        if send:
            self.send()
        return self

    def send(self):
        """Report the recorded timing to the client, exactly once.

        :raises RuntimeError: if nothing was recorded or already sent.
        """
        if self.ms is None:
            raise RuntimeError('No data recorded.')
        if self._sent:
            raise RuntimeError('Already sent data.')
        self._sent = True
        self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
@@ -1,55 +1,53 b''
1
2
3 1 import socket
4 2
5 3 from .base import StatsClientBase, PipelineBase
6 4
7 5
class Pipeline(PipelineBase):
    """UDP pipeline that packs queued stats into datagrams no larger
    than the owning client's configured maximum UDP payload size."""

    def __init__(self, client):
        super().__init__(client)
        # Cap copied from the owning client so _send can size datagrams.
        self._maxudpsize = client._maxudpsize

    def _send(self):
        # popleft preserves the order the stats were queued in.
        packet = self._stats.popleft()
        while self._stats:
            nxt = self._stats.popleft()
            # +1 accounts for the '\n' separator that would be added.
            if len(nxt) + len(packet) + 1 >= self._maxudpsize:
                # Packet full: flush it and start a new one.
                self._client._after(packet)
                packet = nxt
            else:
                packet = packet + '\n' + nxt
        self._client._after(packet)
25 23
26 24
class StatsClient(StatsClientBase):
    """A client for statsd."""

    def __init__(self, host='localhost', port=8125, prefix=None,
                 maxudpsize=512, ipv6=False):
        """Create a new client."""
        family_hint = socket.AF_INET6 if ipv6 else socket.AF_INET
        # Use the first resolved address for the requested family.
        family, _, _, _, addr = socket.getaddrinfo(
            host, port, family_hint, socket.SOCK_DGRAM)[0]
        self._addr = addr
        self._sock = socket.socket(family, socket.SOCK_DGRAM)
        self._prefix = prefix
        self._maxudpsize = maxudpsize

    def _send(self, data):
        """Send data to statsd."""
        try:
            self._sock.sendto(data.encode('ascii'), self._addr)
        except (socket.error, RuntimeError):
            # No time for love, Dr. Jones!
            pass

    def close(self):
        """Close the UDP socket (if any) and forget the reference."""
        sock, self._sock = self._sock, None
        if sock and hasattr(sock, 'close'):
            sock.close()

    def pipeline(self):
        """Return a batching pipeline bound to this client."""
        return Pipeline(self)
@@ -1,185 +1,185 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import sys
20 20 import logging
21 21
22 22
# ANSI SGR foreground color codes 30..37, in standard terminal order.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)

# Escape sequences used to wrap colored log records.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"

# Log level name (plus the pseudo 'SQL' level) -> color code.
COLORS = {
    'CRITICAL': MAGENTA,
    'ERROR': RED,
    'WARNING': CYAN,
    'INFO': GREEN,
    'DEBUG': BLUE,
    'SQL': YELLOW,
}
38 38
39 39
def _inject_req_id(record, with_prefix=True):
    """Attach the current request's id to *record* as ``record.req_id``.

    Falls back to an all-zero placeholder when no request (or no id)
    is available, so formatters can always interpolate ``%(req_id)s``.
    """
    # Imported lazily to avoid a hard module-level pyramid dependency.
    from pyramid.threadlocal import get_current_request
    dummy = '00000000-0000-0000-0000-000000000000'

    current = get_current_request()
    req_id = getattr(current, 'req_id', None) if current else None

    if with_prefix:
        # Left-pad to a fixed 36-char field for aligned log columns.
        record.req_id = 'req_id:%-36s' % (req_id or dummy)
    else:
        record.req_id = req_id or dummy
53 53
54 54
def _add_log_to_debug_bucket(formatted_record):
    """Append an already-formatted log line to the current request's
    ``req_id_bucket``, when a request is active; otherwise do nothing."""
    # Imported lazily to avoid a hard module-level pyramid dependency.
    from pyramid.threadlocal import get_current_request
    current = get_current_request()
    if current:
        current.req_id_bucket.append(formatted_record)
60 60
61 61
def one_space_trim(s):
    """Collapse every run of consecutive spaces in *s* to a single space.

    The previous recursive implementation tested/replaced a single space
    ("s.find(' ')" / "replace(' ', ' ')"), which is a no-op replacement
    and recurses forever on any string containing a space. The intended
    behaviour is to repeatedly squeeze double spaces until none remain;
    an iterative loop also avoids recursion-depth limits on long inputs.

    :param s: input string.
    :return: *s* with multi-space runs reduced to one space.
    """
    while '  ' in s:
        s = s.replace('  ', ' ')
    return s
68 68
69 69
def format_sql(sql):
    """Reformat a SQL string for readable log output.

    Flattens the statement onto one line, squeezes repeated spaces, then
    breaks it at commas and common SQL keywords with tab indentation.
    """
    sql = one_space_trim(sql.replace('\n', ''))
    # Apply replacements in this exact order; later rules operate on the
    # output of earlier ones (matching the original chained .replace()).
    rules = (
        (',', ',\n\t'),
        ('SELECT', '\n\tSELECT \n\t'),
        ('UPDATE', '\n\tUPDATE \n\t'),
        ('DELETE', '\n\tDELETE \n\t'),
        ('FROM', '\n\tFROM'),
        ('ORDER BY', '\n\tORDER BY'),
        ('LIMIT', '\n\tLIMIT'),
        ('WHERE', '\n\tWHERE'),
        ('AND', '\n\tAND'),
        ('LEFT', '\n\tLEFT'),
        ('INNER', '\n\tINNER'),
        ('INSERT', '\n\tINSERT'),
        ('DELETE', '\n\tDELETE'),
    )
    for old, new in rules:
        sql = sql.replace(old, new)
    return sql
88 88
89 89
class ExceptionAwareFormatter(logging.Formatter):
    """
    Extended logging formatter which prints out remote tracebacks.

    When the exception value carries a ``_vcs_server_traceback``
    attribute (set by the VCSServer RPC layer), that remote traceback
    text is appended to the locally formatted traceback.
    """

    def formatException(self, ei):
        # ``ei`` is the (type, value, traceback) triple logging passes in.
        ex_type, ex_value, ex_tb = ei

        # Default, locally rendered traceback; returned unchanged unless
        # a remote traceback is attached to the exception value.
        local_tb = logging.Formatter.formatException(self, ei)
        if hasattr(ex_value, '_vcs_server_traceback'):

            def formatRemoteTraceback(remote_tb_lines):
                # Wrap the remote traceback text in visual delimiters so
                # it is clearly separated from the local traceback.
                result = ["\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n"]
                result.append(remote_tb_lines)
                result.append("\n +--- End of remote traceback\n")
                return result

            try:
                if ex_type is not None and ex_value is None and ex_tb is None:
                    # possible old (3.x) call syntax where caller is only
                    # providing exception object
                    if type(ex_type) is not type:
                        raise TypeError(
                            "invalid argument: ex_type should be an exception "
                            "type, or just supply no arguments at all")
                if ex_type is None and ex_tb is None:
                    # No explicit exception info given: fall back to the
                    # exception currently being handled.
                    ex_type, ex_value, ex_tb = sys.exc_info()

                remote_tb = getattr(ex_value, "_vcs_server_traceback", None)

                if remote_tb:
                    # Append the delimited remote traceback to the local one.
                    remote_tb = formatRemoteTraceback(remote_tb)
                    return local_tb + ''.join(remote_tb)
            finally:
                # clean up cycle to traceback, to allow proper GC
                del ex_type, ex_value, ex_tb

        return local_tb
128 128
129 129
class RequestTrackingFormatter(ExceptionAwareFormatter):
    """Formatter that stamps each record with the current request id and
    mirrors the formatted line into the request's debug bucket."""

    def format(self, record):
        _inject_req_id(record)
        formatted = logging.Formatter.format(self, record)
        _add_log_to_debug_bucket(formatted)
        return formatted
136 136
137 137
class ColorFormatter(ExceptionAwareFormatter):
    """Formatter that wraps each record in the ANSI color matching its
    level (see the module-level COLORS map)."""

    def format(self, record):
        """
        Changes record's levelname to use with COLORS enum
        """
        plain = super().format(record)
        color_on = COLOR_SEQ % COLORS[record.levelname]
        return color_on + plain + RESET_SEQ
152 152
153 153
class ColorRequestTrackingFormatter(RequestTrackingFormatter):
    """Request-tracking formatter that additionally wraps the output in
    the ANSI color matching the record's level (see COLORS)."""

    def format(self, record):
        """
        Changes record's levelname to use with COLORS enum
        """
        # Modernized to the zero-argument super() form for consistency
        # with ColorFormatter in this module (Python 3 only codebase).
        def_record = super().format(record)

        levelname = record.levelname
        start = COLOR_SEQ % (COLORS[levelname])
        end = RESET_SEQ

        colored_record = ''.join([start, def_record, end])
        return colored_record
168 168
169 169
class ColorFormatterSql(logging.Formatter):
    """Formatter for SQL debug logs: pretty-prints the statement via
    format_sql() and wraps it in the 'SQL' ANSI color."""

    def format(self, record):
        """
        Changes record's levelname to use with COLORS enum
        """
        color_on = COLOR_SEQ % (COLORS['SQL'])
        pretty = format_sql(logging.Formatter.format(self, record))
        return color_on + pretty + RESET_SEQ
183 183
# NOTE (marcink): needs to stay with this name for backward .ini compatibility
# (legacy logging configuration files reference this class name directly).
Pyro4AwareFormatter = ExceptionAwareFormatter  # noqa
General Comments 0
You need to be logged in to leave comments. Login now