##// END OF EJS Templates
py3: synced _vendor with vcsserver py3 changes
super-admin -
r4909:b5f74a71 default
parent child Browse files
Show More
@@ -1,236 +1,236 b''
1 '''
1 '''
2 This library is provided to allow standard python logging
2 This library is provided to allow standard python logging
3 to output log data as JSON formatted strings
3 to output log data as JSON formatted strings
4 '''
4 '''
5 import logging
5 import logging
6 import json
6 import json
7 import re
7 import re
8 from datetime import date, datetime, time, tzinfo, timedelta
8 from datetime import date, datetime, time, tzinfo, timedelta
9 import traceback
9 import traceback
10 import importlib
10 import importlib
11
11
12 from inspect import istraceback
12 from inspect import istraceback
13
13
14 from collections import OrderedDict
14 from collections import OrderedDict
15 from rhodecode.lib.logging_formatter import _inject_req_id, ExceptionAwareFormatter
15 from rhodecode.lib.logging_formatter import _inject_req_id, ExceptionAwareFormatter
16
16
ZERO = timedelta(0)        # zero offset returned by the UTC tzinfo below
HOUR = timedelta(hours=1)  # NOTE(review): not referenced anywhere in this chunk
19
19
20
20
class UTC(tzinfo):
    """Concrete tzinfo implementing UTC: fixed zero offset, no DST."""

    def utcoffset(self, dt):
        # UTC is, by definition, zero offset from UTC.
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return timedelta(0)


# Shared module-level instance, used when attaching timestamps to records.
utc = UTC()
34
34
35
35
# skip natural LogRecord attributes
# http://docs.python.org/library/logging.html#logrecord-attributes
# These are never copied into the JSON payload as "extra" fields.
RESERVED_ATTRS = (
    'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
    'funcName', 'levelname', 'levelno', 'lineno', 'module',
    'msecs', 'message', 'msg', 'name', 'pathname', 'process',
    'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName')
43
43
44
44
def merge_record_extra(record, target, reserved):
    """
    Copy the non-reserved, non-private extra attributes of a LogRecord
    into ``target`` and return it.

    :param record: logging.LogRecord
    :param target: dict to update
    :param reserved: dict or list with reserved keys to skip
    """
    for attr_name, attr_value in record.__dict__.items():
        if attr_name in reserved:
            continue
        # the hasattr() guard allows non-string (e.g. numeric) keys,
        # which naturally cannot be "private" underscore names
        is_private = hasattr(attr_name, "startswith") and attr_name.startswith('_')
        if not is_private:
            target[attr_name] = attr_value
    return target
60
60
61
61
class JsonEncoder(json.JSONEncoder):
    """
    JSONEncoder subclass that additionally serializes dates, tracebacks,
    exceptions and classes, falling back to ``str()`` for anything else
    the default encoder rejects.
    """

    def default(self, obj):
        if isinstance(obj, (date, datetime, time)):
            return self.format_datetime_obj(obj)

        if istraceback(obj):
            # render a traceback object the way the traceback module would
            return ''.join(traceback.format_tb(obj)).strip()

        if isinstance(obj, Exception) or type(obj) == Exception or type(obj) == type:
            return str(obj)

        try:
            return super(JsonEncoder, self).default(obj)
        except TypeError:
            # last resort: stringify, or give up with None
            try:
                return str(obj)
            except Exception:
                return None

    def format_datetime_obj(self, obj):
        # ISO-8601 text for date/datetime/time values
        return obj.isoformat()
91
91
92
92
class JsonFormatter(ExceptionAwareFormatter):
    """
    A custom formatter to format logging records as json strings.
    Extra values will be formatted as str() if not supported by
    json default encoder
    """

    def __init__(self, *args, **kwargs):
        """
        :param json_default: a function for encoding non-standard objects
            as outlined in http://docs.python.org/2/library/json.html
        :param json_encoder: optional custom encoder
        :param json_serializer: a :meth:`json.dumps`-compatible callable
            that will be used to serialize the log record.
        :param json_indent: indent parameter for json.dumps
        :param json_ensure_ascii: ensure_ascii parameter for json.dumps
        :param prefix: an optional string prefix added at the beginning of
            the formatted string
        :param reserved_attrs: an optional list of fields that will be skipped when
            outputting json log record. Defaults to all log record attributes:
            http://docs.python.org/library/logging.html#logrecord-attributes
        :param timestamp: an optional string/boolean field to add a timestamp when
            outputting the json log record. If string is passed, timestamp will be added
            to log record using string as key. If True boolean is passed, timestamp key
            will be "timestamp". Defaults to True in this vendored copy.
        """
        self.json_default = self._str_to_fn(kwargs.pop("json_default", None))
        self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None))
        self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps))
        self.json_indent = kwargs.pop("json_indent", None)
        self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True)
        self.prefix = kwargs.pop("prefix", "")
        reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
        # identity mapping gives O(1) membership tests in merge_record_extra();
        # the list() wrapper around zip() was a py2->py3 artifact and is redundant
        self.reserved_attrs = dict(zip(reserved_attrs, reserved_attrs))
        self.timestamp = kwargs.pop("timestamp", True)

        # intentionally bypasses ExceptionAwareFormatter.__init__ (kept from
        # the original implementation) and initializes the base Formatter
        logging.Formatter.__init__(self, *args, **kwargs)
        if not self.json_encoder and not self.json_default:
            self.json_encoder = JsonEncoder

        self._required_fields = self.parse()
        self._skip_fields = dict(zip(self._required_fields,
                                     self._required_fields))
        self._skip_fields.update(self.reserved_attrs)

    def _str_to_fn(self, fn_as_str):
        """
        If the argument is not a string, return whatever was passed in.
        Parses a string such as package.module.function, imports the module
        and returns the function.

        :param fn_as_str: The string to parse. If not a string, return it.
        """
        if not isinstance(fn_as_str, str):
            return fn_as_str

        path, _, function = fn_as_str.rpartition('.')
        module = importlib.import_module(path)
        return getattr(module, function)

    def parse(self):
        """
        Parses format string looking for substitutions

        This method is responsible for returning a list of fields (as strings)
        to include in all log messages.
        """
        standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
        return standard_formatters.findall(self._fmt)

    def add_fields(self, log_record, record, message_dict):
        """
        Override this method to implement custom logic for adding fields.
        """
        for field in self._required_fields:
            log_record[field] = record.__dict__.get(field)
        log_record.update(message_dict)
        merge_record_extra(record, log_record, reserved=self._skip_fields)

        if self.timestamp:
            # a string `timestamp` option doubles as the output key name
            key = self.timestamp if isinstance(self.timestamp, str) else 'timestamp'
            log_record[key] = datetime.fromtimestamp(record.created, tz=utc)

    def process_log_record(self, log_record):
        """
        Override this method to implement custom logic
        on the possibly ordered dictionary.
        """
        return log_record

    def jsonify_log_record(self, log_record):
        """Returns a json string of the log record."""
        return self.json_serializer(log_record,
                                    default=self.json_default,
                                    cls=self.json_encoder,
                                    indent=self.json_indent,
                                    ensure_ascii=self.json_ensure_ascii)

    def serialize_log_record(self, log_record):
        """Returns the final representation of the log record."""
        return "%s%s" % (self.prefix, self.jsonify_log_record(log_record))

    def format(self, record):
        """Formats a log record and serializes to json"""
        message_dict = {}
        if isinstance(record.msg, dict):
            # a dict message becomes the payload itself
            message_dict = record.msg
            record.message = None
        else:
            record.message = record.getMessage()
        # only format time if needed
        if "asctime" in self._required_fields:
            record.asctime = self.formatTime(record, self.datefmt)

        # Display formatted exception, but allow overriding it in the
        # user-supplied dict.
        if record.exc_info and not message_dict.get('exc_info'):
            message_dict['exc_info'] = self.formatException(record.exc_info)
        if not message_dict.get('exc_info') and record.exc_text:
            message_dict['exc_info'] = record.exc_text
        # Display formatted record of stack frames;
        # default format is a string returned from :func:`traceback.print_stack`.
        # getattr guard replaces the old py2.7 `except AttributeError` dance.
        if getattr(record, 'stack_info', None) and not message_dict.get('stack_info'):
            message_dict['stack_info'] = self.formatStack(record.stack_info)

        # OrderedDict is always importable on py3; the old try/except NameError
        # fallback was dead code
        log_record = OrderedDict()

        _inject_req_id(record, with_prefix=False)
        self.add_fields(log_record, record, message_dict)
        log_record = self.process_log_record(log_record)

        return self.serialize_log_record(log_record)
@@ -1,390 +1,384 b''
1 import sys
1 import sys
2 import threading
2 import threading
3 import weakref
3 import weakref
4 from base64 import b64encode
4 from base64 import b64encode
5 from logging import getLogger
5 from logging import getLogger
6 from os import urandom
6 from os import urandom
7
7
8 from redis import StrictRedis
8 from redis import StrictRedis
9
9
10 __version__ = '3.7.0'
10 __version__ = '3.7.0'
11
11
12 loggers = {
12 loggers = {
13 k: getLogger("rhodecode." + ".".join((__name__, k)))
13 k: getLogger("rhodecode." + ".".join((__name__, k)))
14 for k in [
14 for k in [
15 "acquire",
15 "acquire",
16 "refresh.thread.start",
16 "refresh.thread.start",
17 "refresh.thread.stop",
17 "refresh.thread.stop",
18 "refresh.thread.exit",
18 "refresh.thread.exit",
19 "refresh.start",
19 "refresh.start",
20 "refresh.shutdown",
20 "refresh.shutdown",
21 "refresh.exit",
21 "refresh.exit",
22 "release",
22 "release",
23 ]
23 ]
24 }
24 }
25
25
# py3-only aliases; the old PY3/unicode compatibility branch is dead code
# on python 3 and has been removed. Names are kept because the rest of the
# module (e.g. Lock.__init__/get_owner_id) still refers to them.
text_type = str
binary_type = bytes
28
35
29
# Check if the id match. If not, return an error code.
# KEYS[1] = lock key, KEYS[2] = signal list key,
# ARGV[1] = expected lock id, ARGV[2] = signal expire (milliseconds).
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("pexpire", KEYS[2], ARGV[2])
        redis.call("del", KEYS[1])
        return 0
    end
"""

# Covers both cases when key doesn't exist and doesn't equal to lock's id
# (return 1), and when the key has no TTL assigned (return 2).
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[2])
        return 0
    end
"""

# Unconditionally deletes the lock key and wakes any waiters by pushing
# onto the signal list (which is given a short TTL).
RESET_SCRIPT = b"""
    redis.call('del', KEYS[2])
    redis.call('lpush', KEYS[2], 1)
    redis.call('pexpire', KEYS[2], ARGV[2])
    return redis.call('del', KEYS[1])
"""

# Deletes every 'lock:*' key and signals each corresponding waiter list;
# returns the number of locks removed.
RESET_ALL_SCRIPT = b"""
    local locks = redis.call('keys', 'lock:*')
    local signal
    for _, lock in pairs(locks) do
        signal = 'lock-signal:' .. string.sub(lock, 6)
        redis.call('del', signal)
        redis.call('lpush', signal, 1)
        redis.call('expire', signal, 1)
        redis.call('del', lock)
    end
    return #locks
"""
80
74
81
75
class AlreadyAcquired(RuntimeError):
    # Raised by acquire() when this Lock instance already holds the lock.
    pass


class NotAcquired(RuntimeError):
    # Raised (e.g. by extend()) when the lock is not held or already expired.
    pass


class AlreadyStarted(RuntimeError):
    # NOTE(review): its raiser is not visible in this chunk -- presumably
    # the lock-renewal thread startup; confirm before documenting further.
    pass


class TimeoutNotUsable(RuntimeError):
    # Raised by acquire() when a timeout is combined with blocking=False.
    pass


class InvalidTimeout(RuntimeError):
    # Raised by acquire() when a negative timeout is supplied.
    pass


class TimeoutTooLarge(RuntimeError):
    # Raised by acquire() when the timeout exceeds the lock's expire time.
    pass


class NotExpirable(RuntimeError):
    # Raised by extend() when the lock has no expiration time assigned.
    pass
108
102
109
103
class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """
    # redis Script objects shared per-class; populated by register_scripts()
    unlock_script = None
    extend_script = None
    reset_script = None
    reset_all_script = None
118
112
119 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
113 def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
120 """
114 """
121 :param redis_client:
115 :param redis_client:
122 An instance of :class:`~StrictRedis`.
116 An instance of :class:`~StrictRedis`.
123 :param name:
117 :param name:
124 The name (redis key) the lock should have.
118 The name (redis key) the lock should have.
125 :param expire:
119 :param expire:
126 The lock expiry time in seconds. If left at the default (None)
120 The lock expiry time in seconds. If left at the default (None)
127 the lock will not expire.
121 the lock will not expire.
128 :param id:
122 :param id:
129 The ID (redis value) the lock should have. A random value is
123 The ID (redis value) the lock should have. A random value is
130 generated when left at the default.
124 generated when left at the default.
131
125
132 Note that if you specify this then the lock is marked as "held". Acquires
126 Note that if you specify this then the lock is marked as "held". Acquires
133 won't be possible.
127 won't be possible.
134 :param auto_renewal:
128 :param auto_renewal:
135 If set to ``True``, Lock will automatically renew the lock so that it
129 If set to ``True``, Lock will automatically renew the lock so that it
136 doesn't expire for as long as the lock is held (acquire() called
130 doesn't expire for as long as the lock is held (acquire() called
137 or running in a context manager).
131 or running in a context manager).
138
132
139 Implementation note: Renewal will happen using a daemon thread with
133 Implementation note: Renewal will happen using a daemon thread with
140 an interval of ``expire*2/3``. If wishing to use a different renewal
134 an interval of ``expire*2/3``. If wishing to use a different renewal
141 time, subclass Lock, call ``super().__init__()`` then set
135 time, subclass Lock, call ``super().__init__()`` then set
142 ``self._lock_renewal_interval`` to your desired interval.
136 ``self._lock_renewal_interval`` to your desired interval.
143 :param strict:
137 :param strict:
144 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
138 If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
145 :param signal_expire:
139 :param signal_expire:
146 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
140 Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
147 """
141 """
148 if strict and not isinstance(redis_client, StrictRedis):
142 if strict and not isinstance(redis_client, StrictRedis):
149 raise ValueError("redis_client must be instance of StrictRedis. "
143 raise ValueError("redis_client must be instance of StrictRedis. "
150 "Use strict=False if you know what you're doing.")
144 "Use strict=False if you know what you're doing.")
151 if auto_renewal and expire is None:
145 if auto_renewal and expire is None:
152 raise ValueError("Expire may not be None when auto_renewal is set")
146 raise ValueError("Expire may not be None when auto_renewal is set")
153
147
154 self._client = redis_client
148 self._client = redis_client
155
149
156 if expire:
150 if expire:
157 expire = int(expire)
151 expire = int(expire)
158 if expire < 0:
152 if expire < 0:
159 raise ValueError("A negative expire is not acceptable.")
153 raise ValueError("A negative expire is not acceptable.")
160 else:
154 else:
161 expire = None
155 expire = None
162 self._expire = expire
156 self._expire = expire
163
157
164 self._signal_expire = signal_expire
158 self._signal_expire = signal_expire
165 if id is None:
159 if id is None:
166 self._id = b64encode(urandom(18)).decode('ascii')
160 self._id = b64encode(urandom(18)).decode('ascii')
167 elif isinstance(id, binary_type):
161 elif isinstance(id, binary_type):
168 try:
162 try:
169 self._id = id.decode('ascii')
163 self._id = id.decode('ascii')
170 except UnicodeDecodeError:
164 except UnicodeDecodeError:
171 self._id = b64encode(id).decode('ascii')
165 self._id = b64encode(id).decode('ascii')
172 elif isinstance(id, text_type):
166 elif isinstance(id, text_type):
173 self._id = id
167 self._id = id
174 else:
168 else:
175 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
169 raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
176 self._name = 'lock:' + name
170 self._name = 'lock:' + name
177 self._signal = 'lock-signal:' + name
171 self._signal = 'lock-signal:' + name
178 self._lock_renewal_interval = (float(expire) * 2 / 3
172 self._lock_renewal_interval = (float(expire) * 2 / 3
179 if auto_renewal
173 if auto_renewal
180 else None)
174 else None)
181 self._lock_renewal_thread = None
175 self._lock_renewal_thread = None
182
176
183 self.register_scripts(redis_client)
177 self.register_scripts(redis_client)
184
178
    @classmethod
    def register_scripts(cls, redis_client):
        """
        Register the Lua scripts on ``redis_client`` and cache the resulting
        Script objects on the class.

        NOTE(review): this reads a module-level ``reset_all_script`` global
        whose ``= None`` initialization is not visible in this chunk —
        confirm it is defined at module scope, otherwise the first call
        raises NameError.
        """
        global reset_all_script
        if reset_all_script is None:
            reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
        cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
        cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
        cls.reset_script = redis_client.register_script(RESET_SCRIPT)
        cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
194
188
    @property
    def _held(self):
        # True when the value stored under the lock key equals our own id.
        return self.id == self.get_owner_id()
198
192
    def reset(self):
        """
        Forcibly deletes the lock. Use this with care.

        Also pushes to the signal list (see RESET_SCRIPT) so blocked
        waiters wake up instead of hanging until BLPOP times out.
        """
        self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
204
198
    @property
    def id(self):
        """The (text) id value this instance stores under the lock key."""
        return self._id
208
202
209 def get_owner_id(self):
203 def get_owner_id(self):
210 owner_id = self._client.get(self._name)
204 owner_id = self._client.get(self._name)
211 if isinstance(owner_id, binary_type):
205 if isinstance(owner_id, binary_type):
212 owner_id = owner_id.decode('ascii', 'replace')
206 owner_id = owner_id.decode('ascii', 'replace')
213 return owner_id
207 return owner_id
214
208
215 def acquire(self, blocking=True, timeout=None):
209 def acquire(self, blocking=True, timeout=None):
216 """
210 """
217 :param blocking:
211 :param blocking:
218 Boolean value specifying whether lock should be blocking or not.
212 Boolean value specifying whether lock should be blocking or not.
219 :param timeout:
213 :param timeout:
220 An integer value specifying the maximum number of seconds to block.
214 An integer value specifying the maximum number of seconds to block.
221 """
215 """
222 logger = loggers["acquire"]
216 logger = loggers["acquire"]
223
217
224 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
218 logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name)
225
219
226 if self._held:
220 if self._held:
227 owner_id = self.get_owner_id()
221 owner_id = self.get_owner_id()
228 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
222 raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id))
229
223
230 if not blocking and timeout is not None:
224 if not blocking and timeout is not None:
231 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
225 raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
232
226
233 if timeout:
227 if timeout:
234 timeout = int(timeout)
228 timeout = int(timeout)
235 if timeout < 0:
229 if timeout < 0:
236 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
230 raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
237
231
238 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
232 if self._expire and not self._lock_renewal_interval and timeout > self._expire:
239 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
233 raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
240
234
241 busy = True
235 busy = True
242 blpop_timeout = timeout or self._expire or 0
236 blpop_timeout = timeout or self._expire or 0
243 timed_out = False
237 timed_out = False
244 while busy:
238 while busy:
245 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
239 busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
246 if busy:
240 if busy:
247 if timed_out:
241 if timed_out:
248 return False
242 return False
249 elif blocking:
243 elif blocking:
250 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
244 timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
251 else:
245 else:
252 logger.warning("Failed to get %r.", self._name)
246 logger.warning("Failed to get %r.", self._name)
253 return False
247 return False
254
248
255 logger.debug("Got lock for %r.", self._name)
249 logger.debug("Got lock for %r.", self._name)
256 if self._lock_renewal_interval is not None:
250 if self._lock_renewal_interval is not None:
257 self._start_lock_renewer()
251 self._start_lock_renewer()
258 return True
252 return True
259
253
260 def extend(self, expire=None):
254 def extend(self, expire=None):
261 """Extends expiration time of the lock.
255 """Extends expiration time of the lock.
262
256
263 :param expire:
257 :param expire:
264 New expiration time. If ``None`` - `expire` provided during
258 New expiration time. If ``None`` - `expire` provided during
265 lock initialization will be taken.
259 lock initialization will be taken.
266 """
260 """
267 if expire:
261 if expire:
268 expire = int(expire)
262 expire = int(expire)
269 if expire < 0:
263 if expire < 0:
270 raise ValueError("A negative expire is not acceptable.")
264 raise ValueError("A negative expire is not acceptable.")
271 elif self._expire is not None:
265 elif self._expire is not None:
272 expire = self._expire
266 expire = self._expire
273 else:
267 else:
274 raise TypeError(
268 raise TypeError(
275 "To extend a lock 'expire' must be provided as an "
269 "To extend a lock 'expire' must be provided as an "
276 "argument to extend() method or at initialization time."
270 "argument to extend() method or at initialization time."
277 )
271 )
278
272
279 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
273 error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
280 if error == 1:
274 if error == 1:
281 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
275 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
282 elif error == 2:
276 elif error == 2:
283 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
277 raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
284 elif error:
278 elif error:
285 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
279 raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
286
280
287 @staticmethod
281 @staticmethod
288 def _lock_renewer(lockref, interval, stop):
282 def _lock_renewer(lockref, interval, stop):
289 """
283 """
290 Renew the lock key in redis every `interval` seconds for as long
284 Renew the lock key in redis every `interval` seconds for as long
291 as `self._lock_renewal_thread.should_exit` is False.
285 as `self._lock_renewal_thread.should_exit` is False.
292 """
286 """
293 while not stop.wait(timeout=interval):
287 while not stop.wait(timeout=interval):
294 loggers["refresh.thread.start"].debug("Refreshing lock")
288 loggers["refresh.thread.start"].debug("Refreshing lock")
295 lock = lockref()
289 lock = lockref()
296 if lock is None:
290 if lock is None:
297 loggers["refresh.thread.stop"].debug(
291 loggers["refresh.thread.stop"].debug(
298 "The lock no longer exists, stopping lock refreshing"
292 "The lock no longer exists, stopping lock refreshing"
299 )
293 )
300 break
294 break
301 lock.extend(expire=lock._expire)
295 lock.extend(expire=lock._expire)
302 del lock
296 del lock
303 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
297 loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing")
304
298
305 def _start_lock_renewer(self):
299 def _start_lock_renewer(self):
306 """
300 """
307 Starts the lock refresher thread.
301 Starts the lock refresher thread.
308 """
302 """
309 if self._lock_renewal_thread is not None:
303 if self._lock_renewal_thread is not None:
310 raise AlreadyStarted("Lock refresh thread already started")
304 raise AlreadyStarted("Lock refresh thread already started")
311
305
312 loggers["refresh.start"].debug(
306 loggers["refresh.start"].debug(
313 "Starting thread to refresh lock every %s seconds",
307 "Starting thread to refresh lock every %s seconds",
314 self._lock_renewal_interval
308 self._lock_renewal_interval
315 )
309 )
316 self._lock_renewal_stop = threading.Event()
310 self._lock_renewal_stop = threading.Event()
317 self._lock_renewal_thread = threading.Thread(
311 self._lock_renewal_thread = threading.Thread(
318 group=None,
312 group=None,
319 target=self._lock_renewer,
313 target=self._lock_renewer,
320 kwargs={'lockref': weakref.ref(self),
314 kwargs={'lockref': weakref.ref(self),
321 'interval': self._lock_renewal_interval,
315 'interval': self._lock_renewal_interval,
322 'stop': self._lock_renewal_stop}
316 'stop': self._lock_renewal_stop}
323 )
317 )
324 self._lock_renewal_thread.setDaemon(True)
318 self._lock_renewal_thread.setDaemon(True)
325 self._lock_renewal_thread.start()
319 self._lock_renewal_thread.start()
326
320
327 def _stop_lock_renewer(self):
321 def _stop_lock_renewer(self):
328 """
322 """
329 Stop the lock renewer.
323 Stop the lock renewer.
330
324
331 This signals the renewal thread and waits for its exit.
325 This signals the renewal thread and waits for its exit.
332 """
326 """
333 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
327 if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
334 return
328 return
335 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
329 loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop")
336 self._lock_renewal_stop.set()
330 self._lock_renewal_stop.set()
337 self._lock_renewal_thread.join()
331 self._lock_renewal_thread.join()
338 self._lock_renewal_thread = None
332 self._lock_renewal_thread = None
339 loggers["refresh.exit"].debug("Lock refresher has stopped")
333 loggers["refresh.exit"].debug("Lock refresher has stopped")
340
334
341 def __enter__(self):
335 def __enter__(self):
342 acquired = self.acquire(blocking=True)
336 acquired = self.acquire(blocking=True)
343 assert acquired, "Lock wasn't acquired, but blocking=True"
337 assert acquired, "Lock wasn't acquired, but blocking=True"
344 return self
338 return self
345
339
346 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
340 def __exit__(self, exc_type=None, exc_value=None, traceback=None):
347 self.release()
341 self.release()
348
342
349 def release(self):
343 def release(self):
350 """Releases the lock, that was acquired with the same object.
344 """Releases the lock, that was acquired with the same object.
351
345
352 .. note::
346 .. note::
353
347
354 If you want to release a lock that you acquired in a different place you have two choices:
348 If you want to release a lock that you acquired in a different place you have two choices:
355
349
356 * Use ``Lock("name", id=id_from_other_place).release()``
350 * Use ``Lock("name", id=id_from_other_place).release()``
357 * Use ``Lock("name").reset()``
351 * Use ``Lock("name").reset()``
358 """
352 """
359 if self._lock_renewal_thread is not None:
353 if self._lock_renewal_thread is not None:
360 self._stop_lock_renewer()
354 self._stop_lock_renewer()
361 loggers["release"].debug("Releasing %r.", self._name)
355 loggers["release"].debug("Releasing %r.", self._name)
362 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
356 error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire))
363 if error == 1:
357 if error == 1:
364 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
358 raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
365 elif error:
359 elif error:
366 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
360 raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
367
361
368 def locked(self):
362 def locked(self):
369 """
363 """
370 Return true if the lock is acquired.
364 Return true if the lock is acquired.
371
365
372 Checks that lock with same name already exists. This method returns true, even if
366 Checks that lock with same name already exists. This method returns true, even if
373 lock have another id.
367 lock have another id.
374 """
368 """
375 return self._client.exists(self._name) == 1
369 return self._client.exists(self._name) == 1
376
370
377
371
378 reset_all_script = None
372 reset_all_script = None
379
373
380
374
381 def reset_all(redis_client):
375 def reset_all(redis_client):
382 """
376 """
383 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
377 Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
384
378
385 :param redis_client:
379 :param redis_client:
386 An instance of :class:`~StrictRedis`.
380 An instance of :class:`~StrictRedis`.
387 """
381 """
388 Lock.register_scripts(redis_client)
382 Lock.register_scripts(redis_client)
389
383
390 reset_all_script(client=redis_client) # noqa
384 reset_all_script(client=redis_client) # noqa
@@ -1,52 +1,52 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2
2
3 import logging
3 import logging
4
4
5 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
5 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
6 from .udp import StatsClient # noqa
6 from .udp import StatsClient # noqa
7
7
8 HOST = 'localhost'
8 HOST = 'localhost'
9 PORT = 8125
9 PORT = 8125
10 IPV6 = False
10 IPV6 = False
11 PREFIX = None
11 PREFIX = None
12 MAXUDPSIZE = 512
12 MAXUDPSIZE = 512
13
13
14 log = logging.getLogger('rhodecode.statsd')
14 log = logging.getLogger('rhodecode.statsd')
15
15
16
16
17 def statsd_config(config, prefix='statsd.'):
17 def statsd_config(config, prefix='statsd.'):
18 _config = {}
18 _config = {}
19 for key in config.keys():
19 for key in config.keys():
20 if key.startswith(prefix):
20 if key.startswith(prefix):
21 _config[key[len(prefix):]] = config[key]
21 _config[key[len(prefix):]] = config[key]
22 return _config
22 return _config
23
23
24
24
25 def client_from_config(configuration, prefix='statsd.', **kwargs):
25 def client_from_config(configuration, prefix='statsd.', **kwargs):
26 from pyramid.settings import asbool
26 from pyramid.settings import asbool
27
27
28 _config = statsd_config(configuration, prefix)
28 _config = statsd_config(configuration, prefix)
29 statsd_enabled = asbool(_config.pop('enabled', False))
29 statsd_enabled = asbool(_config.pop('enabled', False))
30 if not statsd_enabled:
30 if not statsd_enabled:
31 log.debug('statsd client not enabled by statsd.enabled = flag, skipping...')
31 log.debug('statsd client not enabled by statsd.enabled = flag, skipping...')
32 return
32 return
33
33
34 host = _config.pop('statsd_host', HOST)
34 host = _config.pop('statsd_host', HOST)
35 port = _config.pop('statsd_port', PORT)
35 port = _config.pop('statsd_port', PORT)
36 prefix = _config.pop('statsd_prefix', PREFIX)
36 prefix = _config.pop('statsd_prefix', PREFIX)
37 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
37 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
38 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
38 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
39 log.debug('configured statsd client %s:%s', host, port)
39 log.debug('configured statsd client %s:%s', host, port)
40
40
41 try:
41 try:
42 client = StatsClient(
42 client = StatsClient(
43 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
43 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
44 except Exception:
44 except Exception:
45 log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
45 log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd')
46 client = None
46 client = None
47
47
48 return client
48 return client
49
49
50
50
51 def get_statsd_client(request):
51 def get_statsd_client(request):
52 return client_from_config(request.registry.settings)
52 return client_from_config(request.registry.settings)
@@ -1,156 +1,156 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2
2
3 import re
3 import re
4 import random
4 import random
5 from collections import deque
5 from collections import deque
6 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
7 from repoze.lru import lru_cache
8
8
9 from .timer import Timer
9 from .timer import Timer
10
10
11 TAG_INVALID_CHARS_RE = re.compile(
11 TAG_INVALID_CHARS_RE = re.compile(
12 r"[^\w\d_\-:/\.]",
12 r"[^\w\d_\-:/\.]",
13 #re.UNICODE
13 #re.UNICODE
14 )
14 )
15 TAG_INVALID_CHARS_SUBS = "_"
15 TAG_INVALID_CHARS_SUBS = "_"
16
16
17 # we save and expose methods called by statsd for discovery
17 # we save and expose methods called by statsd for discovery
18 buckets_dict = {
18 buckets_dict = {
19
19
20 }
20 }
21
21
22
22
23 @lru_cache(maxsize=500)
23 @lru_cache(maxsize=500)
24 def _normalize_tags_with_cache(tag_list):
24 def _normalize_tags_with_cache(tag_list):
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
25 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
26
26
27
27
28 def normalize_tags(tag_list):
28 def normalize_tags(tag_list):
29 # We have to turn our input tag list into a non-mutable tuple for it to
29 # We have to turn our input tag list into a non-mutable tuple for it to
30 # be hashable (and thus usable) by the @lru_cache decorator.
30 # be hashable (and thus usable) by the @lru_cache decorator.
31 return _normalize_tags_with_cache(tuple(tag_list))
31 return _normalize_tags_with_cache(tuple(tag_list))
32
32
33
33
34 class StatsClientBase(object):
34 class StatsClientBase(object):
35 """A Base class for various statsd clients."""
35 """A Base class for various statsd clients."""
36
36
37 def close(self):
37 def close(self):
38 """Used to close and clean up any underlying resources."""
38 """Used to close and clean up any underlying resources."""
39 raise NotImplementedError()
39 raise NotImplementedError()
40
40
41 def _send(self):
41 def _send(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 def pipeline(self):
44 def pipeline(self):
45 raise NotImplementedError()
45 raise NotImplementedError()
46
46
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
47 def timer(self, stat, rate=1, tags=None, auto_send=True):
48 """
48 """
49 statsd = StatsdClient.statsd
49 statsd = StatsdClient.statsd
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
50 with statsd.timer('bucket_name', auto_send=True) as tmr:
51 # This block will be timed.
51 # This block will be timed.
52 for i in range(0, 100000):
52 for i in range(0, 100000):
53 i ** 2
53 i ** 2
54 # you can access time here...
54 # you can access time here...
55 elapsed_ms = tmr.ms
55 elapsed_ms = tmr.ms
56 """
56 """
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
57 return Timer(self, stat, rate, tags, auto_send=auto_send)
58
58
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
59 def timing(self, stat, delta, rate=1, tags=None, use_decimals=True):
60 """
60 """
61 Send new timing information.
61 Send new timing information.
62
62
63 `delta` can be either a number of milliseconds or a timedelta.
63 `delta` can be either a number of milliseconds or a timedelta.
64 """
64 """
65 if isinstance(delta, timedelta):
65 if isinstance(delta, timedelta):
66 # Convert timedelta to number of milliseconds.
66 # Convert timedelta to number of milliseconds.
67 delta = delta.total_seconds() * 1000.
67 delta = delta.total_seconds() * 1000.
68 if use_decimals:
68 if use_decimals:
69 fmt = '%0.6f|ms'
69 fmt = '%0.6f|ms'
70 else:
70 else:
71 fmt = '%s|ms'
71 fmt = '%s|ms'
72 self._send_stat(stat, fmt % delta, rate, tags)
72 self._send_stat(stat, fmt % delta, rate, tags)
73
73
74 def incr(self, stat, count=1, rate=1, tags=None):
74 def incr(self, stat, count=1, rate=1, tags=None):
75 """Increment a stat by `count`."""
75 """Increment a stat by `count`."""
76 self._send_stat(stat, '%s|c' % count, rate, tags)
76 self._send_stat(stat, '%s|c' % count, rate, tags)
77
77
78 def decr(self, stat, count=1, rate=1, tags=None):
78 def decr(self, stat, count=1, rate=1, tags=None):
79 """Decrement a stat by `count`."""
79 """Decrement a stat by `count`."""
80 self.incr(stat, -count, rate, tags)
80 self.incr(stat, -count, rate, tags)
81
81
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
82 def gauge(self, stat, value, rate=1, delta=False, tags=None):
83 """Set a gauge value."""
83 """Set a gauge value."""
84 if value < 0 and not delta:
84 if value < 0 and not delta:
85 if rate < 1:
85 if rate < 1:
86 if random.random() > rate:
86 if random.random() > rate:
87 return
87 return
88 with self.pipeline() as pipe:
88 with self.pipeline() as pipe:
89 pipe._send_stat(stat, '0|g', 1)
89 pipe._send_stat(stat, '0|g', 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
90 pipe._send_stat(stat, '%s|g' % value, 1)
91 else:
91 else:
92 prefix = '+' if delta and value >= 0 else ''
92 prefix = '+' if delta and value >= 0 else ''
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
93 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
94
94
95 def set(self, stat, value, rate=1):
95 def set(self, stat, value, rate=1):
96 """Set a set value."""
96 """Set a set value."""
97 self._send_stat(stat, '%s|s' % value, rate)
97 self._send_stat(stat, '%s|s' % value, rate)
98
98
99 def histogram(self, stat, value, rate=1, tags=None):
99 def histogram(self, stat, value, rate=1, tags=None):
100 """Set a histogram"""
100 """Set a histogram"""
101 self._send_stat(stat, '%s|h' % value, rate, tags)
101 self._send_stat(stat, '%s|h' % value, rate, tags)
102
102
103 def _send_stat(self, stat, value, rate, tags=None):
103 def _send_stat(self, stat, value, rate, tags=None):
104 self._after(self._prepare(stat, value, rate, tags))
104 self._after(self._prepare(stat, value, rate, tags))
105
105
106 def _prepare(self, stat, value, rate, tags=None):
106 def _prepare(self, stat, value, rate, tags=None):
107 global buckets_dict
107 global buckets_dict
108 buckets_dict[stat] = 1
108 buckets_dict[stat] = 1
109
109
110 if rate < 1:
110 if rate < 1:
111 if random.random() > rate:
111 if random.random() > rate:
112 return
112 return
113 value = '%s|@%s' % (value, rate)
113 value = '%s|@%s' % (value, rate)
114
114
115 if self._prefix:
115 if self._prefix:
116 stat = '%s.%s' % (self._prefix, stat)
116 stat = '%s.%s' % (self._prefix, stat)
117
117
118 res = '%s:%s%s' % (
118 res = '%s:%s%s' % (
119 stat,
119 stat,
120 value,
120 value,
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
121 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
122 )
122 )
123 return res
123 return res
124
124
125 def _after(self, data):
125 def _after(self, data):
126 if data:
126 if data:
127 self._send(data)
127 self._send(data)
128
128
129
129
130 class PipelineBase(StatsClientBase):
130 class PipelineBase(StatsClientBase):
131
131
132 def __init__(self, client):
132 def __init__(self, client):
133 self._client = client
133 self._client = client
134 self._prefix = client._prefix
134 self._prefix = client._prefix
135 self._stats = deque()
135 self._stats = deque()
136
136
137 def _send(self):
137 def _send(self):
138 raise NotImplementedError()
138 raise NotImplementedError()
139
139
140 def _after(self, data):
140 def _after(self, data):
141 if data is not None:
141 if data is not None:
142 self._stats.append(data)
142 self._stats.append(data)
143
143
144 def __enter__(self):
144 def __enter__(self):
145 return self
145 return self
146
146
147 def __exit__(self, typ, value, tb):
147 def __exit__(self, typ, value, tb):
148 self.send()
148 self.send()
149
149
150 def send(self):
150 def send(self):
151 if not self._stats:
151 if not self._stats:
152 return
152 return
153 self._send()
153 self._send()
154
154
155 def pipeline(self):
155 def pipeline(self):
156 return self.__class__(self)
156 return self.__class__(self)
@@ -1,75 +1,75 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2
2
3 import socket
3 import socket
4
4
5 from .base import StatsClientBase, PipelineBase
5 from .base import StatsClientBase, PipelineBase
6
6
7
7
8 class StreamPipeline(PipelineBase):
8 class StreamPipeline(PipelineBase):
9 def _send(self):
9 def _send(self):
10 self._client._after('\n'.join(self._stats))
10 self._client._after('\n'.join(self._stats))
11 self._stats.clear()
11 self._stats.clear()
12
12
13
13
14 class StreamClientBase(StatsClientBase):
14 class StreamClientBase(StatsClientBase):
15 def connect(self):
15 def connect(self):
16 raise NotImplementedError()
16 raise NotImplementedError()
17
17
18 def close(self):
18 def close(self):
19 if self._sock and hasattr(self._sock, 'close'):
19 if self._sock and hasattr(self._sock, 'close'):
20 self._sock.close()
20 self._sock.close()
21 self._sock = None
21 self._sock = None
22
22
23 def reconnect(self):
23 def reconnect(self):
24 self.close()
24 self.close()
25 self.connect()
25 self.connect()
26
26
27 def pipeline(self):
27 def pipeline(self):
28 return StreamPipeline(self)
28 return StreamPipeline(self)
29
29
30 def _send(self, data):
30 def _send(self, data):
31 """Send data to statsd."""
31 """Send data to statsd."""
32 if not self._sock:
32 if not self._sock:
33 self.connect()
33 self.connect()
34 self._do_send(data)
34 self._do_send(data)
35
35
36 def _do_send(self, data):
36 def _do_send(self, data):
37 self._sock.sendall(data.encode('ascii') + b'\n')
37 self._sock.sendall(data.encode('ascii') + b'\n')
38
38
39
39
40 class TCPStatsClient(StreamClientBase):
40 class TCPStatsClient(StreamClientBase):
41 """TCP version of StatsClient."""
41 """TCP version of StatsClient."""
42
42
43 def __init__(self, host='localhost', port=8125, prefix=None,
43 def __init__(self, host='localhost', port=8125, prefix=None,
44 timeout=None, ipv6=False):
44 timeout=None, ipv6=False):
45 """Create a new client."""
45 """Create a new client."""
46 self._host = host
46 self._host = host
47 self._port = port
47 self._port = port
48 self._ipv6 = ipv6
48 self._ipv6 = ipv6
49 self._timeout = timeout
49 self._timeout = timeout
50 self._prefix = prefix
50 self._prefix = prefix
51 self._sock = None
51 self._sock = None
52
52
53 def connect(self):
53 def connect(self):
54 fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
54 fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
55 family, _, _, _, addr = socket.getaddrinfo(
55 family, _, _, _, addr = socket.getaddrinfo(
56 self._host, self._port, fam, socket.SOCK_STREAM)[0]
56 self._host, self._port, fam, socket.SOCK_STREAM)[0]
57 self._sock = socket.socket(family, socket.SOCK_STREAM)
57 self._sock = socket.socket(family, socket.SOCK_STREAM)
58 self._sock.settimeout(self._timeout)
58 self._sock.settimeout(self._timeout)
59 self._sock.connect(addr)
59 self._sock.connect(addr)
60
60
61
61
62 class UnixSocketStatsClient(StreamClientBase):
62 class UnixSocketStatsClient(StreamClientBase):
63 """Unix domain socket version of StatsClient."""
63 """Unix domain socket version of StatsClient."""
64
64
65 def __init__(self, socket_path, prefix=None, timeout=None):
65 def __init__(self, socket_path, prefix=None, timeout=None):
66 """Create a new client."""
66 """Create a new client."""
67 self._socket_path = socket_path
67 self._socket_path = socket_path
68 self._timeout = timeout
68 self._timeout = timeout
69 self._prefix = prefix
69 self._prefix = prefix
70 self._sock = None
70 self._sock = None
71
71
72 def connect(self):
72 def connect(self):
73 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
73 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
74 self._sock.settimeout(self._timeout)
74 self._sock.settimeout(self._timeout)
75 self._sock.connect(self._socket_path)
75 self._sock.connect(self._socket_path)
@@ -1,75 +1,68 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import functools
3 import functools
4
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
4 from time import perf_counter as time_now
9 except ImportError:
10 # fall back to using time
11 from time import time as time_now
12
5
13
6
14 def safe_wraps(wrapper, *args, **kwargs):
7 def safe_wraps(wrapper, *args, **kwargs):
15 """Safely wraps partial functions."""
8 """Safely wraps partial functions."""
16 while isinstance(wrapper, functools.partial):
9 while isinstance(wrapper, functools.partial):
17 wrapper = wrapper.func
10 wrapper = wrapper.func
18 return functools.wraps(wrapper, *args, **kwargs)
11 return functools.wraps(wrapper, *args, **kwargs)
19
12
20
13
21 class Timer(object):
14 class Timer(object):
22 """A context manager/decorator for statsd.timing()."""
15 """A context manager/decorator for statsd.timing()."""
23
16
24 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
17 def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True):
25 self.client = client
18 self.client = client
26 self.stat = stat
19 self.stat = stat
27 self.rate = rate
20 self.rate = rate
28 self.tags = tags
21 self.tags = tags
29 self.ms = None
22 self.ms = None
30 self._sent = False
23 self._sent = False
31 self._start_time = None
24 self._start_time = None
32 self.use_decimals = use_decimals
25 self.use_decimals = use_decimals
33 self.auto_send = auto_send
26 self.auto_send = auto_send
34
27
35 def __call__(self, f):
28 def __call__(self, f):
36 """Thread-safe timing function decorator."""
29 """Thread-safe timing function decorator."""
37 @safe_wraps(f)
30 @safe_wraps(f)
38 def _wrapped(*args, **kwargs):
31 def _wrapped(*args, **kwargs):
39 start_time = time_now()
32 start_time = time_now()
40 try:
33 try:
41 return f(*args, **kwargs)
34 return f(*args, **kwargs)
42 finally:
35 finally:
43 elapsed_time_ms = 1000.0 * (time_now() - start_time)
36 elapsed_time_ms = 1000.0 * (time_now() - start_time)
44 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
37 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals)
45 self._sent = True
38 self._sent = True
46 return _wrapped
39 return _wrapped
47
40
48 def __enter__(self):
41 def __enter__(self):
49 return self.start()
42 return self.start()
50
43
51 def __exit__(self, typ, value, tb):
44 def __exit__(self, typ, value, tb):
52 self.stop(send=self.auto_send)
45 self.stop(send=self.auto_send)
53
46
54 def start(self):
47 def start(self):
55 self.ms = None
48 self.ms = None
56 self._sent = False
49 self._sent = False
57 self._start_time = time_now()
50 self._start_time = time_now()
58 return self
51 return self
59
52
60 def stop(self, send=True):
53 def stop(self, send=True):
61 if self._start_time is None:
54 if self._start_time is None:
62 raise RuntimeError('Timer has not started.')
55 raise RuntimeError('Timer has not started.')
63 dt = time_now() - self._start_time
56 dt = time_now() - self._start_time
64 self.ms = 1000.0 * dt # Convert to milliseconds.
57 self.ms = 1000.0 * dt # Convert to milliseconds.
65 if send:
58 if send:
66 self.send()
59 self.send()
67 return self
60 return self
68
61
69 def send(self):
62 def send(self):
70 if self.ms is None:
63 if self.ms is None:
71 raise RuntimeError('No data recorded.')
64 raise RuntimeError('No data recorded.')
72 if self._sent:
65 if self._sent:
73 raise RuntimeError('Already sent data.')
66 raise RuntimeError('Already sent data.')
74 self._sent = True
67 self._sent = True
75 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
68 self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals)
@@ -1,55 +1,55 b''
1 from __future__ import absolute_import, division, unicode_literals
1
2
2
3 import socket
3 import socket
4
4
5 from .base import StatsClientBase, PipelineBase
5 from .base import StatsClientBase, PipelineBase
6
6
7
7
8 class Pipeline(PipelineBase):
8 class Pipeline(PipelineBase):
9
9
10 def __init__(self, client):
10 def __init__(self, client):
11 super(Pipeline, self).__init__(client)
11 super(Pipeline, self).__init__(client)
12 self._maxudpsize = client._maxudpsize
12 self._maxudpsize = client._maxudpsize
13
13
14 def _send(self):
14 def _send(self):
15 data = self._stats.popleft()
15 data = self._stats.popleft()
16 while self._stats:
16 while self._stats:
17 # Use popleft to preserve the order of the stats.
17 # Use popleft to preserve the order of the stats.
18 stat = self._stats.popleft()
18 stat = self._stats.popleft()
19 if len(stat) + len(data) + 1 >= self._maxudpsize:
19 if len(stat) + len(data) + 1 >= self._maxudpsize:
20 self._client._after(data)
20 self._client._after(data)
21 data = stat
21 data = stat
22 else:
22 else:
23 data += '\n' + stat
23 data += '\n' + stat
24 self._client._after(data)
24 self._client._after(data)
25
25
26
26
27 class StatsClient(StatsClientBase):
27 class StatsClient(StatsClientBase):
28 """A client for statsd."""
28 """A client for statsd."""
29
29
30 def __init__(self, host='localhost', port=8125, prefix=None,
30 def __init__(self, host='localhost', port=8125, prefix=None,
31 maxudpsize=512, ipv6=False):
31 maxudpsize=512, ipv6=False):
32 """Create a new client."""
32 """Create a new client."""
33 fam = socket.AF_INET6 if ipv6 else socket.AF_INET
33 fam = socket.AF_INET6 if ipv6 else socket.AF_INET
34 family, _, _, _, addr = socket.getaddrinfo(
34 family, _, _, _, addr = socket.getaddrinfo(
35 host, port, fam, socket.SOCK_DGRAM)[0]
35 host, port, fam, socket.SOCK_DGRAM)[0]
36 self._addr = addr
36 self._addr = addr
37 self._sock = socket.socket(family, socket.SOCK_DGRAM)
37 self._sock = socket.socket(family, socket.SOCK_DGRAM)
38 self._prefix = prefix
38 self._prefix = prefix
39 self._maxudpsize = maxudpsize
39 self._maxudpsize = maxudpsize
40
40
41 def _send(self, data):
41 def _send(self, data):
42 """Send data to statsd."""
42 """Send data to statsd."""
43 try:
43 try:
44 self._sock.sendto(data.encode('ascii'), self._addr)
44 self._sock.sendto(data.encode('ascii'), self._addr)
45 except (socket.error, RuntimeError):
45 except (socket.error, RuntimeError):
46 # No time for love, Dr. Jones!
46 # No time for love, Dr. Jones!
47 pass
47 pass
48
48
49 def close(self):
49 def close(self):
50 if self._sock and hasattr(self._sock, 'close'):
50 if self._sock and hasattr(self._sock, 'close'):
51 self._sock.close()
51 self._sock.close()
52 self._sock = None
52 self._sock = None
53
53
54 def pipeline(self):
54 def pipeline(self):
55 return Pipeline(self)
55 return Pipeline(self)
General Comments 0
You need to be logged in to leave comments. Login now