configs: fixed IP extraction in gunicorn
super-admin
r1025:f2b029ea default
@@ -1,274 +1,396 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by the .ini file entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 import socket
14 15 from gunicorn.glogging import Logger
15 16
16 17
17 18 def get_workers():
18 19 import multiprocessing
19 20 return multiprocessing.cpu_count() * 2 + 1
20 21
21 22 # GLOBAL
22 23 errorlog = '-'
23 24 accesslog = '-'
24 25
25 26
26 27 # SERVER MECHANICS
27 28 # None == system temp dir
28 29 # worker_tmp_dir is recommended to be set to some tmpfs
29 30 worker_tmp_dir = None
30 31 tmp_upload_dir = None
31 32
32 33 #reuse_port = True
33 34
34 35 # Custom log format
35 36 #access_log_format = (
36 37 # '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')
37 38
38 39 # loki format for easier parsing in grafana
39 40 access_log_format = (
40 41 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')
41 42
42 43 # self-adjust workers based on CPU count
43 44 # workers = get_workers()
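# e.g. a 4-core host would get 4 * 2 + 1 = 9 workers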
44 45
45 46
46 47 def _get_process_rss(pid=None):
47 48 try:
48 49 import psutil
49 50 if pid:
50 51 proc = psutil.Process(pid)
51 52 else:
52 53 proc = psutil.Process()
53 54 return proc.memory_info().rss
54 55 except Exception:
55 56 return None
56 57
57 58
58 59 def _get_config(ini_path):
59 60
60 61 try:
61 62 import configparser
62 63 except ImportError:
63 64 import ConfigParser as configparser
64 65 try:
65 66 config = configparser.RawConfigParser()
66 67 config.read(ini_path)
67 68 return config
68 69 except Exception:
69 70 return None
70 71
71 72
72 73 def _time_with_offset(memory_usage_check_interval):
73 74 return time.time() - random.randint(0, int(memory_usage_check_interval/2))
74 75
75 76
76 77 def pre_fork(server, worker):
77 78 pass
78 79
79 80
80 81 def post_fork(server, worker):
81 82
82 83 # memory spec defaults
83 84 _memory_max_usage = 0
84 85 _memory_usage_check_interval = 60
85 86 _memory_usage_recovery_threshold = 0.8
86 87
87 88 ini_path = os.path.abspath(server.cfg.paste)
88 89 conf = _get_config(ini_path)
89 90
90 91 section = 'server:main'
91 92 if conf and conf.has_section(section):
92 93
93 94 if conf.has_option(section, 'memory_max_usage'):
94 95 _memory_max_usage = conf.getint(section, 'memory_max_usage')
95 96
96 97 if conf.has_option(section, 'memory_usage_check_interval'):
97 98 _memory_usage_check_interval = conf.getint(section, 'memory_usage_check_interval')
98 99
99 100 if conf.has_option(section, 'memory_usage_recovery_threshold'):
100 101 _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')
101 102
102 103 worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
103 104 or _memory_max_usage)
104 105 worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
105 106 or _memory_usage_check_interval)
106 107 worker._memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
107 108 or _memory_usage_recovery_threshold)
108 109
109 110 # register the last memory-check time, with some random offset so we don't
110 111 # recycle all workers at once
111 112 worker._last_memory_check_time = _time_with_offset(_memory_usage_check_interval)
112 113
113 114 if _memory_max_usage:
114 115 server.log.info("[%-10s] WORKER spawned with max memory set at %s", worker.pid,
115 116 _format_data_size(_memory_max_usage))
116 117 else:
117 118 server.log.info("[%-10s] WORKER spawned", worker.pid)
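For context (not part of the commit): the hook above reads its limits from the [server:main] section of the .ini file gunicorn was started with (server.cfg.paste), and the RC_GUNICORN_* environment variables take precedence over the .ini entries. A minimal sketch of that resolution, using an in-memory config with hypothetical values:

# Illustrative sketch only; the ini values below are hypothetical examples.
import os
import configparser

_INI = """
[server:main]
memory_max_usage = 1073741824
memory_usage_check_interval = 60
memory_usage_recovery_threshold = 0.8
"""

conf = configparser.RawConfigParser()
conf.read_string(_INI)  # post_fork() calls config.read(ini_path) on the real file

# environment variables win over the .ini entries, mirroring post_fork()
memory_max_usage = int(
    os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
    or conf.getint('server:main', 'memory_max_usage'))

print(memory_max_usage)  # 1073741824 unless RC_GUNICORN_MEMORY_MAX_USAGE is set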
118 119
119 120
120 121 def pre_exec(server):
121 122 server.log.info("Forked child, re-executing.")
122 123
123 124
124 125 def on_starting(server):
125 126 server_lbl = '{} {}'.format(server.proc_name, server.address)
126 127 server.log.info("Server %s is starting.", server_lbl)
127 128
128 129
129 130 def when_ready(server):
130 131 server.log.info("Server %s is ready. Spawning workers", server)
131 132
132 133
133 134 def on_reload(server):
134 135 pass
135 136
136 137
137 138 def _format_data_size(size, unit="B", precision=1, binary=True):
138 139 """Format a number using SI units (kilo, mega, etc.).
139 140
140 141 ``size``: The number as a float or int.
141 142
142 143 ``unit``: The unit name in plural form. Examples: "bytes", "B".
143 144
144 145 ``precision``: How many digits to the right of the decimal point. Default
145 146 is 1. 0 suppresses the decimal point.
146 147
147 148 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
148 149 If true, use base-2 binary prefixes (kibi = Ki = 1024).
149 150
153 154
154 155 """
155 156
156 157 if not binary:
157 158 base = 1000
158 159 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
159 160 else:
160 161 base = 1024
161 162 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
162 163
163 164 sign = ""
164 165 if size > 0:
165 166 m = int(math.log(size, base))
166 167 elif size < 0:
167 168 sign = "-"
168 169 size = -size
169 170 m = int(math.log(size, base))
170 171 else:
171 172 m = 0
172 173 if m > 8:
173 174 m = 8
174 175
175 176 if m == 0:
176 177 precision = '%.0f'
177 178 else:
178 179 precision = '%%.%df' % precision
179 180
180 181 size = precision % (size / math.pow(base, m))
181 182
182 183 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
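A few illustrative expectations for the helper above (not part of the commit, shown only to make the base/precision handling concrete):

# Illustrative only: expected results of _format_data_size()
assert _format_data_size(300) == '300 B'
assert _format_data_size(1024) == '1.0 KiB'
assert _format_data_size(150 * 1024 ** 2) == '150.0 MiB'
assert _format_data_size(1500, binary=False) == '1.5 kB'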
183 184
184 185
185 186 def _check_memory_usage(worker):
186 187 memory_max_usage = worker._memory_max_usage
187 188 if not memory_max_usage:
188 189 return
189 190
190 191 memory_usage_check_interval = worker._memory_usage_check_interval
191 192 memory_usage_recovery_threshold = memory_max_usage * worker._memory_usage_recovery_threshold
192 193
193 194 elapsed = time.time() - worker._last_memory_check_time
194 195 if elapsed > memory_usage_check_interval:
195 196 mem_usage = _get_process_rss()
196 197 if mem_usage and mem_usage > memory_max_usage:
197 198 worker.log.info(
198 199 "memory usage %s > %s, forcing gc",
199 200 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
200 201 # Try to clean it up by forcing a full collection.
201 202 gc.collect()
202 203 mem_usage = _get_process_rss()
203 204 if mem_usage and mem_usage > memory_usage_recovery_threshold:
204 205 # Didn't clean up enough, we'll have to terminate.
205 206 worker.log.warning(
206 207 "memory usage %s > %s after gc, quitting",
207 208 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
208 209 # Setting alive to False makes this worker shut down; the gunicorn master then spawns a replacement
209 210 worker.alive = False
210 211 worker._last_memory_check_time = time.time()
211 212
212 213
213 214 def worker_int(worker):
214 215 worker.log.info("[%-10s] worker received INT or QUIT signal", worker.pid)
215 216
216 217 # get traceback info, on worker crash
217 218 id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
218 219 code = []
219 220 for thread_id, stack in sys._current_frames().items():
220 221 code.append(
221 222 "\n# Thread: %s(%d)" % (id2name.get(thread_id, ""), thread_id))
222 223 for fname, lineno, name, line in traceback.extract_stack(stack):
223 224 code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
224 225 if line:
225 226 code.append(" %s" % (line.strip()))
226 227 worker.log.debug("\n".join(code))
227 228
228 229
229 230 def worker_abort(worker):
230 231 worker.log.info("[%-10s] worker received SIGABRT signal", worker.pid)
231 232
232 233
233 234 def worker_exit(server, worker):
234 235 worker.log.info("[%-10s] worker exit", worker.pid)
235 236
236 237
237 238 def child_exit(server, worker):
238 239 worker.log.info("[%-10s] worker child exit", worker.pid)
239 240
240 241
241 242 def pre_request(worker, req):
242 243 worker.start_time = time.time()
243 244 worker.log.debug(
244 245 "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
245 246
246 247
247 248 def post_request(worker, req, environ, resp):
248 249 total_time = time.time() - worker.start_time
249 250 # Gunicorn sometimes has problems with reading the status_code
250 251 status_code = getattr(resp, 'status_code', '')
251 252 worker.log.debug(
252 253 "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
253 254 worker.nr, req.method, req.path, status_code, total_time)
254 255 _check_memory_usage(worker)
255 256
256 257
258 def _filter_proxy(ip):
259 """
260 IP addresses passed in headers can contain multiple, comma-separated
261 values added by the proxies in the request-processing chain, with the
262 left-most entry being the original client. We only care about that
263 first, original-client IP.
264
265 :param ip: ip string from headers
266 """
267 if ',' in ip:
268 _ips = ip.split(',')
269 _first_ip = _ips[0].strip()
270 return _first_ip
271 return ip
272
273
274 def _filter_port(ip):
275 """
276 Removes the port from an IP address. There are 4 main cases to handle here:
277 - ipv4 eg. 127.0.0.1
278 - ipv6 eg. ::1
279 - ipv4+port eg. 127.0.0.1:8080
280 - ipv6+port eg. [::1]:8080
281
282 :param ip: ip string, possibly including a port
283 """
284 def is_ipv6(ip_addr):
285 if hasattr(socket, 'inet_pton'):
286 try:
287 socket.inet_pton(socket.AF_INET6, ip_addr)
288 except socket.error:
289 return False
290 else:
291 return False
292 return True
293
294 if ':' not in ip: # must be ipv4 pure ip
295 return ip
296
297 if '[' in ip and ']' in ip: # ipv6 with port
298 return ip.split(']')[0][1:].lower()
299
300 # must be ipv6 or ipv4 with port
301 if is_ipv6(ip):
302 return ip
303 else:
304 ip, _port = ip.split(':')[:2] # means ipv4+port
305 return ip
306
307
308 def get_ip_addr(environ):
309 proxy_key = 'HTTP_X_REAL_IP'
310 proxy_key2 = 'HTTP_X_FORWARDED_FOR'
311 def_key = 'REMOTE_ADDR'
312 _filters = lambda x: _filter_port(_filter_proxy(x))
313
314 ip = environ.get(proxy_key)
315 if ip:
316 return _filters(ip)
317
318 ip = environ.get(proxy_key2)
319 if ip:
320 return _filters(ip)
321
322 ip = environ.get(def_key, '0.0.0.0')
323 return _filters(ip)
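The new helpers can be exercised as follows (illustrative checks, not part of the commit; the addresses come from documentation ranges and the IPv6 cases assume socket.inet_pton is available, as it is on Linux):

# Illustrative only: behaviour of the new IP-extraction helpers
assert _filter_proxy('203.0.113.7, 10.0.0.1') == '203.0.113.7'  # left-most (client) IP wins
assert _filter_port('127.0.0.1') == '127.0.0.1'                 # plain ipv4
assert _filter_port('127.0.0.1:8080') == '127.0.0.1'            # ipv4 + port
assert _filter_port('::1') == '::1'                             # plain ipv6
assert _filter_port('[::1]:8080') == '::1'                      # ipv6 + port
assert get_ip_addr({'HTTP_X_REAL_IP': '203.0.113.7:443'}) == '203.0.113.7'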
324
325
257 326 class RhodeCodeLogger(Logger):
258 327 """
259 328 Custom Logger that allows some customization that gunicorn doesn't allow
260 329 """
261 330
262 331 datefmt = r"%Y-%m-%d %H:%M:%S"
263 332
264 333 def __init__(self, cfg):
265 334 Logger.__init__(self, cfg)
266 335
267 336 def now(self):
268 337 """ return date in RhodeCode Log format """
269 338 now = time.time()
270 339 msecs = int((now - int(now)) * 1000)
271 340 return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)
272 341
342 def atoms(self, resp, req, environ, request_time):
343 """ Gets atoms for log formatting.
344 """
345 status = resp.status
346 if isinstance(status, str):
347 status = status.split(None, 1)[0]
348 atoms = {
349 'h': get_ip_addr(environ),
350 'l': '-',
351 'u': self._get_user(environ) or '-',
352 't': self.now(),
353 'r': "%s %s %s" % (environ['REQUEST_METHOD'],
354 environ['RAW_URI'],
355 environ["SERVER_PROTOCOL"]),
356 's': status,
357 'm': environ.get('REQUEST_METHOD'),
358 'U': environ.get('PATH_INFO'),
359 'q': environ.get('QUERY_STRING'),
360 'H': environ.get('SERVER_PROTOCOL'),
361 'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
362 'B': getattr(resp, 'sent', None),
363 'f': environ.get('HTTP_REFERER', '-'),
364 'a': environ.get('HTTP_USER_AGENT', '-'),
365 'T': request_time.seconds,
366 'D': (request_time.seconds * 1000000) + request_time.microseconds,
367 'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
368 'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
369 'p': "<%s>" % os.getpid()
370 }
371
372 # add request headers
373 if hasattr(req, 'headers'):
374 req_headers = req.headers
375 else:
376 req_headers = req
377
378 if hasattr(req_headers, "items"):
379 req_headers = req_headers.items()
380
381 atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})
382
383 resp_headers = resp.headers
384 if hasattr(resp_headers, "items"):
385 resp_headers = resp_headers.items()
386
387 # add response headers
388 atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})
389
390 # add environ variables
391 environ_variables = environ.items()
392 atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})
393
394 return atoms
273 395
274 396 logger_class = RhodeCodeLogger
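With the extended atoms above, request headers ({header}i), response headers ({header}o) and WSGI environ keys ({var}e) become available to access_log_format through gunicorn's usual placeholder syntax. A hypothetical variant of the format string, commented out in the same style as the alternative near the top of the file:

# Illustrative only, not part of the commit:
# access_log_format = (
#     'time="%(t)s" pid=%(p)s ip="%(h)-15s" fwd="%({x-forwarded-for}i)s" '
#     'uri="%(m)s:%(U)s %(q)s"')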