##// END OF EJS Templates
core: synced vendor/ext_json with ce for better compatability
super-admin -
r1250:2c57bb5b default
parent child Browse files
Show More
@@ -1,243 +1,237 b''
1 1 '''
2 2 This library is provided to allow standard python logging
3 3 to output log data as JSON formatted strings
4 4 '''
5 5 import logging
6 import json
7 6 import re
8 7 from datetime import date, datetime, time, tzinfo, timedelta
9 8 import traceback
10 9 import importlib
11 10
12 11 from inspect import istraceback
13 12
14 13 from collections import OrderedDict
15 14
16
17 def _inject_req_id(record, *args, **kwargs):
18 return record
19
20
21 ExceptionAwareFormatter = logging.Formatter
22
15 from ...logging_formatter import _inject_req_id, ExceptionAwareFormatter
16 from ...ext_json import sjson as json
23 17
24 18 ZERO = timedelta(0)
25 19 HOUR = timedelta(hours=1)
26 20
27 21
28 22 class UTC(tzinfo):
29 23 """UTC"""
30 24
31 25 def utcoffset(self, dt):
32 26 return ZERO
33 27
34 28 def tzname(self, dt):
35 29 return "UTC"
36 30
37 31 def dst(self, dt):
38 32 return ZERO
39 33
40 34 utc = UTC()
41 35
42 36
43 37 # skip natural LogRecord attributes
44 38 # http://docs.python.org/library/logging.html#logrecord-attributes
45 39 RESERVED_ATTRS = (
46 40 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
47 41 'funcName', 'levelname', 'levelno', 'lineno', 'module',
48 42 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
49 43 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName')
50 44
51 45
52 46 def merge_record_extra(record, target, reserved):
53 47 """
54 48 Merges extra attributes from LogRecord object into target dictionary
55 49
56 50 :param record: logging.LogRecord
57 51 :param target: dict to update
58 52 :param reserved: dict or list with reserved keys to skip
59 53 """
60 54 for key, value in record.__dict__.items():
61 55 # this allows to have numeric keys
62 56 if (key not in reserved
63 57 and not (hasattr(key, "startswith")
64 58 and key.startswith('_'))):
65 59 target[key] = value
66 60 return target
67 61
68 62
69 63 class JsonEncoder(json.JSONEncoder):
70 64 """
71 65 A custom encoder extending the default JSONEncoder
72 66 """
73 67
74 68 def default(self, obj):
75 69 if isinstance(obj, (date, datetime, time)):
76 70 return self.format_datetime_obj(obj)
77 71
78 72 elif istraceback(obj):
79 73 return ''.join(traceback.format_tb(obj)).strip()
80 74
81 75 elif type(obj) == Exception \
82 76 or isinstance(obj, Exception) \
83 77 or type(obj) == type:
84 78 return str(obj)
85 79
86 80 try:
87 81 return super().default(obj)
88 82
89 83 except TypeError:
90 84 try:
91 85 return str(obj)
92 86
93 87 except Exception:
94 88 return None
95 89
96 90 def format_datetime_obj(self, obj):
97 91 return obj.isoformat()
98 92
99 93
100 94 class JsonFormatter(ExceptionAwareFormatter):
101 95 """
102 96 A custom formatter to format logging records as json strings.
103 97 Extra values will be formatted as str() if not supported by
104 98 json default encoder
105 99 """
106 100
107 101 def __init__(self, *args, **kwargs):
108 102 """
109 103 :param json_default: a function for encoding non-standard objects
110 104 as outlined in http://docs.python.org/2/library/json.html
111 105 :param json_encoder: optional custom encoder
112 106 :param json_serializer: a :meth:`json.dumps`-compatible callable
113 107 that will be used to serialize the log record.
114 108 :param json_indent: an optional :meth:`json.dumps`-compatible numeric value
115 109 that will be used to customize the indent of the output json.
116 110 :param prefix: an optional string prefix added at the beginning of
117 111 the formatted string
118 112 :param json_indent: indent parameter for json.dumps
119 113 :param json_ensure_ascii: ensure_ascii parameter for json.dumps
120 114 :param reserved_attrs: an optional list of fields that will be skipped when
121 115 outputting json log record. Defaults to all log record attributes:
122 116 http://docs.python.org/library/logging.html#logrecord-attributes
123 117 :param timestamp: an optional string/boolean field to add a timestamp when
124 118 outputting the json log record. If string is passed, timestamp will be added
125 119 to log record using string as key. If True boolean is passed, timestamp key
126 120 will be "timestamp". Defaults to False/off.
127 121 """
128 122 self.json_default = self._str_to_fn(kwargs.pop("json_default", None))
129 123 self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None))
130 124 self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps))
131 125 self.json_indent = kwargs.pop("json_indent", None)
132 126 self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True)
133 127 self.prefix = kwargs.pop("prefix", "")
134 128 reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
135 129 self.reserved_attrs = dict(list(zip(reserved_attrs, reserved_attrs)))
136 130 self.timestamp = kwargs.pop("timestamp", True)
137 131
138 132 # super(JsonFormatter, self).__init__(*args, **kwargs)
139 133 logging.Formatter.__init__(self, *args, **kwargs)
140 134 if not self.json_encoder and not self.json_default:
141 135 self.json_encoder = JsonEncoder
142 136
143 137 self._required_fields = self.parse()
144 138 self._skip_fields = dict(list(zip(self._required_fields,
145 139 self._required_fields)))
146 140 self._skip_fields.update(self.reserved_attrs)
147 141
148 142 def _str_to_fn(self, fn_as_str):
149 143 """
150 144 If the argument is not a string, return whatever was passed in.
151 145 Parses a string such as package.module.function, imports the module
152 146 and returns the function.
153 147
154 148 :param fn_as_str: The string to parse. If not a string, return it.
155 149 """
156 150 if not isinstance(fn_as_str, str):
157 151 return fn_as_str
158 152
159 153 path, _, function = fn_as_str.rpartition('.')
160 154 module = importlib.import_module(path)
161 155 return getattr(module, function)
162 156
163 157 def parse(self):
164 158 """
165 159 Parses format string looking for substitutions
166 160
167 161 This method is responsible for returning a list of fields (as strings)
168 162 to include in all log messages.
169 163 """
170 164 standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
171 165 return standard_formatters.findall(self._fmt)
172 166
173 167 def add_fields(self, log_record, record, message_dict):
174 168 """
175 169 Override this method to implement custom logic for adding fields.
176 170 """
177 171 for field in self._required_fields:
178 172 log_record[field] = record.__dict__.get(field)
179 173 log_record.update(message_dict)
180 174 merge_record_extra(record, log_record, reserved=self._skip_fields)
181 175
182 176 if self.timestamp:
183 177 key = self.timestamp if type(self.timestamp) == str else 'timestamp'
184 178 log_record[key] = datetime.fromtimestamp(record.created, tz=utc)
185 179
186 180 def process_log_record(self, log_record):
187 181 """
188 182 Override this method to implement custom logic
189 183 on the possibly ordered dictionary.
190 184 """
191 185 return log_record
192 186
193 187 def jsonify_log_record(self, log_record):
194 188 """Returns a json string of the log record."""
195 189 return self.json_serializer(log_record,
196 190 default=self.json_default,
197 191 cls=self.json_encoder,
198 192 indent=self.json_indent,
199 193 ensure_ascii=self.json_ensure_ascii)
200 194
201 195 def serialize_log_record(self, log_record):
202 196 """Returns the final representation of the log record."""
203 197 return "{}{}".format(self.prefix, self.jsonify_log_record(log_record))
204 198
205 199 def format(self, record):
206 200 """Formats a log record and serializes to json"""
207 201 message_dict = {}
208 202 # FIXME: logging.LogRecord.msg and logging.LogRecord.message in typeshed
209 203 # are always type of str. We shouldn't need to override that.
210 204 if isinstance(record.msg, dict):
211 205 message_dict = record.msg
212 206 record.message = None
213 207 else:
214 208 record.message = record.getMessage()
215 209 # only format time if needed
216 210 if "asctime" in self._required_fields:
217 211 record.asctime = self.formatTime(record, self.datefmt)
218 212
219 213 # Display formatted exception, but allow overriding it in the
220 214 # user-supplied dict.
221 215 if record.exc_info and not message_dict.get('exc_info'):
222 216 message_dict['exc_info'] = self.formatException(record.exc_info)
223 217 if not message_dict.get('exc_info') and record.exc_text:
224 218 message_dict['exc_info'] = record.exc_text
225 219 # Display formatted record of stack frames
226 220 # default format is a string returned from :func:`traceback.print_stack`
227 221 try:
228 222 if record.stack_info and not message_dict.get('stack_info'):
229 223 message_dict['stack_info'] = self.formatStack(record.stack_info)
230 224 except AttributeError:
231 225 # Python2.7 doesn't have stack_info.
232 226 pass
233 227
234 228 try:
235 229 log_record = OrderedDict()
236 230 except NameError:
237 231 log_record = {}
238 232
239 233 _inject_req_id(record, with_prefix=False)
240 234 self.add_fields(log_record, record, message_dict)
241 235 log_record = self.process_log_record(log_record)
242 236
243 237 return self.serialize_log_record(log_record)
@@ -1,394 +1,402 b''
1 1
2 2 import threading
3 3 import weakref
4 4 from base64 import b64encode
5 5 from logging import getLogger
6 6 from os import urandom
7 7 from typing import Union
8 8
9 9 from redis import StrictRedis
10 10
11 11 __version__ = '4.0.0'
12 12
13 13 loggers = {
14 14 k: getLogger("vcsserver." + ".".join((__name__, k)))
15 15 for k in [
16 16 "acquire",
17 17 "refresh.thread.start",
18 18 "refresh.thread.stop",
19 19 "refresh.thread.exit",
20 20 "refresh.start",
21 21 "refresh.shutdown",
22 22 "refresh.exit",
23 23 "release",
24 24 ]
25 25 }
26 26
27 27 text_type = str
28 28 binary_type = bytes
29 29
30 30
31 31 # Check if the id match. If not, return an error code.
32 32 UNLOCK_SCRIPT = b"""
33 33 if redis.call("get", KEYS[1]) ~= ARGV[1] then
34 34 return 1
35 35 else
36 36 redis.call("del", KEYS[2])
37 37 redis.call("lpush", KEYS[2], 1)
38 38 redis.call("pexpire", KEYS[2], ARGV[2])
39 39 redis.call("del", KEYS[1])
40 40 return 0
41 41 end
42 42 """
43 43
44 44 # Covers both cases when key doesn't exist and doesn't equal to lock's id
45 45 EXTEND_SCRIPT = b"""
46 46 if redis.call("get", KEYS[1]) ~= ARGV[1] then
47 47 return 1
48 48 elseif redis.call("ttl", KEYS[1]) < 0 then
49 49 return 2
50 50 else
51 51 redis.call("expire", KEYS[1], ARGV[2])
52 52 return 0
53 53 end
54 54 """
55 55
56 56 RESET_SCRIPT = b"""
57 57 redis.call('del', KEYS[2])
58 58 redis.call('lpush', KEYS[2], 1)
59 59 redis.call('pexpire', KEYS[2], ARGV[2])
60 60 return redis.call('del', KEYS[1])
61 61 """
62 62
63 63 RESET_ALL_SCRIPT = b"""
64 64 local locks = redis.call('keys', 'lock:*')
65 65 local signal
66 66 for _, lock in pairs(locks) do
67 67 signal = 'lock-signal:' .. string.sub(lock, 6)
68 68 redis.call('del', signal)
69 69 redis.call('lpush', signal, 1)
70 70 redis.call('expire', signal, 1)
71 71 redis.call('del', lock)
72 72 end
73 73 return #locks
74 74 """
75 75
76 76
77 77 class AlreadyAcquired(RuntimeError):
78 78 pass
79 79
80 80
81 81 class NotAcquired(RuntimeError):
82 82 pass
83 83
84 84
85 85 class AlreadyStarted(RuntimeError):
86 86 pass
87 87
88 88
89 89 class TimeoutNotUsable(RuntimeError):
90 90 pass
91 91
92 92
93 93 class InvalidTimeout(RuntimeError):
94 94 pass
95 95
96 96
97 97 class TimeoutTooLarge(RuntimeError):
98 98 pass
99 99
100 100
101 101 class NotExpirable(RuntimeError):
102 102 pass
103 103
104 104
105 105 class Lock:
106 106 """
107 107 A Lock context manager implemented via redis SETNX/BLPOP.
108 108 """
109 109
110 110 unlock_script = None
111 111 extend_script = None
112 112 reset_script = None
113 113 reset_all_script = None
114 blocking = None
114 115
115 116 _lock_renewal_interval: float
116 117 _lock_renewal_thread: Union[threading.Thread, None]
117 118
118 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000, blocking=True):
119 120 """
120 121 :param redis_client:
121 122 An instance of :class:`~StrictRedis`.
122 123 :param name:
123 124 The name (redis key) the lock should have.
124 125 :param expire:
125 126 The lock expiry time in seconds. If left at the default (None)
126 127 the lock will not expire.
127 128 :param id:
128 129 The ID (redis value) the lock should have. A random value is
129 130 generated when left at the default.
130 131
131 132 Note that if you specify this then the lock is marked as "held". Acquires
132 133 won't be possible.
133 134 :param auto_renewal:
134 135 If set to ``True``, Lock will automatically renew the lock so that it
135 136 doesn't expire for as long as the lock is held (acquire() called
136 137 or running in a context manager).
137 138
138 139 Implementation note: Renewal will happen using a daemon thread with
139 140 an interval of ``expire*2/3``. If wishing to use a different renewal
140 141 time, subclass Lock, call ``super().__init__()`` then set
141 142 ``self._lock_renewal_interval`` to your desired interval.
142 143 :param strict:
143 144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
144 145 :param signal_expire:
145 146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 :param blocking:
148 Boolean value specifying whether lock should be blocking or not.
149 Used in `__enter__` method.
146 150 """
147 151 if strict and not isinstance(redis_client, StrictRedis):
148 152 raise ValueError("redis_client must be instance of StrictRedis. "
149 153 "Use strict=False if you know what you're doing.")
150 154 if auto_renewal and expire is None:
151 155 raise ValueError("Expire may not be None when auto_renewal is set")
152 156
153 157 self._client = redis_client
154 158
155 159 if expire:
156 160 expire = int(expire)
157 161 if expire < 0:
158 162 raise ValueError("A negative expire is not acceptable.")
159 163 else:
160 164 expire = None
161 165 self._expire = expire
162 166
163 167 self._signal_expire = signal_expire
164 168 if id is None:
165 169 self._id = b64encode(urandom(18)).decode('ascii')
166 170 elif isinstance(id, binary_type):
167 171 try:
168 172 self._id = id.decode('ascii')
169 173 except UnicodeDecodeError:
170 174 self._id = b64encode(id).decode('ascii')
171 175 elif isinstance(id, text_type):
172 176 self._id = id
173 177 else:
174 178 raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
175 179 self._name = 'lock:' + name
176 180 self._signal = 'lock-signal:' + name
177 181 self._lock_renewal_interval = (float(expire) * 2 / 3
178 182 if auto_renewal
179 183 else None)
180 184 self._lock_renewal_thread = None
181 185
186 self.blocking = blocking
187
182 188 self.register_scripts(redis_client)
183 189
184 190 @classmethod
185 191 def register_scripts(cls, redis_client):
186 192 global reset_all_script
187 193 if reset_all_script is None:
188 194 cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
189 195 cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
190 196 cls.reset_script = redis_client.register_script(RESET_SCRIPT)
191 197 cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
192 198 reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
193 199
194 200 @property
195 201 def _held(self):
196 202 return self.id == self.get_owner_id()
197 203
198 204 def reset(self):
199 205 """
200 206 Forcibly deletes the lock. Use this with care.
201 207 """
202 208 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
203 209
204 210 @property
205 211 def id(self):
206 212 return self._id
207 213
208 214 def get_owner_id(self):
209 215 owner_id = self._client.get(self._name)
210 216 if isinstance(owner_id, binary_type):
211 217 owner_id = owner_id.decode('ascii', 'replace')
212 218 return owner_id
213 219
214 220 def acquire(self, blocking=True, timeout=None):
215 221 """
216 222 :param blocking:
217 223 Boolean value specifying whether lock should be blocking or not.
218 224 :param timeout:
219 225 An integer value specifying the maximum number of seconds to block.
220 226 """
221 227 logger = loggers["acquire"]
222 228
223 229 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
224 230
225 231 if self._held:
226 232 owner_id = self.get_owner_id()
227 233 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
228 234
229 235 if not blocking and timeout is not None:
230 236 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
231 237
232 238 if timeout:
233 239 timeout = int(timeout)
234 240 if timeout < 0:
235 241 raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")
236 242
237 243 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
238 244 raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")
239 245
240 246 busy = True
241 247 blpop_timeout = timeout or self._expire or 0
242 248 timed_out = False
243 249 while busy:
244 250 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
245 251 if busy:
246 252 if timed_out:
247 253 return False
248 254 elif blocking:
249 255 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
250 256 else:
251 257 logger.warning("Failed to acquire Lock(%r).", self._name)
252 258 return False
253 259
254 260 logger.debug("Acquired Lock(%r).", self._name)
255 261 if self._lock_renewal_interval is not None:
256 262 self._start_lock_renewer()
257 263 return True
258 264
259 265 def extend(self, expire=None):
260 266 """
261 267 Extends expiration time of the lock.
262 268
263 269 :param expire:
264 270 New expiration time. If ``None`` - `expire` provided during
265 271 lock initialization will be taken.
266 272 """
267 273 if expire:
268 274 expire = int(expire)
269 275 if expire < 0:
270 276 raise ValueError("A negative expire is not acceptable.")
271 277 elif self._expire is not None:
272 278 expire = self._expire
273 279 else:
274 280 raise TypeError(
275 281 "To extend a lock 'expire' must be provided as an "
276 282 "argument to extend() method or at initialization time."
277 283 )
278 284
279 285 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
280 286 if error == 1:
281 287 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
282 288 elif error == 2:
283 289 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
284 290 elif error:
285 291 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
286 292
287 293 @staticmethod
288 294 def _lock_renewer(name, lockref, interval, stop):
289 295 """
290 296 Renew the lock key in redis every `interval` seconds for as long
291 297 as `self._lock_renewal_thread.should_exit` is False.
292 298 """
293 299 while not stop.wait(timeout=interval):
294 300 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
295 301 lock: "Lock" = lockref()
296 302 if lock is None:
297 303 loggers["refresh.thread.stop"].debug(
298 304 "Stopping loop because Lock(%r) was garbage collected.", name
299 305 )
300 306 break
301 307 lock.extend(expire=lock._expire)
302 308 del lock
303 309 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
304 310
305 311 def _start_lock_renewer(self):
306 312 """
307 313 Starts the lock refresher thread.
308 314 """
309 315 if self._lock_renewal_thread is not None:
310 316 raise AlreadyStarted("Lock refresh thread already started")
311 317
312 318 loggers["refresh.start"].debug(
313 319 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
314 320 self._name, self._lock_renewal_interval
315 321 )
316 322 self._lock_renewal_stop = threading.Event()
317 323 self._lock_renewal_thread = threading.Thread(
318 324 group=None,
319 325 target=self._lock_renewer,
320 326 kwargs={
321 327 'name': self._name,
322 328 'lockref': weakref.ref(self),
323 329 'interval': self._lock_renewal_interval,
324 330 'stop': self._lock_renewal_stop,
325 331 },
326 332 )
327 333 self._lock_renewal_thread.daemon = True
328 334 self._lock_renewal_thread.start()
329 335
330 336 def _stop_lock_renewer(self):
331 337 """
332 338 Stop the lock renewer.
333 339
334 340 This signals the renewal thread and waits for its exit.
335 341 """
336 342 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
337 343 return
338 344 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
339 345 self._lock_renewal_stop.set()
340 346 self._lock_renewal_thread.join()
341 347 self._lock_renewal_thread = None
342 348 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
343 349
344 350 def __enter__(self):
345 acquired = self.acquire(blocking=True)
351 acquired = self.acquire(blocking=self.blocking)
346 352 if not acquired:
347 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
353 if self.blocking:
354 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
355 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
348 356 return self
349 357
350 358 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
351 359 self.release()
352 360
353 361 def release(self):
354 362 """Releases the lock, that was acquired with the same object.
355 363
356 364 .. note::
357 365
358 366 If you want to release a lock that you acquired in a different place you have two choices:
359 367
360 368 * Use ``Lock("name", id=id_from_other_place).release()``
361 369 * Use ``Lock("name").reset()``
362 370 """
363 371 if self._lock_renewal_thread is not None:
364 372 self._stop_lock_renewer()
365 373 loggers["release"].debug("Releasing Lock(%r).", self._name)
366 374 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
367 375 if error == 1:
368 376 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
369 377 elif error:
370 378 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
371 379
372 380 def locked(self):
373 381 """
374 382 Return true if the lock is acquired.
375 383
376 384 Checks that lock with same name already exists. This method returns true, even if
377 385 lock have another id.
378 386 """
379 387 return self._client.exists(self._name) == 1
380 388
381 389
382 390 reset_all_script = None
383 391
384 392
385 393 def reset_all(redis_client):
386 394 """
387 395 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
388 396
389 397 :param redis_client:
390 398 An instance of :class:`~StrictRedis`.
391 399 """
392 400 Lock.register_scripts(redis_client)
393 401
394 402 reset_all_script(client=redis_client) # noqa
@@ -1,50 +1,51 b''
1 1 import logging
2 2
3 3 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
4 4 from .udp import StatsClient # noqa
5 5
6 6 HOST = 'localhost'
7 7 PORT = 8125
8 8 IPV6 = False
9 9 PREFIX = None
10 10 MAXUDPSIZE = 512
11 11
12 12 log = logging.getLogger('rhodecode.statsd')
13 13
14 14
15 15 def statsd_config(config, prefix='statsd.'):
16 16 _config = {}
17 for key in config.keys():
17 for key in list(config.keys()):
18 18 if key.startswith(prefix):
19 19 _config[key[len(prefix):]] = config[key]
20 20 return _config
21 21
22 22
23 23 def client_from_config(configuration, prefix='statsd.', **kwargs):
24 24 from pyramid.settings import asbool
25 25
26 26 _config = statsd_config(configuration, prefix)
27 statsd_flag = _config.get('enabled')
27 28 statsd_enabled = asbool(_config.pop('enabled', False))
28 29 if not statsd_enabled:
29 log.debug('statsd client not enabled by statsd.enabled = flag, skipping...')
30 log.debug('statsd client not enabled by statsd.enabled = %s flag, skipping...', statsd_flag)
30 31 return
31 32
32 33 host = _config.pop('statsd_host', HOST)
33 34 port = _config.pop('statsd_port', PORT)
34 35 prefix = _config.pop('statsd_prefix', PREFIX)
35 36 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
36 37 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
37 38 log.debug('configured statsd client %s:%s', host, port)
38 39
39 40 try:
40 41 client = StatsClient(
41 42 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
42 43 except Exception:
43 44 log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
44 45 client = None
45 46
46 47 return client
47 48
48 49
49 50 def get_statsd_client(request):
50 51 return client_from_config(request.registry.settings)
@@ -1,2 +1,14 b''
1 # use orjson by default
2 import orjson as json
1 import json as stdlib_json
2
3 try:
4 # we keep simplejson for having dump functionality still
5 # orjson doesn't support it
6 import simplejson as sjson
7 except ImportError:
8 sjson = stdlib_json
9
10 try:
11 import orjson
12 import orjson as json
13 except ImportError:
14 json = stdlib_json
@@ -1,53 +1,63 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import sys
20 20 import logging
21 21
22 22
23 23 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(30, 38))
24 24
25 25 # Sequences
26 26 RESET_SEQ = "\033[0m"
27 27 COLOR_SEQ = "\033[0;%dm"
28 28 BOLD_SEQ = "\033[1m"
29 29
30 30 COLORS = {
31 31 'CRITICAL': MAGENTA,
32 32 'ERROR': RED,
33 33 'WARNING': CYAN,
34 34 'INFO': GREEN,
35 35 'DEBUG': BLUE,
36 36 'SQL': YELLOW
37 37 }
38 38
39 39
40 def _inject_req_id(record, *args, **kwargs):
41 return record
42
43
44 class ExceptionAwareFormatter(logging.Formatter):
45 pass
46
47
40 48 class ColorFormatter(logging.Formatter):
41 49
42 50 def format(self, record):
43 51 """
44 Change record's levelname to use with COLORS enum
52 Changes record's levelname to use with COLORS enum
45 53 """
46 54 def_record = super().format(record)
47 55
48 56 levelname = record.levelname
49 57 start = COLOR_SEQ % (COLORS[levelname])
50 58 end = RESET_SEQ
51 59
52 60 colored_record = ''.join([start, def_record, end])
53 61 return colored_record
62
63
General Comments 0
You need to be logged in to leave comments. Login now