##// END OF EJS Templates
gunicorn: fixed config syntax
super-admin -
r4925:310a1c9d default
parent child Browse files
Show More
@@ -1,396 +1,393 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 14 import socket
15 15 from gunicorn.glogging import Logger
16 16
17 17
def get_workers():
    """Suggested gunicorn worker count: two per CPU core, plus one."""
    import multiprocessing
    core_count = multiprocessing.cpu_count()
    return 2 * core_count + 1
21 21
# GLOBAL
# '-' routes the gunicorn error/access logs to stderr/stdout respectively
errorlog = '-'
accesslog = '-'


# SERVER MECHANICS
# None == system temp dir
# worker_tmp_dir is recommended to be set to some tmpfs
worker_tmp_dir = None
tmp_upload_dir = None

#reuse_port = True

# Custom log format
#access_log_format = (
#    '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')

# loki format for easier parsing in grafana
access_log_format = (
    'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')

# self adjust workers based on CPU count
# workers = get_workers()
45 45
46 46
47 47 def _get_process_rss(pid=None):
48 48 try:
49 49 import psutil
50 50 if pid:
51 51 proc = psutil.Process(pid)
52 52 else:
53 53 proc = psutil.Process()
54 54 return proc.memory_info().rss
55 55 except Exception:
56 56 return None
57 57
58 58
59 59 def _get_config(ini_path):
60 import configparser
60 61
61 62 try:
62 import configparser
63 except ImportError:
64 import ConfigParser as configparser
65 try:
66 63 config = configparser.RawConfigParser()
67 64 config.read(ini_path)
68 65 return config
69 66 except Exception:
70 67 return None
71 68
72 69
73 70 def _time_with_offset(memory_usage_check_interval):
74 71 return time.time() - random.randint(0, memory_usage_check_interval/2.0)
75 72
76 73
def pre_fork(server, worker):
    """Gunicorn hook called just before a worker is forked; intentionally a no-op."""
    pass
79 76
80 77
def post_fork(server, worker):
    """Gunicorn hook run right after a worker has been forked.

    Reads the memory-limit settings from the ``[server:main]`` section of
    the paste .ini file, lets ``RC_GUNICORN_*`` environment variables
    override them, and stores the result on the worker object for the
    per-request memory checks.  A max usage of 0 disables the checks.
    """

    # fallbacks used when the ini has no [server:main] entries
    max_usage = 0
    check_interval = 60
    recovery_threshold = 0.8

    conf = _get_config(os.path.abspath(server.cfg.paste))
    section = 'server:main'

    if conf and conf.has_section(section):
        if conf.has_option(section, 'memory_max_usage'):
            max_usage = conf.getint(section, 'memory_max_usage')
        if conf.has_option(section, 'memory_usage_check_interval'):
            check_interval = conf.getint(section, 'memory_usage_check_interval')
        if conf.has_option(section, 'memory_usage_recovery_threshold'):
            recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')

    # environment variables win over ini values; an empty/unset variable
    # falls through to the ini value via the `or`
    env = os.environ.get
    worker._memory_max_usage = int(
        env('RC_GUNICORN_MEMORY_MAX_USAGE', '') or max_usage)
    worker._memory_usage_check_interval = int(
        env('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') or check_interval)
    worker._memory_usage_recovery_threshold = float(
        env('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') or recovery_threshold)

    # register memory last check time, with some random offset so we don't
    # recycle all workers at once
    worker._last_memory_check_time = _time_with_offset(check_interval)

    if max_usage:
        server.log.info("[%-10s] WORKER spawned with max memory set at %s",
                        worker.pid, _format_data_size(max_usage))
    else:
        server.log.info("[%-10s] WORKER spawned", worker.pid)
119 116
120 117
def pre_exec(server):
    """Gunicorn hook fired just before the master re-execs itself."""
    server.log.info("Forked child, re-executing.")
123 120
124 121
def on_starting(server):
    """Log an identifying label (proc name + bind address) at master start-up."""
    label = '{} {}'.format(server.proc_name, server.address)
    server.log.info("Server %s is starting.", label)
128 125
129 126
def when_ready(server):
    """Log readiness once the master is about to start spawning workers."""
    server.log.info("Server %s is ready. Spawning workers", server)
132 129
133 130
def on_reload(server):
    """Gunicorn hook called on SIGHUP reload; intentionally a no-op."""
    pass
136 133
137 134
138 135 def _format_data_size(size, unit="B", precision=1, binary=True):
139 136 """Format a number using SI units (kilo, mega, etc.).
140 137
141 138 ``size``: The number as a float or int.
142 139
143 140 ``unit``: The unit name in plural form. Examples: "bytes", "B".
144 141
145 142 ``precision``: How many digits to the right of the decimal point. Default
146 143 is 1. 0 suppresses the decimal point.
147 144
148 145 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
149 146 If true, use base-2 binary prefixes (kibi = Ki = 1024).
150 147
151 148 ``full_name``: If false (default), use the prefix abbreviation ("k" or
152 149 "Ki"). If true, use the full prefix ("kilo" or "kibi"). If false,
153 150 use abbreviation ("k" or "Ki").
154 151
155 152 """
156 153
157 154 if not binary:
158 155 base = 1000
159 156 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
160 157 else:
161 158 base = 1024
162 159 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
163 160
164 161 sign = ""
165 162 if size > 0:
166 163 m = int(math.log(size, base))
167 164 elif size < 0:
168 165 sign = "-"
169 166 size = -size
170 167 m = int(math.log(size, base))
171 168 else:
172 169 m = 0
173 170 if m > 8:
174 171 m = 8
175 172
176 173 if m == 0:
177 174 precision = '%.0f'
178 175 else:
179 176 precision = '%%.%df' % precision
180 177
181 178 size = precision % (size / math.pow(base, m))
182 179
183 180 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
184 181
185 182
186 183 def _check_memory_usage(worker):
187 184 memory_max_usage = worker._memory_max_usage
188 185 if not memory_max_usage:
189 186 return
190 187
191 188 memory_usage_check_interval = worker._memory_usage_check_interval
192 189 memory_usage_recovery_threshold = memory_max_usage * worker._memory_usage_recovery_threshold
193 190
194 191 elapsed = time.time() - worker._last_memory_check_time
195 192 if elapsed > memory_usage_check_interval:
196 193 mem_usage = _get_process_rss()
197 194 if mem_usage and mem_usage > memory_max_usage:
198 195 worker.log.info(
199 196 "memory usage %s > %s, forcing gc",
200 197 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
201 198 # Try to clean it up by forcing a full collection.
202 199 gc.collect()
203 200 mem_usage = _get_process_rss()
204 201 if mem_usage > memory_usage_recovery_threshold:
205 202 # Didn't clean up enough, we'll have to terminate.
206 203 worker.log.warning(
207 204 "memory usage %s > %s after gc, quitting",
208 205 _format_data_size(mem_usage), _format_data_size(memory_max_usage))
209 206 # This will cause worker to auto-restart itself
210 207 worker.alive = False
211 208 worker._last_memory_check_time = time.time()
212 209
213 210
def worker_int(worker):
    """Log a stack trace for every live thread when a worker gets INT/QUIT."""
    worker.log.info("[%-10s] worker received INT or QUIT signal", worker.pid)

    # dump per-thread tracebacks to aid debugging of worker crashes
    thread_names = {th.ident: th.name for th in threading.enumerate()}
    lines = []
    for thread_id, stack in sys._current_frames().items():
        lines.append(
            "\n# Thread: %s(%d)" % (thread_names.get(thread_id, ""), thread_id))
        for fname, lineno, name, line in traceback.extract_stack(stack):
            lines.append('File: "%s", line %d, in %s' % (fname, lineno, name))
            if line:
                lines.append(" %s" % (line.strip()))
    worker.log.debug("\n".join(lines))
228 225
229 226
def worker_abort(worker):
    """Gunicorn hook: a worker received SIGABRT (e.g. after a timeout)."""
    worker.log.info("[%-10s] worker received SIGABRT signal", worker.pid)
232 229
233 230
def worker_exit(server, worker):
    """Gunicorn hook: called in the master process after a worker exited."""
    worker.log.info("[%-10s] worker exit", worker.pid)
236 233
237 234
def child_exit(server, worker):
    """Gunicorn hook: called in the master when a worker child has exited."""
    worker.log.info("[%-10s] worker child exit", worker.pid)
240 237
241 238
def pre_request(worker, req):
    """Stamp the wall-clock start of the request and log it at debug level."""
    started = time.time()
    worker.start_time = started
    worker.log.debug(
        "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
246 243
247 244
def post_request(worker, req, environ, resp):
    """Log request completion (status + duration), then run the memory watchdog."""
    elapsed = time.time() - worker.start_time
    # Gunicorn sometimes has problems with reading the status_code
    status_code = getattr(resp, 'status_code', '')
    worker.log.debug(
        "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
        worker.nr, req.method, req.path, status_code, elapsed)
    _check_memory_usage(worker)
256 253
257 254
258 255 def _filter_proxy(ip):
259 256 """
260 257 Passed in IP addresses in HEADERS can be in a special format of multiple
261 258 ips. Those comma separated IPs are passed from various proxies in the
262 259 chain of request processing. The left-most being the original client.
263 260 We only care about the first IP which came from the org. client.
264 261
265 262 :param ip: ip string from headers
266 263 """
267 264 if ',' in ip:
268 265 _ips = ip.split(',')
269 266 _first_ip = _ips[0].strip()
270 267 return _first_ip
271 268 return ip
272 269
273 270
274 271 def _filter_port(ip):
275 272 """
276 273 Removes a port from ip, there are 4 main cases to handle here.
277 274 - ipv4 eg. 127.0.0.1
278 275 - ipv6 eg. ::1
279 276 - ipv4+port eg. 127.0.0.1:8080
280 277 - ipv6+port eg. [::1]:8080
281 278
282 279 :param ip:
283 280 """
284 281 def is_ipv6(ip_addr):
285 282 if hasattr(socket, 'inet_pton'):
286 283 try:
287 284 socket.inet_pton(socket.AF_INET6, ip_addr)
288 285 except socket.error:
289 286 return False
290 287 else:
291 288 return False
292 289 return True
293 290
294 291 if ':' not in ip: # must be ipv4 pure ip
295 292 return ip
296 293
297 294 if '[' in ip and ']' in ip: # ipv6 with port
298 295 return ip.split(']')[0][1:].lower()
299 296
300 297 # must be ipv6 or ipv4 with port
301 298 if is_ipv6(ip):
302 299 return ip
303 300 else:
304 301 ip, _port = ip.split(':')[:2] # means ipv4+port
305 302 return ip
306 303
307 304
def get_ip_addr(environ):
    """Resolve the client IP from proxy headers, falling back to REMOTE_ADDR."""
    def _filters(value):
        # strip proxy chains first, then any trailing port
        return _filter_port(_filter_proxy(value))

    # proxy-provided headers take precedence, checked in this order
    for header in ('HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_FOR'):
        ip = environ.get(header)
        if ip:
            return _filters(ip)

    return _filters(environ.get('REMOTE_ADDR', '0.0.0.0'))
324 321
325 322
class RhodeCodeLogger(Logger):
    """
    Custom Logger that allows some customization that gunicorn doesn't allow
    """

    # timestamp format; `now()` appends milliseconds to it
    datefmt = r"%Y-%m-%d %H:%M:%S"

    def __init__(self, cfg):
        Logger.__init__(self, cfg)

    def now(self):
        """ return date in RhodeCode Log format """
        now = time.time()
        # py3: int() replaces the removed py2 `long()` builtin
        msecs = int((now - int(now)) * 1000)
        return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)

    def atoms(self, resp, req, environ, request_time):
        """ Gets atoms for log formatting.
        """
        status = resp.status
        if isinstance(status, str):
            # e.g. "200 OK" -> "200"
            status = status.split(None, 1)[0]
        atoms = {
            'h': get_ip_addr(environ),
            'l': '-',
            'u': self._get_user(environ) or '-',
            't': self.now(),
            'r': "%s %s %s" % (environ['REQUEST_METHOD'],
                               environ['RAW_URI'],
                               environ["SERVER_PROTOCOL"]),
            's': status,
            'm': environ.get('REQUEST_METHOD'),
            'U': environ.get('PATH_INFO'),
            'q': environ.get('QUERY_STRING'),
            'H': environ.get('SERVER_PROTOCOL'),
            'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
            'B': getattr(resp, 'sent', None),
            'f': environ.get('HTTP_REFERER', '-'),
            'a': environ.get('HTTP_USER_AGENT', '-'),
            'T': request_time.seconds,
            'D': (request_time.seconds * 1000000) + request_time.microseconds,
            'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
            'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
            'p': "<%s>" % os.getpid()
        }

        # add request headers
        if hasattr(req, 'headers'):
            req_headers = req.headers
        else:
            req_headers = req

        if hasattr(req_headers, "items"):
            req_headers = req_headers.items()

        atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})

        resp_headers = resp.headers
        if hasattr(resp_headers, "items"):
            resp_headers = resp_headers.items()

        # add response headers
        atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})

        # add environ variables
        environ_variables = environ.items()
        atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})

        return atoms

logger_class = RhodeCodeLogger
General Comments 0
You need to be logged in to leave comments. Login now