##// END OF EJS Templates
configs: fixed IP extraction in gunicorn
super-admin -
r1025:f2b029ea default
parent child Browse files
Show More
@@ -1,274 +1,396 b''
1 """
1 """
2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 """
4 """
5
5
6 import gc
6 import gc
7 import os
7 import os
8 import sys
8 import sys
9 import math
9 import math
10 import time
10 import time
11 import threading
11 import threading
12 import traceback
12 import traceback
13 import random
13 import random
14 import socket
14 from gunicorn.glogging import Logger
15 from gunicorn.glogging import Logger
15
16
16
17
17 def get_workers():
18 def get_workers():
18 import multiprocessing
19 import multiprocessing
19 return multiprocessing.cpu_count() * 2 + 1
20 return multiprocessing.cpu_count() * 2 + 1
20
21
21 # GLOBAL
22 # GLOBAL
22 errorlog = '-'
23 errorlog = '-'
23 accesslog = '-'
24 accesslog = '-'
24
25
25
26
26 # SERVER MECHANICS
27 # SERVER MECHANICS
27 # None == system temp dir
28 # None == system temp dir
28 # worker_tmp_dir is recommended to be set to some tmpfs
29 # worker_tmp_dir is recommended to be set to some tmpfs
29 worker_tmp_dir = None
30 worker_tmp_dir = None
30 tmp_upload_dir = None
31 tmp_upload_dir = None
31
32
32 #reuse_port = True
33 #reuse_port = True
33
34
34 # Custom log format
35 # Custom log format
35 #access_log_format = (
36 #access_log_format = (
36 # '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')
37 # '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')
37
38
38 # loki format for easier parsing in grafana
39 # loki format for easier parsing in grafana
39 access_log_format = (
40 access_log_format = (
40 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')
41 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')
41
42
42 # self adjust workers based on CPU count
43 # self adjust workers based on CPU count
43 # workers = get_workers()
44 # workers = get_workers()
44
45
45
46
46 def _get_process_rss(pid=None):
47 def _get_process_rss(pid=None):
47 try:
48 try:
48 import psutil
49 import psutil
49 if pid:
50 if pid:
50 proc = psutil.Process(pid)
51 proc = psutil.Process(pid)
51 else:
52 else:
52 proc = psutil.Process()
53 proc = psutil.Process()
53 return proc.memory_info().rss
54 return proc.memory_info().rss
54 except Exception:
55 except Exception:
55 return None
56 return None
56
57
57
58
58 def _get_config(ini_path):
59 def _get_config(ini_path):
59
60
60 try:
61 try:
61 import configparser
62 import configparser
62 except ImportError:
63 except ImportError:
63 import ConfigParser as configparser
64 import ConfigParser as configparser
64 try:
65 try:
65 config = configparser.RawConfigParser()
66 config = configparser.RawConfigParser()
66 config.read(ini_path)
67 config.read(ini_path)
67 return config
68 return config
68 except Exception:
69 except Exception:
69 return None
70 return None
70
71
71
72
72 def _time_with_offset(memory_usage_check_interval):
73 def _time_with_offset(memory_usage_check_interval):
73 return time.time() - random.randint(0, memory_usage_check_interval/2.0)
74 return time.time() - random.randint(0, memory_usage_check_interval/2.0)
74
75
75
76
76 def pre_fork(server, worker):
77 def pre_fork(server, worker):
77 pass
78 pass
78
79
79
80
80 def post_fork(server, worker):
81 def post_fork(server, worker):
81
82
82 # memory spec defaults
83 # memory spec defaults
83 _memory_max_usage = 0
84 _memory_max_usage = 0
84 _memory_usage_check_interval = 60
85 _memory_usage_check_interval = 60
85 _memory_usage_recovery_threshold = 0.8
86 _memory_usage_recovery_threshold = 0.8
86
87
87 ini_path = os.path.abspath(server.cfg.paste)
88 ini_path = os.path.abspath(server.cfg.paste)
88 conf = _get_config(ini_path)
89 conf = _get_config(ini_path)
89
90
90 section = 'server:main'
91 section = 'server:main'
91 if conf and conf.has_section(section):
92 if conf and conf.has_section(section):
92
93
93 if conf.has_option(section, 'memory_max_usage'):
94 if conf.has_option(section, 'memory_max_usage'):
94 _memory_max_usage = conf.getint(section, 'memory_max_usage')
95 _memory_max_usage = conf.getint(section, 'memory_max_usage')
95
96
96 if conf.has_option(section, 'memory_usage_check_interval'):
97 if conf.has_option(section, 'memory_usage_check_interval'):
97 _memory_usage_check_interval = conf.getint(section, 'memory_usage_check_interval')
98 _memory_usage_check_interval = conf.getint(section, 'memory_usage_check_interval')
98
99
99 if conf.has_option(section, 'memory_usage_recovery_threshold'):
100 if conf.has_option(section, 'memory_usage_recovery_threshold'):
100 _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')
101 _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')
101
102
102 worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
103 worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
103 or _memory_max_usage)
104 or _memory_max_usage)
104 worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
105 worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
105 or _memory_usage_check_interval)
106 or _memory_usage_check_interval)
106 worker._memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
107 worker._memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
107 or _memory_usage_recovery_threshold)
108 or _memory_usage_recovery_threshold)
108
109
109 # register memory last check time, with some random offset so we don't recycle all
110 # register memory last check time, with some random offset so we don't recycle all
110 # at once
111 # at once
111 worker._last_memory_check_time = _time_with_offset(_memory_usage_check_interval)
112 worker._last_memory_check_time = _time_with_offset(_memory_usage_check_interval)
112
113
113 if _memory_max_usage:
114 if _memory_max_usage:
114 server.log.info("[%-10s] WORKER spawned with max memory set at %s", worker.pid,
115 server.log.info("[%-10s] WORKER spawned with max memory set at %s", worker.pid,
115 _format_data_size(_memory_max_usage))
116 _format_data_size(_memory_max_usage))
116 else:
117 else:
117 server.log.info("[%-10s] WORKER spawned", worker.pid)
118 server.log.info("[%-10s] WORKER spawned", worker.pid)
118
119
119
120
120 def pre_exec(server):
121 def pre_exec(server):
121 server.log.info("Forked child, re-executing.")
122 server.log.info("Forked child, re-executing.")
122
123
123
124
124 def on_starting(server):
125 def on_starting(server):
125 server_lbl = '{} {}'.format(server.proc_name, server.address)
126 server_lbl = '{} {}'.format(server.proc_name, server.address)
126 server.log.info("Server %s is starting.", server_lbl)
127 server.log.info("Server %s is starting.", server_lbl)
127
128
128
129
129 def when_ready(server):
130 def when_ready(server):
130 server.log.info("Server %s is ready. Spawning workers", server)
131 server.log.info("Server %s is ready. Spawning workers", server)
131
132
132
133
133 def on_reload(server):
134 def on_reload(server):
134 pass
135 pass
135
136
136
137
137 def _format_data_size(size, unit="B", precision=1, binary=True):
138 def _format_data_size(size, unit="B", precision=1, binary=True):
138 """Format a number using SI units (kilo, mega, etc.).
139 """Format a number using SI units (kilo, mega, etc.).
139
140
140 ``size``: The number as a float or int.
141 ``size``: The number as a float or int.
141
142
142 ``unit``: The unit name in plural form. Examples: "bytes", "B".
143 ``unit``: The unit name in plural form. Examples: "bytes", "B".
143
144
144 ``precision``: How many digits to the right of the decimal point. Default
145 ``precision``: How many digits to the right of the decimal point. Default
145 is 1. 0 suppresses the decimal point.
146 is 1. 0 suppresses the decimal point.
146
147
147 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
148 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
148 If true, use base-2 binary prefixes (kibi = Ki = 1024).
149 If true, use base-2 binary prefixes (kibi = Ki = 1024).
149
150
150 ``full_name``: If false (default), use the prefix abbreviation ("k" or
151 ``full_name``: If false (default), use the prefix abbreviation ("k" or
151 "Ki"). If true, use the full prefix ("kilo" or "kibi"). If false,
152 "Ki"). If true, use the full prefix ("kilo" or "kibi"). If false,
152 use abbreviation ("k" or "Ki").
153 use abbreviation ("k" or "Ki").
153
154
154 """
155 """
155
156
156 if not binary:
157 if not binary:
157 base = 1000
158 base = 1000
158 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
159 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
159 else:
160 else:
160 base = 1024
161 base = 1024
161 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
162 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
162
163
163 sign = ""
164 sign = ""
164 if size > 0:
165 if size > 0:
165 m = int(math.log(size, base))
166 m = int(math.log(size, base))
166 elif size < 0:
167 elif size < 0:
167 sign = "-"
168 sign = "-"
168 size = -size
169 size = -size
169 m = int(math.log(size, base))
170 m = int(math.log(size, base))
170 else:
171 else:
171 m = 0
172 m = 0
172 if m > 8:
173 if m > 8:
173 m = 8
174 m = 8
174
175
175 if m == 0:
176 if m == 0:
176 precision = '%.0f'
177 precision = '%.0f'
177 else:
178 else:
178 precision = '%%.%df' % precision
179 precision = '%%.%df' % precision
179
180
180 size = precision % (size / math.pow(base, m))
181 size = precision % (size / math.pow(base, m))
181
182
182 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
183 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
183
184
184
185
185 def _check_memory_usage(worker):
186 def _check_memory_usage(worker):
186 memory_max_usage = worker._memory_max_usage
187 memory_max_usage = worker._memory_max_usage
187 if not memory_max_usage:
188 if not memory_max_usage:
188 return
189 return
189
190
190 memory_usage_check_interval = worker._memory_usage_check_interval
191 memory_usage_check_interval = worker._memory_usage_check_interval
191 memory_usage_recovery_threshold = memory_max_usage * worker._memory_usage_recovery_threshold
192 memory_usage_recovery_threshold = memory_max_usage * worker._memory_usage_recovery_threshold
192
193
193 elapsed = time.time() - worker._last_memory_check_time
194 elapsed = time.time() - worker._last_memory_check_time
194 if elapsed > memory_usage_check_interval:
195 if elapsed > memory_usage_check_interval:
195 mem_usage = _get_process_rss()
196 mem_usage = _get_process_rss()
196 if mem_usage and mem_usage > memory_max_usage:
197 if mem_usage and mem_usage > memory_max_usage:
197 worker.log.info(
198 worker.log.info(
198 "memory usage %s > %s, forcing gc",
199 "memory usage %s > %s, forcing gc",
199 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
200 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
200 # Try to clean it up by forcing a full collection.
201 # Try to clean it up by forcing a full collection.
201 gc.collect()
202 gc.collect()
202 mem_usage = _get_process_rss()
203 mem_usage = _get_process_rss()
203 if mem_usage > memory_usage_recovery_threshold:
204 if mem_usage > memory_usage_recovery_threshold:
204 # Didn't clean up enough, we'll have to terminate.
205 # Didn't clean up enough, we'll have to terminate.
205 worker.log.warning(
206 worker.log.warning(
206 "memory usage %s > %s after gc, quitting",
207 "memory usage %s > %s after gc, quitting",
207 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
208 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
208 # This will cause worker to auto-restart itself
209 # This will cause worker to auto-restart itself
209 worker.alive = False
210 worker.alive = False
210 worker._last_memory_check_time = time.time()
211 worker._last_memory_check_time = time.time()
211
212
212
213
213 def worker_int(worker):
214 def worker_int(worker):
214 worker.log.info("[%-10s] worker received INT or QUIT signal", worker.pid)
215 worker.log.info("[%-10s] worker received INT or QUIT signal", worker.pid)
215
216
216 # get traceback info, on worker crash
217 # get traceback info, on worker crash
217 id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
218 id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
218 code = []
219 code = []
219 for thread_id, stack in sys._current_frames().items():
220 for thread_id, stack in sys._current_frames().items():
220 code.append(
221 code.append(
221 "\n# Thread: %s(%d)" % (id2name.get(thread_id, ""), thread_id))
222 "\n# Thread: %s(%d)" % (id2name.get(thread_id, ""), thread_id))
222 for fname, lineno, name, line in traceback.extract_stack(stack):
223 for fname, lineno, name, line in traceback.extract_stack(stack):
223 code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
224 code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
224 if line:
225 if line:
225 code.append(" %s" % (line.strip()))
226 code.append(" %s" % (line.strip()))
226 worker.log.debug("\n".join(code))
227 worker.log.debug("\n".join(code))
227
228
228
229
229 def worker_abort(worker):
230 def worker_abort(worker):
230 worker.log.info("[%-10s] worker received SIGABRT signal", worker.pid)
231 worker.log.info("[%-10s] worker received SIGABRT signal", worker.pid)
231
232
232
233
233 def worker_exit(server, worker):
234 def worker_exit(server, worker):
234 worker.log.info("[%-10s] worker exit", worker.pid)
235 worker.log.info("[%-10s] worker exit", worker.pid)
235
236
236
237
237 def child_exit(server, worker):
238 def child_exit(server, worker):
238 worker.log.info("[%-10s] worker child exit", worker.pid)
239 worker.log.info("[%-10s] worker child exit", worker.pid)
239
240
240
241
241 def pre_request(worker, req):
242 def pre_request(worker, req):
242 worker.start_time = time.time()
243 worker.start_time = time.time()
243 worker.log.debug(
244 worker.log.debug(
244 "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
245 "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
245
246
246
247
247 def post_request(worker, req, environ, resp):
248 def post_request(worker, req, environ, resp):
248 total_time = time.time() - worker.start_time
249 total_time = time.time() - worker.start_time
249 # Gunicorn sometimes has problems with reading the status_code
250 # Gunicorn sometimes has problems with reading the status_code
250 status_code = getattr(resp, 'status_code', '')
251 status_code = getattr(resp, 'status_code', '')
251 worker.log.debug(
252 worker.log.debug(
252 "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
253 "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
253 worker.nr, req.method, req.path, status_code, total_time)
254 worker.nr, req.method, req.path, status_code, total_time)
254 _check_memory_usage(worker)
255 _check_memory_usage(worker)
255
256
256
257
258 def _filter_proxy(ip):
259 """
260 Passed in IP addresses in HEADERS can be in a special format of multiple
261 ips. Those comma separated IPs are passed from various proxies in the
262 chain of request processing. The left-most being the original client.
263 We only care about the first IP which came from the org. client.
264
265 :param ip: ip string from headers
266 """
267 if ',' in ip:
268 _ips = ip.split(',')
269 _first_ip = _ips[0].strip()
270 return _first_ip
271 return ip
272
273
274 def _filter_port(ip):
275 """
276 Removes a port from ip, there are 4 main cases to handle here.
277 - ipv4 eg. 127.0.0.1
278 - ipv6 eg. ::1
279 - ipv4+port eg. 127.0.0.1:8080
280 - ipv6+port eg. [::1]:8080
281
282 :param ip:
283 """
284 def is_ipv6(ip_addr):
285 if hasattr(socket, 'inet_pton'):
286 try:
287 socket.inet_pton(socket.AF_INET6, ip_addr)
288 except socket.error:
289 return False
290 else:
291 return False
292 return True
293
294 if ':' not in ip: # must be ipv4 pure ip
295 return ip
296
297 if '[' in ip and ']' in ip: # ipv6 with port
298 return ip.split(']')[0][1:].lower()
299
300 # must be ipv6 or ipv4 with port
301 if is_ipv6(ip):
302 return ip
303 else:
304 ip, _port = ip.split(':')[:2] # means ipv4+port
305 return ip
306
307
308 def get_ip_addr(environ):
309 proxy_key = 'HTTP_X_REAL_IP'
310 proxy_key2 = 'HTTP_X_FORWARDED_FOR'
311 def_key = 'REMOTE_ADDR'
312 _filters = lambda x: _filter_port(_filter_proxy(x))
313
314 ip = environ.get(proxy_key)
315 if ip:
316 return _filters(ip)
317
318 ip = environ.get(proxy_key2)
319 if ip:
320 return _filters(ip)
321
322 ip = environ.get(def_key, '0.0.0.0')
323 return _filters(ip)
324
325
257 class RhodeCodeLogger(Logger):
326 class RhodeCodeLogger(Logger):
258 """
327 """
259 Custom Logger that allows some customization that gunicorn doesn't allow
328 Custom Logger that allows some customization that gunicorn doesn't allow
260 """
329 """
261
330
262 datefmt = r"%Y-%m-%d %H:%M:%S"
331 datefmt = r"%Y-%m-%d %H:%M:%S"
263
332
264 def __init__(self, cfg):
333 def __init__(self, cfg):
265 Logger.__init__(self, cfg)
334 Logger.__init__(self, cfg)
266
335
267 def now(self):
336 def now(self):
268 """ return date in RhodeCode Log format """
337 """ return date in RhodeCode Log format """
269 now = time.time()
338 now = time.time()
270 msecs = int((now - long(now)) * 1000)
339 msecs = int((now - long(now)) * 1000)
271 return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)
340 return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)
272
341
342 def atoms(self, resp, req, environ, request_time):
343 """ Gets atoms for log formatting.
344 """
345 status = resp.status
346 if isinstance(status, str):
347 status = status.split(None, 1)[0]
348 atoms = {
349 'h': get_ip_addr(environ),
350 'l': '-',
351 'u': self._get_user(environ) or '-',
352 't': self.now(),
353 'r': "%s %s %s" % (environ['REQUEST_METHOD'],
354 environ['RAW_URI'],
355 environ["SERVER_PROTOCOL"]),
356 's': status,
357 'm': environ.get('REQUEST_METHOD'),
358 'U': environ.get('PATH_INFO'),
359 'q': environ.get('QUERY_STRING'),
360 'H': environ.get('SERVER_PROTOCOL'),
361 'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
362 'B': getattr(resp, 'sent', None),
363 'f': environ.get('HTTP_REFERER', '-'),
364 'a': environ.get('HTTP_USER_AGENT', '-'),
365 'T': request_time.seconds,
366 'D': (request_time.seconds * 1000000) + request_time.microseconds,
367 'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
368 'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
369 'p': "<%s>" % os.getpid()
370 }
371
372 # add request headers
373 if hasattr(req, 'headers'):
374 req_headers = req.headers
375 else:
376 req_headers = req
377
378 if hasattr(req_headers, "items"):
379 req_headers = req_headers.items()
380
381 atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})
382
383 resp_headers = resp.headers
384 if hasattr(resp_headers, "items"):
385 resp_headers = resp_headers.items()
386
387 # add response headers
388 atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})
389
390 # add environ variables
391 environ_variables = environ.items()
392 atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})
393
394 return atoms
273
395
274 logger_class = RhodeCodeLogger
396 logger_class = RhodeCodeLogger
General Comments 0
You need to be logged in to leave comments. Login now