##// END OF EJS Templates
core: refactor of vendor lib to easier sync with vcsserver
super-admin -
r5431:2e534b16 default
parent child Browse files
Show More
@@ -1,237 +1,237 b''
1 '''
1 '''
2 This library is provided to allow standard python logging
2 This library is provided to allow standard python logging
3 to output log data as JSON formatted strings
3 to output log data as JSON formatted strings
4 '''
4 '''
5 import logging
5 import logging
6 import re
6 import re
7 from datetime import date, datetime, time, tzinfo, timedelta
7 from datetime import date, datetime, time, tzinfo, timedelta
8 import traceback
8 import traceback
9 import importlib
9 import importlib
10
10
11 from inspect import istraceback
11 from inspect import istraceback
12
12
13 from collections import OrderedDict
13 from collections import OrderedDict
14 from rhodecode.lib.logging_formatter import _inject_req_id, ExceptionAwareFormatter
15 from rhodecode.lib.ext_json import sjson as json
16
14
15 from ...logging_formatter import _inject_req_id, ExceptionAwareFormatter
16 from ...ext_json import sjson as json
17
17
# Fixed offsets used by the UTC tzinfo implementation below.
ZERO = timedelta(0)
HOUR = timedelta(hours=1)
20
20
21
21
class UTC(tzinfo):
    """Concrete ``tzinfo`` with a fixed zero offset — plain UTC."""

    def utcoffset(self, dt):
        # UTC is, by definition, at zero offset from UTC.
        return ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return ZERO
33
33
# Module-level singleton timezone instance, used for record timestamps.
utc = UTC()
35
35
36
36
# skip natural LogRecord attributes
# http://docs.python.org/library/logging.html#logrecord-attributes
# Anything on a record NOT in this tuple is treated as a user-supplied
# "extra" and merged into the JSON output by merge_record_extra().
RESERVED_ATTRS = (
    'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
    'funcName', 'levelname', 'levelno', 'lineno', 'module',
    'msecs', 'message', 'msg', 'name', 'pathname', 'process',
    'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName')
44
44
45
45
def merge_record_extra(record, target, reserved):
    """
    Merges extra attributes from LogRecord object into target dictionary

    :param record: logging.LogRecord
    :param target: dict to update
    :param reserved: dict or list with reserved keys to skip
    """
    for attr_name, attr_value in record.__dict__.items():
        # hasattr() guard allows non-string (e.g. numeric) attribute keys,
        # which lack a startswith method.
        is_reserved = attr_name in reserved
        is_private = hasattr(attr_name, "startswith") and attr_name.startswith('_')
        if not (is_reserved or is_private):
            target[attr_name] = attr_value
    return target
61
61
62
62
class JsonEncoder(json.JSONEncoder):
    """
    A custom encoder extending the default JSONEncoder with support for
    the extra object kinds found in log records: dates/times, traceback
    objects, exceptions and classes.
    """

    def default(self, obj):
        # Dates and times serialize via their ISO-8601 representation.
        if isinstance(obj, (date, datetime, time)):
            return self.format_datetime_obj(obj)

        # Traceback objects become the joined, stripped frame listing.
        if istraceback(obj):
            return ''.join(traceback.format_tb(obj)).strip()

        # Exceptions and classes stringify.
        if type(obj) == Exception or isinstance(obj, Exception) or type(obj) == type:
            return str(obj)

        try:
            return super().default(obj)
        except TypeError:
            # Last resort: best-effort str(); None when even that fails.
            try:
                return str(obj)
            except Exception:
                return None

    def format_datetime_obj(self, obj):
        return obj.isoformat()
92
92
93
93
class JsonFormatter(ExceptionAwareFormatter):
    """
    A custom formatter to format logging records as json strings.
    Extra values will be formatted as str() if not supported by
    json default encoder
    """

    def __init__(self, *args, **kwargs):
        """
        :param json_default: a function for encoding non-standard objects
            as outlined in http://docs.python.org/2/library/json.html
        :param json_encoder: optional custom encoder
        :param json_serializer: a :meth:`json.dumps`-compatible callable
            that will be used to serialize the log record.
        :param json_indent: an optional :meth:`json.dumps`-compatible numeric value
            that will be used to customize the indent of the output json.
        :param json_ensure_ascii: ensure_ascii parameter for json.dumps
        :param prefix: an optional string prefix added at the beginning of
            the formatted string
        :param reserved_attrs: an optional list of fields that will be skipped when
            outputting json log record. Defaults to all log record attributes:
            http://docs.python.org/library/logging.html#logrecord-attributes
        :param timestamp: an optional string/boolean field to add a timestamp when
            outputting the json log record. If string is passed, timestamp will be added
            to log record using string as key. If True boolean is passed, timestamp key
            will be "timestamp". Defaults to True in this vendored copy (note:
            upstream python-json-logger defaults this to False).
        """
        self.json_default = self._str_to_fn(kwargs.pop("json_default", None))
        self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None))
        self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps))
        self.json_indent = kwargs.pop("json_indent", None)
        self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True)
        self.prefix = kwargs.pop("prefix", "")
        reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
        self.reserved_attrs = dict(zip(reserved_attrs, reserved_attrs))
        self.timestamp = kwargs.pop("timestamp", True)

        # Deliberately bypass ExceptionAwareFormatter.__init__ and initialize
        # the plain logging.Formatter directly, mirroring the vendored upstream.
        logging.Formatter.__init__(self, *args, **kwargs)
        if not self.json_encoder and not self.json_default:
            self.json_encoder = JsonEncoder

        self._required_fields = self.parse()
        self._skip_fields = dict(zip(self._required_fields, self._required_fields))
        self._skip_fields.update(self.reserved_attrs)

    def _str_to_fn(self, fn_as_str):
        """
        If the argument is not a string, return whatever was passed in.
        Parses a string such as package.module.function, imports the module
        and returns the function.

        :param fn_as_str: The string to parse. If not a string, return it.
        """
        if not isinstance(fn_as_str, str):
            return fn_as_str

        path, _, function = fn_as_str.rpartition('.')
        module = importlib.import_module(path)
        return getattr(module, function)

    def parse(self):
        """
        Parses format string looking for substitutions

        This method is responsible for returning a list of fields (as strings)
        to include in all log messages.
        """
        standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
        return standard_formatters.findall(self._fmt)

    def add_fields(self, log_record, record, message_dict):
        """
        Override this method to implement custom logic for adding fields.
        """
        for field in self._required_fields:
            log_record[field] = record.__dict__.get(field)
        log_record.update(message_dict)
        merge_record_extra(record, log_record, reserved=self._skip_fields)

        if self.timestamp:
            # A string option names the output key; boolean True means "timestamp".
            key = self.timestamp if isinstance(self.timestamp, str) else 'timestamp'
            log_record[key] = datetime.fromtimestamp(record.created, tz=utc)

    def process_log_record(self, log_record):
        """
        Override this method to implement custom logic
        on the possibly ordered dictionary.
        """
        return log_record

    def jsonify_log_record(self, log_record):
        """Returns a json string of the log record."""
        return self.json_serializer(log_record,
                                    default=self.json_default,
                                    cls=self.json_encoder,
                                    indent=self.json_indent,
                                    ensure_ascii=self.json_ensure_ascii)

    def serialize_log_record(self, log_record):
        """Returns the final representation of the log record."""
        return f"{self.prefix}{self.jsonify_log_record(log_record)}"

    def format(self, record):
        """Formats a log record and serializes to json"""
        message_dict = {}
        if isinstance(record.msg, dict):
            # A dict message is merged into the log record instead of being
            # rendered as a string.
            message_dict = record.msg
            record.message = None
        else:
            record.message = record.getMessage()
        # only format time if needed
        if "asctime" in self._required_fields:
            record.asctime = self.formatTime(record, self.datefmt)

        # Display formatted exception, but allow overriding it in the
        # user-supplied dict.
        if record.exc_info and not message_dict.get('exc_info'):
            message_dict['exc_info'] = self.formatException(record.exc_info)
        if not message_dict.get('exc_info') and record.exc_text:
            message_dict['exc_info'] = record.exc_text
        # Display formatted record of stack frames; default format is a string
        # returned from :func:`traceback.print_stack`. (Python 3 LogRecord
        # always has stack_info — the old py2.7 AttributeError guard is gone.)
        if record.stack_info and not message_dict.get('stack_info'):
            message_dict['stack_info'] = self.formatStack(record.stack_info)

        # OrderedDict keeps field ordering stable in the serialized output.
        log_record = OrderedDict()

        _inject_req_id(record, with_prefix=False)
        self.add_fields(log_record, record, message_dict)
        log_record = self.process_log_record(log_record)

        return self.serialize_log_record(log_record)
@@ -1,402 +1,402 b''
1
1
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7 from typing import Union
7 from typing import Union
8
8
9 from redis import StrictRedis
9 from redis import StrictRedis
10
10
11 __version__ = '4.0.0'
11 __version__ = '4.0.0'
12
12
# One logger per lock lifecycle event, all nested under the "rhodecode."
# namespace so they can be targeted with the app's logging configuration.
loggers = {
    k: getLogger("rhodecode." + ".".join((__name__, k)))
    for k in [
        "acquire",
        "refresh.thread.start",
        "refresh.thread.stop",
        "refresh.thread.exit",
        "refresh.start",
        "refresh.shutdown",
        "refresh.exit",
        "release",
    ]
}

# Python 2/3 compatibility aliases, kept to minimize diff against upstream.
text_type = str
binary_type = bytes
29
30
30
# Check if the id matches. If not, return an error code.
UNLOCK_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
    return 1
else
    redis.call("del", KEYS[2])
    redis.call("lpush", KEYS[2], 1)
    redis.call("pexpire", KEYS[2], ARGV[2])
    redis.call("del", KEYS[1])
    return 0
end
"""

# Covers both cases when key doesn't exist and doesn't equal to lock's id
EXTEND_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
    return 1
elseif redis.call("ttl", KEYS[1]) < 0 then
    return 2
else
    redis.call("expire", KEYS[1], ARGV[2])
    return 0
end
"""

# Force-delete a single lock key and wake any waiters via its signal list.
RESET_SCRIPT = b"""
redis.call('del', KEYS[2])
redis.call('lpush', KEYS[2], 1)
redis.call('pexpire', KEYS[2], ARGV[2])
return redis.call('del', KEYS[1])
"""

# Force-delete every 'lock:*' key, signalling waiters; returns the count.
RESET_ALL_SCRIPT = b"""
local locks = redis.call('keys', 'lock:*')
local signal
for _, lock in pairs(locks) do
    signal = 'lock-signal:' .. string.sub(lock, 6)
    redis.call('del', signal)
    redis.call('lpush', signal, 1)
    redis.call('expire', signal, 1)
    redis.call('del', lock)
end
return #locks
"""
75
75
76
76
class AlreadyAcquired(RuntimeError):
    """Raised by acquire() when this Lock instance already holds the lock."""
    pass


class NotAcquired(RuntimeError):
    """Raised when operating on a lock that is not acquired or has expired."""
    pass


class AlreadyStarted(RuntimeError):
    # NOTE(review): presumably raised when the renewal thread is started
    # twice; the raise site is not in this chunk — confirm in the full module.
    pass


class TimeoutNotUsable(RuntimeError):
    """Raised by acquire() when a timeout is combined with blocking=False."""
    pass


class InvalidTimeout(RuntimeError):
    """Raised by acquire() for a negative timeout value."""
    pass


class TimeoutTooLarge(RuntimeError):
    """Raised by acquire() when the timeout exceeds the lock's expire time."""
    pass


class NotExpirable(RuntimeError):
    """Raised by extend() when the lock has no assigned expiration time."""
    pass
103
103
104
104
class Lock:
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """

    # Registered Lua script handles; populated once per process by
    # register_scripts() and shared across instances as class attributes.
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None
    blocking = None

    # Renewal interval in seconds (expire * 2/3) when auto_renewal is enabled,
    # and the daemon thread performing the renewal (None when not running).
    _lock_renewal_interval: float
    _lock_renewal_thread: Union[threading.Thread, None]
118
118
    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000, blocking=True):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        :param signal_expire:
            Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
            Used in `__enter__` method.
        """
        if strict and not isinstance(redis_client, StrictRedis):
            raise ValueError("redis_client must be instance of StrictRedis. "
                             "Use strict=False if you know what you're doing.")
        if auto_renewal and expire is None:
            raise ValueError("Expire may not be None when auto_renewal is set")

        self._client = redis_client

        # Normalize expire: falsy values (None / 0) mean "never expires".
        if expire:
            expire = int(expire)
            if expire < 0:
                raise ValueError("A negative expire is not acceptable.")
        else:
            expire = None
        self._expire = expire

        self._signal_expire = signal_expire
        # The id becomes the redis value of the lock key; accept str, ascii
        # bytes (non-ascii bytes are stored base64-encoded), or generate one.
        if id is None:
            self._id = b64encode(urandom(18)).decode('ascii')
        elif isinstance(id, binary_type):
            try:
                self._id = id.decode('ascii')
            except UnicodeDecodeError:
                self._id = b64encode(id).decode('ascii')
        elif isinstance(id, text_type):
            self._id = id
        else:
            raise TypeError(f"Incorrect type for `id`. Must be bytes/str not {type(id)}.")
        self._name = 'lock:' + name
        self._signal = 'lock-signal:' + name
        # Renew at 2/3 of the expire time so the renewal lands before expiry.
        self._lock_renewal_interval = (float(expire) * 2 / 3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

        self.blocking = blocking

        self.register_scripts(redis_client)
189
189
    @classmethod
    def register_scripts(cls, redis_client):
        # Registers the Lua scripts with the redis client once per process;
        # subsequent calls are no-ops.
        # NOTE(review): idempotence relies on a module-level
        # ``reset_all_script`` sentinel (presumably initialised to None
        # elsewhere in this module) — confirm it is defined at import time.
        global reset_all_script
        if reset_all_script is None:
            cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
            cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
            cls.reset_script = redis_client.register_script(RESET_SCRIPT)
            cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
199
199
200 @property
200 @property
201 def _held(self):
201 def _held(self):
202 return self.id == self.get_owner_id()
202 return self.id == self.get_owner_id()
203
203
204 def reset(self):
204 def reset(self):
205 """
205 """
206 Forcibly deletes the lock. Use this with care.
206 Forcibly deletes the lock. Use this with care.
207 """
207 """
208 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
208 self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
209
209
    @property
    def id(self):
        # The unique token stored as the lock's redis value; ownership
        # checks (_held, UNLOCK_SCRIPT) compare against this id.
        return self._id
213
213
214 def get_owner_id(self):
214 def get_owner_id(self):
215 owner_id = self._client.get(self._name)
215 owner_id = self._client.get(self._name)
216 if isinstance(owner_id, binary_type):
216 if isinstance(owner_id, binary_type):
217 owner_id = owner_id.decode('ascii', 'replace')
217 owner_id = owner_id.decode('ascii', 'replace')
218 return owner_id
218 return owner_id
219
219
    def acquire(self, blocking=True, timeout=None):
        """
        Try to take the lock; returns True on success, False otherwise.

        :param blocking:
            Boolean value specifying whether lock should be blocking or not.
        :param timeout:
            An integer value specifying the maximum number of seconds to block.
        """
        logger = loggers["acquire"]

        logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)

        # Re-acquiring from the same instance is an error, not a no-op.
        if self._held:
            owner_id = self.get_owner_id()
            raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))

        if not blocking and timeout is not None:
            raise TimeoutNotUsable("Timeout cannot be used if blocking=False")

        if timeout:
            timeout = int(timeout)
            if timeout < 0:
                raise InvalidTimeout(f"Timeout ({timeout}) cannot be less than or equal to 0")

            # Without auto-renewal a timeout longer than expire can never win.
            if self._expire and not self._lock_renewal_interval and timeout > self._expire:
                raise TimeoutTooLarge(f"Timeout ({timeout}) cannot be greater than expire ({self._expire})")

        busy = True
        blpop_timeout = timeout or self._expire or 0
        timed_out = False
        while busy:
            # SET NX: only one client can create the key; ex applies expiry.
            busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
            if busy:
                if timed_out:
                    return False
                elif blocking:
                    # Block on the signal list until release pushes or BLPOP
                    # times out; only treat as a timeout when one was requested.
                    timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
                else:
                    logger.warning("Failed to acquire Lock(%r).", self._name)
                    return False

        logger.debug("Acquired Lock(%r).", self._name)
        if self._lock_renewal_interval is not None:
            self._start_lock_renewer()
        return True
264
264
265 def extend(self, expire=None):
265 def extend(self, expire=None):
266 """
266 """
267 Extends expiration time of the lock.
267 Extends expiration time of the lock.
268
268
269 :param expire:
269 :param expire:
270 New expiration time. If ``None`` - `expire` provided during
270 New expiration time. If ``None`` - `expire` provided during
271 lock initialization will be taken.
271 lock initialization will be taken.
272 """
272 """
273 if expire:
273 if expire:
274 expire = int(expire)
274 expire = int(expire)
275 if expire < 0:
275 if expire < 0:
276 raise ValueError("A negative expire is not acceptable.")
276 raise ValueError("A negative expire is not acceptable.")
277 elif self._expire is not None:
277 elif self._expire is not None:
278 expire = self._expire
278 expire = self._expire
279 else:
279 else:
280 raise TypeError(
280 raise TypeError(
281 "To extend a lock 'expire' must be provided as an "
281 "To extend a lock 'expire' must be provided as an "
282 "argument to extend() method or at initialization time."
282 "argument to extend() method or at initialization time."
283 )
283 )
284
284
285 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
285 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
286 if error == 1:
286 if error == 1:
287 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
287 raise NotAcquired(f"Lock {self._name} is not acquired or it already expired.")
288 elif error == 2:
288 elif error == 2:
289 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
289 raise NotExpirable(f"Lock {self._name} has no assigned expiration time")
290 elif error:
290 elif error:
291 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
291 raise RuntimeError(f"Unsupported error code {error} from EXTEND script")
292
292
293 @staticmethod
293 @staticmethod
294 def _lock_renewer(name, lockref, interval, stop):
294 def _lock_renewer(name, lockref, interval, stop):
295 """
295 """
296 Renew the lock key in redis every `interval` seconds for as long
296 Renew the lock key in redis every `interval` seconds for as long
297 as `self._lock_renewal_thread.should_exit` is False.
297 as `self._lock_renewal_thread.should_exit` is False.
298 """
298 """
299 while not stop.wait(timeout=interval):
299 while not stop.wait(timeout=interval):
300 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
300 loggers["refresh.thread.start"].debug("Refreshing Lock(%r).", name)
301 lock: "Lock" = lockref()
301 lock: "Lock" = lockref()
302 if lock is None:
302 if lock is None:
303 loggers["refresh.thread.stop"].debug(
303 loggers["refresh.thread.stop"].debug(
304 "Stopping loop because Lock(%r) was garbage collected.", name
304 "Stopping loop because Lock(%r) was garbage collected.", name
305 )
305 )
306 break
306 break
307 lock.extend(expire=lock._expire)
307 lock.extend(expire=lock._expire)
308 del lock
308 del lock
309 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
309 loggers["refresh.thread.exit"].debug("Exiting renewal thread for Lock(%r).", name)
310
310
311 def _start_lock_renewer(self):
311 def _start_lock_renewer(self):
312 """
312 """
313 Starts the lock refresher thread.
313 Starts the lock refresher thread.
314 """
314 """
315 if self._lock_renewal_thread is not None:
315 if self._lock_renewal_thread is not None:
316 raise AlreadyStarted("Lock refresh thread already started")
316 raise AlreadyStarted("Lock refresh thread already started")
317
317
318 loggers["refresh.start"].debug(
318 loggers["refresh.start"].debug(
319 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
319 "Starting renewal thread for Lock(%r). Refresh interval: %s seconds.",
320 self._name, self._lock_renewal_interval
320 self._name, self._lock_renewal_interval
321 )
321 )
322 self._lock_renewal_stop = threading.Event()
322 self._lock_renewal_stop = threading.Event()
323 self._lock_renewal_thread = threading.Thread(
323 self._lock_renewal_thread = threading.Thread(
324 group=None,
324 group=None,
325 target=self._lock_renewer,
325 target=self._lock_renewer,
326 kwargs={
326 kwargs={
327 'name': self._name,
327 'name': self._name,
328 'lockref': weakref.ref(self),
328 'lockref': weakref.ref(self),
329 'interval': self._lock_renewal_interval,
329 'interval': self._lock_renewal_interval,
330 'stop': self._lock_renewal_stop,
330 'stop': self._lock_renewal_stop,
331 },
331 },
332 )
332 )
333 self._lock_renewal_thread.daemon = True
333 self._lock_renewal_thread.daemon = True
334 self._lock_renewal_thread.start()
334 self._lock_renewal_thread.start()
335
335
336 def _stop_lock_renewer(self):
336 def _stop_lock_renewer(self):
337 """
337 """
338 Stop the lock renewer.
338 Stop the lock renewer.
339
339
340 This signals the renewal thread and waits for its exit.
340 This signals the renewal thread and waits for its exit.
341 """
341 """
342 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
342 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
343 return
343 return
344 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
344 loggers["refresh.shutdown"].debug("Signaling renewal thread for Lock(%r) to exit.", self._name)
345 self._lock_renewal_stop.set()
345 self._lock_renewal_stop.set()
346 self._lock_renewal_thread.join()
346 self._lock_renewal_thread.join()
347 self._lock_renewal_thread = None
347 self._lock_renewal_thread = None
348 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
348 loggers["refresh.exit"].debug("Renewal thread for Lock(%r) exited.", self._name)
349
349
350 def __enter__(self):
350 def __enter__(self):
351 acquired = self.acquire(blocking=self.blocking)
351 acquired = self.acquire(blocking=self.blocking)
352 if not acquired:
352 if not acquired:
353 if self.blocking:
353 if self.blocking:
354 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
354 raise AssertionError(f"Lock({self._name}) wasn't acquired, but blocking=True was used!")
355 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
355 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
356 return self
356 return self
357
357
358 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
358 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
359 self.release()
359 self.release()
360
360
361 def release(self):
361 def release(self):
362 """Releases the lock, that was acquired with the same object.
362 """Releases the lock, that was acquired with the same object.
363
363
364 .. note::
364 .. note::
365
365
366 If you want to release a lock that you acquired in a different place you have two choices:
366 If you want to release a lock that you acquired in a different place you have two choices:
367
367
368 * Use ``Lock("name", id=id_from_other_place).release()``
368 * Use ``Lock("name", id=id_from_other_place).release()``
369 * Use ``Lock("name").reset()``
369 * Use ``Lock("name").reset()``
370 """
370 """
371 if self._lock_renewal_thread is not None:
371 if self._lock_renewal_thread is not None:
372 self._stop_lock_renewer()
372 self._stop_lock_renewer()
373 loggers["release"].debug("Releasing Lock(%r).", self._name)
373 loggers["release"].debug("Releasing Lock(%r).", self._name)
374 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
374 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
375 if error == 1:
375 if error == 1:
376 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
376 raise NotAcquired(f"Lock({self._name}) is not acquired or it already expired.")
377 elif error:
377 elif error:
378 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
378 raise RuntimeError(f"Unsupported error code {error} from EXTEND script.")
379
379
380 def locked(self):
380 def locked(self):
381 """
381 """
382 Return true if the lock is acquired.
382 Return true if the lock is acquired.
383
383
384 Checks that lock with same name already exists. This method returns true, even if
384 Checks that lock with same name already exists. This method returns true, even if
385 lock have another id.
385 lock have another id.
386 """
386 """
387 return self._client.exists(self._name) == 1
387 return self._client.exists(self._name) == 1
388
388
389
389
390 reset_all_script = None
390 reset_all_script = None
391
391
392
392
393 def reset_all(redis_client):
393 def reset_all(redis_client):
394 """
394 """
395 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
395 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
396
396
397 :param redis_client:
397 :param redis_client:
398 An instance of :class:`~StrictRedis`.
398 An instance of :class:`~StrictRedis`.
399 """
399 """
400 Lock.register_scripts(redis_client)
400 Lock.register_scripts(redis_client)
401
401
402 reset_all_script(client=redis_client) # noqa
402 reset_all_script(client=redis_client) # noqa
@@ -1,53 +1,51 b''
1
2
3 import logging
1 import logging
4
2
5 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
3 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
6 from .udp import StatsClient # noqa
4 from .udp import StatsClient # noqa
7
5
8 HOST = 'localhost'
6 HOST = 'localhost'
9 PORT = 8125
7 PORT = 8125
10 IPV6 = False
8 IPV6 = False
11 PREFIX = None
9 PREFIX = None
12 MAXUDPSIZE = 512
10 MAXUDPSIZE = 512
13
11
14 log = logging.getLogger('rhodecode.statsd')
12 log = logging.getLogger('rhodecode.statsd')
15
13
16
14
17 def statsd_config(config, prefix='statsd.'):
15 def statsd_config(config, prefix='statsd.'):
18 _config = {}
16 _config = {}
19 for key in list(config.keys()):
17 for key in list(config.keys()):
20 if key.startswith(prefix):
18 if key.startswith(prefix):
21 _config[key[len(prefix):]] = config[key]
19 _config[key[len(prefix):]] = config[key]
22 return _config
20 return _config
23
21
24
22
25 def client_from_config(configuration, prefix='statsd.', **kwargs):
23 def client_from_config(configuration, prefix='statsd.', **kwargs):
26 from pyramid.settings import asbool
24 from pyramid.settings import asbool
27
25
28 _config = statsd_config(configuration, prefix)
26 _config = statsd_config(configuration, prefix)
29 statsd_flag = _config.get('enabled')
27 statsd_flag = _config.get('enabled')
30 statsd_enabled = asbool(_config.pop('enabled', False))
28 statsd_enabled = asbool(_config.pop('enabled', False))
31 if not statsd_enabled:
29 if not statsd_enabled:
32 log.debug('statsd client not enabled by statsd.enabled = %s flag, skipping...', statsd_flag)
30 log.debug('statsd client not enabled by statsd.enabled = %s flag, skipping...', statsd_flag)
33 return
31 return
34
32
35 host = _config.pop('statsd_host', HOST)
33 host = _config.pop('statsd_host', HOST)
36 port = _config.pop('statsd_port', PORT)
34 port = _config.pop('statsd_port', PORT)
37 prefix = _config.pop('statsd_prefix', PREFIX)
35 prefix = _config.pop('statsd_prefix', PREFIX)
38 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
36 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
39 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
37 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
40 log.debug('configured statsd client %s:%s', host, port)
38 log.debug('configured statsd client %s:%s', host, port)
41
39
42 try:
40 try:
43 client = StatsClient(
41 client = StatsClient(
44 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
42 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
45 except Exception:
43 except Exception:
46 log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
44 log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
47 client = None
45 client = None
48
46
49 return client
47 return client
50
48
51
49
52 def get_statsd_client(request):
50 def get_statsd_client(request):
53 return client_from_config(request.registry.settings)
51 return client_from_config(request.registry.settings)
@@ -1,156 +1,154 b''
1
2
3 import re
1 import re
4 import random
2 import random
5 from collections import deque
3 from collections import deque
6 from datetime import timedelta
4 from datetime import timedelta
7 from repoze.lru import lru_cache
5 from repoze.lru import lru_cache
8
6
9 from .timer import Timer
7 from .timer import Timer
10
8
11 TAG_INVALID_CHARS_RE = re.compile(
9 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
10 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
11 #re.UNICODE
14 )
12 )
15 TAG_INVALID_CHARS_SUBS = "_"
13 TAG_INVALID_CHARS_SUBS = "_"
16
14
17 # we save and expose methods called by statsd for discovery
15 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
16 buckets_dict = {
19
17
20 }
18 }
21
19
22
20
23 @lru_cache(maxsize=500)
21 @lru_cache(maxsize=500)
24 def _normalize_tags_with_cache(tag_list):
22 def _normalize_tags_with_cache(tag_list):
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
23 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26
24
27
25
28 def normalize_tags(tag_list):
26 def normalize_tags(tag_list):
29 # We have to turn our input tag list into a non-mutable tuple for it to
27 # We have to turn our input tag list into a non-mutable tuple for it to
30 # be hashable (and thus usable) by the @lru_cache decorator.
28 # be hashable (and thus usable) by the @lru_cache decorator.
31 return _normalize_tags_with_cache(tuple(tag_list))
29 return _normalize_tags_with_cache(tuple(tag_list))
32
30
33
31
34 class StatsClientBase(object):
32 class StatsClientBase:
35 """A Base class for various statsd clients."""
33 """A Base class for various statsd clients."""
36
34
37 def close(self):
35 def close(self):
38 """Used to close and clean up any underlying resources."""
36 """Used to close and clean up any underlying resources."""
39 raise NotImplementedError()
37 raise NotImplementedError()
40
38
41 def _send(self):
39 def _send(self):
42 raise NotImplementedError()
40 raise NotImplementedError()
43
41
44 def pipeline(self):
42 def pipeline(self):
45 raise NotImplementedError()
43 raise NotImplementedError()
46
44
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
45 def timer(self, stat, rate=1, tags=None, auto_send=True):
48 """
46 """
49 statsd = StatsdClient.statsd
47 statsd = StatsdClient.statsd
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
48 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
49 # This block will be timed.
52 for i in range(0, 100000):
50 for i in range(0, 100000):
53 i ** 2
51 i ** 2
54 # you can access time here...
52 # you can access time here...
55 elapsed_ms = tmr.ms
53 elapsed_ms = tmr.ms
56 """
54 """
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
55 return Timer(self, stat, rate, tags, auto_send=auto_send)
58
56
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
57 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
60 """
58 """
61 Send new timing information.
59 Send new timing information.
62
60
63 `delta` can be either a number of milliseconds or a timedelta.
61 `delta` can be either a number of milliseconds or a timedelta.
64 """
62 """
65 if isinstance(delta, timedelta):
63 if isinstance(delta, timedelta):
66 # Convert timedelta to number of milliseconds.
64 # Convert timedelta to number of milliseconds.
67 delta = delta.total_seconds() * 1000.
65 delta = delta.total_seconds() * 1000.
68 if use_decimals:
66 if use_decimals:
69 fmt = '%0.6f|ms'
67 fmt = '%0.6f|ms'
70 else:
68 else:
71 fmt = '%s|ms'
69 fmt = '%s|ms'
72 self._send_stat(stat, fmt % delta, rate, tags)
70 self._send_stat(stat, fmt % delta, rate, tags)
73
71
74 def incr(self, stat, count=1, rate=1, tags=None):
72 def incr(self, stat, count=1, rate=1, tags=None):
75 """Increment a stat by `count`."""
73 """Increment a stat by `count`."""
76 self._send_stat(stat, '%s|c' % count, rate, tags)
74 self._send_stat(stat, f'{count}|c', rate, tags)
77
75
78 def decr(self, stat, count=1, rate=1, tags=None):
76 def decr(self, stat, count=1, rate=1, tags=None):
79 """Decrement a stat by `count`."""
77 """Decrement a stat by `count`."""
80 self.incr(stat, -count, rate, tags)
78 self.incr(stat, -count, rate, tags)
81
79
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
80 def gauge(self, stat, value, rate=1, delta=False, tags=None):
83 """Set a gauge value."""
81 """Set a gauge value."""
84 if value < 0 and not delta:
82 if value < 0 and not delta:
85 if rate < 1:
83 if rate < 1:
86 if random.random() > rate:
84 if random.random() > rate:
87 return
85 return
88 with self.pipeline() as pipe:
86 with self.pipeline() as pipe:
89 pipe._send_stat(stat, '0|g', 1)
87 pipe._send_stat(stat, '0|g', 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
88 pipe._send_stat(stat, f'{value}|g', 1)
91 else:
89 else:
92 prefix = '+' if delta and value >= 0 else ''
90 prefix = '+' if delta and value >= 0 else ''
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
91 self._send_stat(stat, f'{prefix}{value}|g', rate, tags)
94
92
95 def set(self, stat, value, rate=1):
93 def set(self, stat, value, rate=1):
96 """Set a set value."""
94 """Set a set value."""
97 self._send_stat(stat, '%s|s' % value, rate)
95 self._send_stat(stat, f'{value}|s', rate)
98
96
99 def histogram(self, stat, value, rate=1, tags=None):
97 def histogram(self, stat, value, rate=1, tags=None):
100 """Set a histogram"""
98 """Set a histogram"""
101 self._send_stat(stat, '%s|h' % value, rate, tags)
99 self._send_stat(stat, f'{value}|h', rate, tags)
102
100
103 def _send_stat(self, stat, value, rate, tags=None):
101 def _send_stat(self, stat, value, rate, tags=None):
104 self._after(self._prepare(stat, value, rate, tags))
102 self._after(self._prepare(stat, value, rate, tags))
105
103
106 def _prepare(self, stat, value, rate, tags=None):
104 def _prepare(self, stat, value, rate, tags=None):
107 global buckets_dict
105 global buckets_dict
108 buckets_dict[stat] = 1
106 buckets_dict[stat] = 1
109
107
110 if rate < 1:
108 if rate < 1:
111 if random.random() > rate:
109 if random.random() > rate:
112 return
110 return
113 value = '%s|@%s' % (value, rate)
111 value = f'{value}|@{rate}'
114
112
115 if self._prefix:
113 if self._prefix:
116 stat = '%s.%s' % (self._prefix, stat)
114 stat = f'{self._prefix}.{stat}'
117
115
118 res = '%s:%s%s' % (
116 res = '%s:%s%s' % (
119 stat,
117 stat,
120 value,
118 value,
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
119 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
122 )
120 )
123 return res
121 return res
124
122
125 def _after(self, data):
123 def _after(self, data):
126 if data:
124 if data:
127 self._send(data)
125 self._send(data)
128
126
129
127
130 class PipelineBase(StatsClientBase):
128 class PipelineBase(StatsClientBase):
131
129
132 def __init__(self, client):
130 def __init__(self, client):
133 self._client = client
131 self._client = client
134 self._prefix = client._prefix
132 self._prefix = client._prefix
135 self._stats = deque()
133 self._stats = deque()
136
134
137 def _send(self):
135 def _send(self):
138 raise NotImplementedError()
136 raise NotImplementedError()
139
137
140 def _after(self, data):
138 def _after(self, data):
141 if data is not None:
139 if data is not None:
142 self._stats.append(data)
140 self._stats.append(data)
143
141
144 def __enter__(self):
142 def __enter__(self):
145 return self
143 return self
146
144
147 def __exit__(self, typ, value, tb):
145 def __exit__(self, typ, value, tb):
148 self.send()
146 self.send()
149
147
150 def send(self):
148 def send(self):
151 if not self._stats:
149 if not self._stats:
152 return
150 return
153 self._send()
151 self._send()
154
152
155 def pipeline(self):
153 def pipeline(self):
156 return self.__class__(self)
154 return self.__class__(self)
@@ -1,75 +1,73 b''
1
2
3 import socket
1 import socket
4
2
5 from .base import StatsClientBase, PipelineBase
3 from .base import StatsClientBase, PipelineBase
6
4
7
5
8 class StreamPipeline(PipelineBase):
6 class StreamPipeline(PipelineBase):
9 def _send(self):
7 def _send(self):
10 self._client._after('\n'.join(self._stats))
8 self._client._after('\n'.join(self._stats))
11 self._stats.clear()
9 self._stats.clear()
12
10
13
11
14 class StreamClientBase(StatsClientBase):
12 class StreamClientBase(StatsClientBase):
15 def connect(self):
13 def connect(self):
16 raise NotImplementedError()
14 raise NotImplementedError()
17
15
18 def close(self):
16 def close(self):
19 if self._sock and hasattr(self._sock, 'close'):
17 if self._sock and hasattr(self._sock, 'close'):
20 self._sock.close()
18 self._sock.close()
21 self._sock = None
19 self._sock = None
22
20
23 def reconnect(self):
21 def reconnect(self):
24 self.close()
22 self.close()
25 self.connect()
23 self.connect()
26
24
27 def pipeline(self):
25 def pipeline(self):
28 return StreamPipeline(self)
26 return StreamPipeline(self)
29
27
30 def _send(self, data):
28 def _send(self, data):
31 """Send data to statsd."""
29 """Send data to statsd."""
32 if not self._sock:
30 if not self._sock:
33 self.connect()
31 self.connect()
34 self._do_send(data)
32 self._do_send(data)
35
33
36 def _do_send(self, data):
34 def _do_send(self, data):
37 self._sock.sendall(data.encode('ascii') + b'\n')
35 self._sock.sendall(data.encode('ascii') + b'\n')
38
36
39
37
40 class TCPStatsClient(StreamClientBase):
38 class TCPStatsClient(StreamClientBase):
41 """TCP version of StatsClient."""
39 """TCP version of StatsClient."""
42
40
43 def __init__(self, host='localhost', port=8125, prefix=None,
41 def __init__(self, host='localhost', port=8125, prefix=None,
44 timeout=None, ipv6=False):
42 timeout=None, ipv6=False):
45 """Create a new client."""
43 """Create a new client."""
46 self._host = host
44 self._host = host
47 self._port = port
45 self._port = port
48 self._ipv6 = ipv6
46 self._ipv6 = ipv6
49 self._timeout = timeout
47 self._timeout = timeout
50 self._prefix = prefix
48 self._prefix = prefix
51 self._sock = None
49 self._sock = None
52
50
53 def connect(self):
51 def connect(self):
54 fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
52 fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
55 family, _, _, _, addr = socket.getaddrinfo(
53 family, _, _, _, addr = socket.getaddrinfo(
56 self._host, self._port, fam, socket.SOCK_STREAM)[0]
54 self._host, self._port, fam, socket.SOCK_STREAM)[0]
57 self._sock = socket.socket(family, socket.SOCK_STREAM)
55 self._sock = socket.socket(family, socket.SOCK_STREAM)
58 self._sock.settimeout(self._timeout)
56 self._sock.settimeout(self._timeout)
59 self._sock.connect(addr)
57 self._sock.connect(addr)
60
58
61
59
62 class UnixSocketStatsClient(StreamClientBase):
60 class UnixSocketStatsClient(StreamClientBase):
63 """Unix domain socket version of StatsClient."""
61 """Unix domain socket version of StatsClient."""
64
62
65 def __init__(self, socket_path, prefix=None, timeout=None):
63 def __init__(self, socket_path, prefix=None, timeout=None):
66 """Create a new client."""
64 """Create a new client."""
67 self._socket_path = socket_path
65 self._socket_path = socket_path
68 self._timeout = timeout
66 self._timeout = timeout
69 self._prefix = prefix
67 self._prefix = prefix
70 self._sock = None
68 self._sock = None
71
69
72 def connect(self):
70 def connect(self):
73 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
71 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
74 self._sock.settimeout(self._timeout)
72 self._sock.settimeout(self._timeout)
75 self._sock.connect(self._socket_path)
73 self._sock.connect(self._socket_path)
@@ -1,68 +1,66 b''
1
2
3 import functools
1 import functools
4 from time import perf_counter as time_now
2 from time import perf_counter as time_now
5
3
6
4
7 def safe_wraps(wrapper, *args, **kwargs):
5 def safe_wraps(wrapper, *args, **kwargs):
8 """Safely wraps partial functions."""
6 """Safely wraps partial functions."""
9 while isinstance(wrapper, functools.partial):
7 while isinstance(wrapper, functools.partial):
10 wrapper = wrapper.func
8 wrapper = wrapper.func
11 return functools.wraps(wrapper, *args, **kwargs)
9 return functools.wraps(wrapper, *args, **kwargs)
12
10
13
11
14 class Timer(object):
12 class Timer:
15 """A context manager/decorator for statsd.timing()."""
13 """A context manager/decorator for statsd.timing()."""
16
14
17 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
15 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
18 self.client = client
16 self.client = client
19 self.stat = stat
17 self.stat = stat
20 self.rate = rate
18 self.rate = rate
21 self.tags = tags
19 self.tags = tags
22 self.ms = None
20 self.ms = None
23 self._sent = False
21 self._sent = False
24 self._start_time = None
22 self._start_time = None
25 self.use_decimals = use_decimals
23 self.use_decimals = use_decimals
26 self.auto_send = auto_send
24 self.auto_send = auto_send
27
25
28 def __call__(self, f):
26 def __call__(self, f):
29 """Thread-safe timing function decorator."""
27 """Thread-safe timing function decorator."""
30 @safe_wraps(f)
28 @safe_wraps(f)
31 def _wrapped(*args, **kwargs):
29 def _wrapped(*args, **kwargs):
32 start_time = time_now()
30 start_time = time_now()
33 try:
31 try:
34 return f(*args, **kwargs)
32 return f(*args, **kwargs)
35 finally:
33 finally:
36 elapsed_time_ms = 1000.0 * (time_now() - start_time)
34 elapsed_time_ms = 1000.0 * (time_now() - start_time)
37 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
35 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
38 self._sent = True
36 self._sent = True
39 return _wrapped
37 return _wrapped
40
38
41 def __enter__(self):
39 def __enter__(self):
42 return self.start()
40 return self.start()
43
41
44 def __exit__(self, typ, value, tb):
42 def __exit__(self, typ, value, tb):
45 self.stop(send=self.auto_send)
43 self.stop(send=self.auto_send)
46
44
47 def start(self):
45 def start(self):
48 self.ms = None
46 self.ms = None
49 self._sent = False
47 self._sent = False
50 self._start_time = time_now()
48 self._start_time = time_now()
51 return self
49 return self
52
50
53 def stop(self, send=True):
51 def stop(self, send=True):
54 if self._start_time is None:
52 if self._start_time is None:
55 raise RuntimeError('Timer has not started.')
53 raise RuntimeError('Timer has not started.')
56 dt = time_now() - self._start_time
54 dt = time_now() - self._start_time
57 self.ms = 1000.0 * dt # Convert to milliseconds.
55 self.ms = 1000.0 * dt # Convert to milliseconds.
58 if send:
56 if send:
59 self.send()
57 self.send()
60 return self
58 return self
61
59
62 def send(self):
60 def send(self):
63 if self.ms is None:
61 if self.ms is None:
64 raise RuntimeError('No data recorded.')
62 raise RuntimeError('No data recorded.')
65 if self._sent:
63 if self._sent:
66 raise RuntimeError('Already sent data.')
64 raise RuntimeError('Already sent data.')
67 self._sent = True
65 self._sent = True
68 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
66 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
@@ -1,55 +1,53 b''
1
2
3 import socket
1 import socket
4
2
5 from .base import StatsClientBase, PipelineBase
3 from .base import StatsClientBase, PipelineBase
6
4
7
5
8 class Pipeline(PipelineBase):
6 class Pipeline(PipelineBase):
9
7
10 def __init__(self, client):
8 def __init__(self, client):
11 super(Pipeline, self).__init__(client)
9 super().__init__(client)
12 self._maxudpsize = client._maxudpsize
10 self._maxudpsize = client._maxudpsize
13
11
14 def _send(self):
12 def _send(self):
15 data = self._stats.popleft()
13 data = self._stats.popleft()
16 while self._stats:
14 while self._stats:
17 # Use popleft to preserve the order of the stats.
15 # Use popleft to preserve the order of the stats.
18 stat = self._stats.popleft()
16 stat = self._stats.popleft()
19 if len(stat) + len(data) + 1 >= self._maxudpsize:
17 if len(stat) + len(data) + 1 >= self._maxudpsize:
20 self._client._after(data)
18 self._client._after(data)
21 data = stat
19 data = stat
22 else:
20 else:
23 data += '\n' + stat
21 data += '\n' + stat
24 self._client._after(data)
22 self._client._after(data)
25
23
26
24
27 class StatsClient(StatsClientBase):
25 class StatsClient(StatsClientBase):
28 """A client for statsd."""
26 """A client for statsd."""
29
27
30 def __init__(self, host='localhost', port=8125, prefix=None,
28 def __init__(self, host='localhost', port=8125, prefix=None,
31 maxudpsize=512, ipv6=False):
29 maxudpsize=512, ipv6=False):
32 """Create a new client."""
30 """Create a new client."""
33 fam = socket.AF_INET6 if ipv6 else socket.AF_INET
31 fam = socket.AF_INET6 if ipv6 else socket.AF_INET
34 family, _, _, _, addr = socket.getaddrinfo(
32 family, _, _, _, addr = socket.getaddrinfo(
35 host, port, fam, socket.SOCK_DGRAM)[0]
33 host, port, fam, socket.SOCK_DGRAM)[0]
36 self._addr = addr
34 self._addr = addr
37 self._sock = socket.socket(family, socket.SOCK_DGRAM)
35 self._sock = socket.socket(family, socket.SOCK_DGRAM)
38 self._prefix = prefix
36 self._prefix = prefix
39 self._maxudpsize = maxudpsize
37 self._maxudpsize = maxudpsize
40
38
41 def _send(self, data):
39 def _send(self, data):
42 """Send data to statsd."""
40 """Send data to statsd."""
43 try:
41 try:
44 self._sock.sendto(data.encode('ascii'), self._addr)
42 self._sock.sendto(data.encode('ascii'), self._addr)
45 except (socket.error, RuntimeError):
43 except (socket.error, RuntimeError):
46 # No time for love, Dr. Jones!
44 # No time for love, Dr. Jones!
47 pass
45 pass
48
46
49 def close(self):
47 def close(self):
50 if self._sock and hasattr(self._sock, 'close'):
48 if self._sock and hasattr(self._sock, 'close'):
51 self._sock.close()
49 self._sock.close()
52 self._sock = None
50 self._sock = None
53
51
54 def pipeline(self):
52 def pipeline(self):
55 return Pipeline(self)
53 return Pipeline(self)
@@ -1,185 +1,185 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import sys
19 import sys
20 import logging
20 import logging
21
21
22
22
# ANSI foreground color codes 30..37.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)

# Terminal escape sequences used by the color formatters below.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"

# Log level name -> ANSI color code.
COLORS = {
    'CRITICAL': MAGENTA,
    'ERROR': RED,
    'WARNING': CYAN,
    'INFO': GREEN,
    'DEBUG': BLUE,
    'SQL': YELLOW,
}
38
38
39
39
def _inject_req_id(record, with_prefix=True):
    """Attach the current request's id to *record* as ``record.req_id``.

    With no active pyramid request, ``record.req_id`` is set to ``None``;
    with a request but no id, an all-zero dummy uuid is used instead.
    """
    from pyramid.threadlocal import get_current_request

    dummy = '00000000-0000-0000-0000-000000000000'
    req_id = None

    current = get_current_request()
    if current:
        req_id = getattr(current, 'req_id', None)
        if with_prefix:
            # Pad to 36 chars so columns line up across log lines.
            req_id = 'req_id:%-36s' % (req_id or dummy)
        else:
            req_id = req_id or dummy
    record.req_id = req_id
53
53
54
54
def _add_log_to_debug_bucket(formatted_record):
    """Append a formatted log line to the current request's debug bucket.

    No-op when there is no active pyramid request.
    """
    from pyramid.threadlocal import get_current_request

    current = get_current_request()
    if current:
        current.req_id_bucket.append(formatted_record)
60
60
61
61
def one_space_trim(s):
    """Collapse every run of consecutive spaces in *s* to a single space."""
    while '  ' in s:
        s = s.replace('  ', ' ')
    return s
68
68
69
69
def format_sql(sql):
    """Re-indent a single-line SQL string for readable log output."""
    sql = one_space_trim(sql.replace('\n', ''))
    # The rules run strictly in order; later rules may rewrite text that an
    # earlier rule inserted (DELETE is deliberately listed twice), exactly
    # mirroring the original chained str.replace calls.
    rewrite_rules = (
        (',', ',\n\t'),
        ('SELECT', '\n\tSELECT \n\t'),
        ('UPDATE', '\n\tUPDATE \n\t'),
        ('DELETE', '\n\tDELETE \n\t'),
        ('FROM', '\n\tFROM'),
        ('ORDER BY', '\n\tORDER BY'),
        ('LIMIT', '\n\tLIMIT'),
        ('WHERE', '\n\tWHERE'),
        ('AND', '\n\tAND'),
        ('LEFT', '\n\tLEFT'),
        ('INNER', '\n\tINNER'),
        ('INSERT', '\n\tINSERT'),
        ('DELETE', '\n\tDELETE'),
    )
    for old, new in rewrite_rules:
        sql = sql.replace(old, new)
    return sql
88
88
89
89
class ExceptionAwareFormatter(logging.Formatter):
    """
    Extended logging formatter which prints out remote tracebacks.
    """

    def formatException(self, ei):
        """Format the local traceback; if the exception carries a
        ``_vcs_server_traceback`` attribute, append the remote one too."""
        ex_type, ex_value, ex_tb = ei

        local_tb = logging.Formatter.formatException(self, ei)
        if not hasattr(ex_value, '_vcs_server_traceback'):
            return local_tb

        def _frame_remote(remote_tb_lines):
            # Frame the remote traceback so it stands out in the output.
            return [
                "\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n",
                remote_tb_lines,
                "\n +--- End of remote traceback\n",
            ]

        try:
            if ex_type is not None and ex_value is None and ex_tb is None:
                # possible old (3.x) call syntax where caller is only
                # providing exception object
                if type(ex_type) is not type:
                    raise TypeError(
                        "invalid argument: ex_type should be an exception "
                        "type, or just supply no arguments at all")
            if ex_type is None and ex_tb is None:
                ex_type, ex_value, ex_tb = sys.exc_info()

            remote_tb = getattr(ex_value, "_vcs_server_traceback", None)
            if remote_tb:
                return local_tb + ''.join(_frame_remote(remote_tb))
        finally:
            # clean up cycle to traceback, to allow proper GC
            del ex_type, ex_value, ex_tb

        return local_tb
128
128
129
129
class RequestTrackingFormatter(ExceptionAwareFormatter):
    """Formatter that tags each record with the request id and mirrors the
    formatted line into the per-request debug bucket."""

    def format(self, record):
        _inject_req_id(record)
        formatted = logging.Formatter.format(self, record)
        _add_log_to_debug_bucket(formatted)
        return formatted
136
136
137
137
class ColorFormatter(ExceptionAwareFormatter):
    """Formatter that wraps each record in the ANSI color for its level."""

    def format(self, record):
        plain = super().format(record)
        color_on = COLOR_SEQ % COLORS[record.levelname]
        return ''.join([color_on, plain, RESET_SEQ])
152
152
153
153
class ColorRequestTrackingFormatter(RequestTrackingFormatter):
    """Request-tracking formatter that additionally colorizes output."""

    def format(self, record):
        """
        Colorize the formatted record according to its log level.
        """
        # Use zero-argument super() for consistency with ColorFormatter,
        # which was already modernized in this module.
        def_record = super().format(record)

        levelname = record.levelname
        start = COLOR_SEQ % (COLORS[levelname])
        end = RESET_SEQ

        colored_record = ''.join([start, def_record, end])
        return colored_record
168
168
169
169
class ColorFormatterSql(logging.Formatter):
    """Formatter that pretty-prints SQL log records in the SQL color."""

    def format(self, record):
        color_on = COLOR_SEQ % COLORS['SQL']
        rendered = format_sql(logging.Formatter.format(self, record))
        return ''.join([color_on, rendered, RESET_SEQ])
183
183
# NOTE (marcink): the old name must stay importable for backward .ini compatibility
Pyro4AwareFormatter = ExceptionAwareFormatter  # noqa
General Comments 0
You need to be logged in to leave comments. Login now