##// END OF EJS Templates
feat(configs): allow json log format for gunicorn
super-admin -
r5502:bff5e272 default
parent child Browse files
Show More
@@ -1,520 +1,545 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 14 import socket
15 15 import dataclasses
16 import json
16 17 from gunicorn.glogging import Logger
17 18
18 19
def get_workers():
    """Return the recommended gunicorn worker count: 2 * CPU cores + 1."""
    import multiprocessing
    cpu_cores = multiprocessing.cpu_count()
    return 2 * cpu_cores + 1
22 23
23 24
bind = "127.0.0.1:10020"


# Error logging output for gunicorn (-) is stdout
errorlog = '-'

# Access logging output for gunicorn (-) is stdout
accesslog = '-'


# SERVER MECHANICS
# None == system temp dir
# worker_tmp_dir is recommended to be set to some tmpfs
worker_tmp_dir = None
tmp_upload_dir = None

# use re-use port logic to let linux internals load-balance the requests better.
reuse_port = True

# Custom log format
#access_log_format = (
#    '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')

# loki format for easier parsing in grafana
loki_access_log_format = (
    'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')

# JSON format: each access-log line becomes a JSON object; the values are
# gunicorn log atoms that the logger substitutes when the line is emitted
json_access_log_format = json.dumps({
    'time': r'%(t)s',
    'pid': r'%(p)s',
    'level': 'INFO',
    'ip': r'%(h)s',
    'request_time': r'%(L)s',
    'remote_address': r'%(h)s',
    'user_name': r'%(u)s',
    'status': r'%(s)s',
    'method': r'%(m)s',
    'url_path': r'%(U)s',
    'query_string': r'%(q)s',
    'protocol': r'%(H)s',
    'response_length': r'%(B)s',
    'referer': r'%(f)s',
    'user_agent': r'%(a)s',

})

# default is the loki format; switch to JSON when RC_LOGGING_FORMATTER=json
access_log_format = loki_access_log_format
if os.environ.get('RC_LOGGING_FORMATTER') == 'json':
    access_log_format = json_access_log_format
75
# self adjust workers based on CPU count, to use maximum of CPU and not overquota the resources
# workers = get_workers()

# Gunicorn access log level
loglevel = 'info'

# Process name visible in a process list
proc_name = 'rhodecode_enterprise'

# Type of worker class, one of `sync`, `gevent` or `gthread`
# currently `sync` is the only option allowed for vcsserver and for rhodecode all of 3 are allowed
# gevent:
#   In this case, the maximum number of concurrent requests is (N workers * X worker_connections)
#   e.g. workers =3 worker_connections=10 = 3*10, 30 concurrent requests can be handled
# gthread:
#   In this case, the maximum number of concurrent requests is (N workers * X threads)
#   e.g. workers = 3 threads=3 = 3*3, 9 concurrent requests can be handled
worker_class = 'gthread'

# Sets the number of process workers. More workers means more concurrent connections
# RhodeCode can handle at the same time. Each additional worker also it increases
# memory usage as each has its own set of caches.
# The Recommended value is (2 * NUMBER_OF_CPUS + 1), eg 2CPU = 5 workers, but no more
# than 8-10 unless for huge deployments .e.g 700-1000 users.
# `instance_id = *` must be set in the [app:main] section below (which is the default)
# when using more than 1 worker.
workers = 2

# Threads numbers for worker class gthread
threads = 1

# The maximum number of simultaneous clients. Valid only for gevent
# In this case, the maximum number of concurrent requests is (N workers * X worker_connections)
# e.g workers =3 worker_connections=10 = 3*10, 30 concurrent requests can be handled
worker_connections = 10

# Max number of requests that worker will handle before being gracefully restarted.
# Prevents memory leaks, jitter adds variability so not all workers are restarted at once.
max_requests = 2000
max_requests_jitter = int(max_requests * 0.2)  # 20% of max_requests

# The maximum number of pending connections.
# Exceeding this number results in the client getting an error when attempting to connect.
backlog = 64

# The Amount of time a worker can spend with handling a request before it
# gets killed and restarted. By default, set to 21600 (6hrs)
# Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
timeout = 21600

# The maximum size of HTTP request line in bytes.
# 0 for unlimited
limit_request_line = 0

# Limit the number of HTTP headers fields in a request.
# By default this value is 100 and can't be larger than 32768.
limit_request_fields = 32768

# Limit the allowed size of an HTTP request header field.
# Value is a positive number or 0.
# Setting it to 0 will allow unlimited header field sizes.
limit_request_field_size = 0

# Timeout for graceful workers restart.
# After receiving a restart signal, workers have this much time to finish
# serving requests. Workers still alive after the timeout (starting from the
# receipt of the restart signal) are force killed.
# Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
graceful_timeout = 21600

# The number of seconds to wait for requests on a Keep-Alive connection.
# Generally set in the 1-5 seconds range.
keepalive = 2

# Maximum memory usage that each worker can use before it will receive a
# graceful restart signal 0 = memory monitoring is disabled
# Examples: 268435456 (256MB), 536870912 (512MB)
# 1073741824 (1GB), 2147483648 (2GB), 4294967296 (4GB)
# Dynamic formula 1024 * 1024 * 256 == 256MBs
memory_max_usage = 0

# How often in seconds to check for memory usage for each gunicorn worker
memory_usage_check_interval = 60

# Threshold value for which we don't recycle worker if GarbageCollection
# frees up enough resources. Before each restart, we try to run GC on worker
# in case we get enough free memory after that; restart will not happen.
memory_usage_recovery_threshold = 0.8
139 164
140 165
@dataclasses.dataclass
class MemoryCheckConfig:
    """Resolved memory-monitoring settings for a gunicorn worker."""
    # max RSS in bytes before a worker is recycled; 0 disables monitoring
    max_usage: int
    # seconds between memory checks
    check_interval: int
    # fraction of max_usage; after a forced GC the worker survives only if
    # its RSS drops below max_usage * recovery_threshold
    recovery_threshold: float
146 171
147 172
148 173 def _get_process_rss(pid=None):
149 174 try:
150 175 import psutil
151 176 if pid:
152 177 proc = psutil.Process(pid)
153 178 else:
154 179 proc = psutil.Process()
155 180 return proc.memory_info().rss
156 181 except Exception:
157 182 return None
158 183
159 184
160 185 def _get_config(ini_path):
161 186 import configparser
162 187
163 188 try:
164 189 config = configparser.RawConfigParser()
165 190 config.read(ini_path)
166 191 return config
167 192 except Exception:
168 193 return None
169 194
170 195
def get_memory_usage_params(config=None):
    """Resolve the memory-check settings as a MemoryCheckConfig.

    Precedence: RC_GUNICORN_* environment variables, then the optional
    .ini file at *config* ([server:main] section), then module defaults.
    """
    # start from module-level defaults
    max_usage = memory_max_usage
    check_interval = memory_usage_check_interval
    recovery_threshold = memory_usage_recovery_threshold

    if config:
        conf = _get_config(os.path.abspath(config))
        section = 'server:main'
        if conf and conf.has_section(section):
            if conf.has_option(section, 'memory_max_usage'):
                max_usage = conf.getint(section, 'memory_max_usage')
            if conf.has_option(section, 'memory_usage_check_interval'):
                check_interval = conf.getint(section, 'memory_usage_check_interval')
            if conf.has_option(section, 'memory_usage_recovery_threshold'):
                recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')

    # environment variables win over ini/defaults; empty string means "unset"
    max_usage = int(
        os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '') or max_usage)
    check_interval = int(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') or check_interval)
    recovery_threshold = float(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') or recovery_threshold)

    return MemoryCheckConfig(max_usage, check_interval, recovery_threshold)
201 226
202 227
203 228 def _time_with_offset(check_interval):
204 229 return time.time() - random.randint(0, check_interval/2.0)
205 230
206 231
def pre_fork(server, worker):
    # Gunicorn hook: runs in the master just before a worker is forked.
    # Intentionally a no-op; kept so the hook point stays explicit.
    pass
209 234
210 235
def post_fork(server, worker):
    """Gunicorn hook: runs inside each freshly forked worker.

    Attaches the resolved memory-monitoring settings to the worker object
    (read later by _check_memory_usage) and logs the spawn.
    """

    memory_conf = get_memory_usage_params()

    # env vars take precedence over the resolved config (get_memory_usage_params
    # already applies them, so this re-check is a no-op safety net)
    worker._memory_max_usage = int(
        os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
        or memory_conf.max_usage)
    worker._memory_usage_check_interval = int(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
        or memory_conf.check_interval)
    worker._memory_usage_recovery_threshold = float(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
        or memory_conf.recovery_threshold)

    # register memory last check time, with some random offset so we don't
    # recycle all workers at once
    worker._last_memory_check_time = _time_with_offset(worker._memory_usage_check_interval)

    if worker._memory_max_usage:
        server.log.info("pid=[%-10s] WORKER spawned with max memory set at %s", worker.pid,
                        _format_data_size(worker._memory_max_usage))
    else:
        server.log.info("pid=[%-10s] WORKER spawned", worker.pid)
234 259
235 260
def pre_exec(server):
    # Gunicorn hook: called just before the master re-executes its binary.
    server.log.info("Forked child, re-executing.")
238 263
239 264
def on_starting(server):
    """Gunicorn hook: master is initializing; log the effective config."""
    server_lbl = f'{server.proc_name} {server.address}'
    server.log.info("Server %s is starting.", server_lbl)
    server.log.info('Config:')
    server.log.info(f"\n{server.cfg}")
    server.log.info(get_memory_usage_params())
246 271
247 272
def when_ready(server):
    # Gunicorn hook: master finished booting and is about to spawn workers.
    server.log.info("Server %s is ready. Spawning workers", server)
250 275
251 276
def on_reload(server):
    # Gunicorn hook: runs on config reload (SIGHUP); intentionally a no-op.
    pass
254 279
255 280
256 281 def _format_data_size(size, unit="B", precision=1, binary=True):
257 282 """Format a number using SI units (kilo, mega, etc.).
258 283
259 284 ``size``: The number as a float or int.
260 285
261 286 ``unit``: The unit name in plural form. Examples: "bytes", "B".
262 287
263 288 ``precision``: How many digits to the right of the decimal point. Default
264 289 is 1. 0 suppresses the decimal point.
265 290
266 291 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
267 292 If true, use base-2 binary prefixes (kibi = Ki = 1024).
268 293
269 294 ``full_name``: If false (default), use the prefix abbreviation ("k" or
270 295 "Ki"). If true, use the full prefix ("kilo" or "kibi"). If false,
271 296 use abbreviation ("k" or "Ki").
272 297
273 298 """
274 299
275 300 if not binary:
276 301 base = 1000
277 302 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
278 303 else:
279 304 base = 1024
280 305 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
281 306
282 307 sign = ""
283 308 if size > 0:
284 309 m = int(math.log(size, base))
285 310 elif size < 0:
286 311 sign = "-"
287 312 size = -size
288 313 m = int(math.log(size, base))
289 314 else:
290 315 m = 0
291 316 if m > 8:
292 317 m = 8
293 318
294 319 if m == 0:
295 320 precision = '%.0f'
296 321 else:
297 322 precision = '%%.%df' % precision
298 323
299 324 size = precision % (size / math.pow(base, m))
300 325
301 326 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
302 327
303 328
def _check_memory_usage(worker):
    """Watchdog run after each request: when the worker's RSS exceeds its
    configured limit, force a GC; if memory stays above the recovery
    threshold, flag the worker for a graceful restart.
    """
    _memory_max_usage = worker._memory_max_usage
    if not _memory_max_usage:
        # monitoring disabled (max usage 0)
        return

    _memory_usage_check_interval = worker._memory_usage_check_interval
    # BUGFIX: the recovery threshold must be based on the worker's configured
    # limit, not the module-level `memory_max_usage` default (which stays 0
    # when the limit comes from ini/env, making the threshold always 0 and
    # recycling the worker on every check).
    _memory_usage_recovery_threshold = _memory_max_usage * worker._memory_usage_recovery_threshold

    elapsed = time.time() - worker._last_memory_check_time
    if elapsed > _memory_usage_check_interval:
        mem_usage = _get_process_rss()
        if mem_usage and mem_usage > _memory_max_usage:
            worker.log.info(
                "memory usage %s > %s, forcing gc",
                _format_data_size(mem_usage), _format_data_size(_memory_max_usage))
            # Try to clean it up by forcing a full collection.
            gc.collect()
            mem_usage = _get_process_rss()
            # mem_usage can be None when psutil is unavailable/fails; only
            # recycle on a confirmed reading above the recovery threshold
            if mem_usage and mem_usage > _memory_usage_recovery_threshold:
                # Didn't clean up enough, we'll have to terminate.
                worker.log.warning(
                    "memory usage %s > %s after gc, quitting",
                    _format_data_size(mem_usage), _format_data_size(_memory_max_usage))
                # This will cause worker to auto-restart itself
                worker.alive = False
        worker._last_memory_check_time = time.time()
330 355
331 356
def worker_int(worker):
    """Gunicorn hook: worker received INT or QUIT; log a stack trace of
    every live thread to help diagnose what the worker was doing.
    """
    worker.log.info("pid=[%-10s] worker received INT or QUIT signal", worker.pid)

    # map thread idents to human-readable names for the trace
    id2name = {th.ident: th.name for th in threading.enumerate()}

    def get_thread_id(t_id):
        return id2name.get(t_id, "unknown_thread_id")

    code = []
    for thread_id, stack in sys._current_frames().items():  # noqa
        code.append(
            "\n# Thread: %s(%d)" % (get_thread_id(thread_id), thread_id))
        for fname, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
349 374
350 375
def worker_abort(worker):
    # Gunicorn hook: worker received SIGABRT (e.g. on forced kill); log it.
    worker.log.info("pid=[%-10s] worker received SIGABRT signal", worker.pid)
353 378
354 379
def worker_exit(server, worker):
    # Gunicorn hook: runs when a worker is exiting; log it.
    worker.log.info("pid=[%-10s] worker exit", worker.pid)
357 382
358 383
def child_exit(server, worker):
    # Gunicorn hook: runs in the master after a worker process exits; log it.
    worker.log.info("pid=[%-10s] worker child exit", worker.pid)
361 386
362 387
def pre_request(worker, req):
    # Gunicorn hook: called before a worker processes a request.
    # start_time is read back in post_request() to compute the load time.
    worker.start_time = time.time()
    worker.log.debug(
        "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
367 392
368 393
def post_request(worker, req, environ, resp):
    # Gunicorn hook: called after a request; logs timing/status and then
    # runs the per-worker memory watchdog.
    total_time = time.time() - worker.start_time
    # Gunicorn sometimes has problems with reading the status_code
    status_code = getattr(resp, 'status_code', '')
    worker.log.debug(
        "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
        worker.nr, req.method, req.path, status_code, total_time)
    _check_memory_usage(worker)
377 402
378 403
379 404 def _filter_proxy(ip):
380 405 """
381 406 Passed in IP addresses in HEADERS can be in a special format of multiple
382 407 ips. Those comma separated IPs are passed from various proxies in the
383 408 chain of request processing. The left-most being the original client.
384 409 We only care about the first IP which came from the org. client.
385 410
386 411 :param ip: ip string from headers
387 412 """
388 413 if ',' in ip:
389 414 _ips = ip.split(',')
390 415 _first_ip = _ips[0].strip()
391 416 return _first_ip
392 417 return ip
393 418
394 419
395 420 def _filter_port(ip):
396 421 """
397 422 Removes a port from ip, there are 4 main cases to handle here.
398 423 - ipv4 eg. 127.0.0.1
399 424 - ipv6 eg. ::1
400 425 - ipv4+port eg. 127.0.0.1:8080
401 426 - ipv6+port eg. [::1]:8080
402 427
403 428 :param ip:
404 429 """
405 430 def is_ipv6(ip_addr):
406 431 if hasattr(socket, 'inet_pton'):
407 432 try:
408 433 socket.inet_pton(socket.AF_INET6, ip_addr)
409 434 except socket.error:
410 435 return False
411 436 else:
412 437 return False
413 438 return True
414 439
415 440 if ':' not in ip: # must be ipv4 pure ip
416 441 return ip
417 442
418 443 if '[' in ip and ']' in ip: # ipv6 with port
419 444 return ip.split(']')[0][1:].lower()
420 445
421 446 # must be ipv6 or ipv4 with port
422 447 if is_ipv6(ip):
423 448 return ip
424 449 else:
425 450 ip, _port = ip.split(':')[:2] # means ipv4+port
426 451 return ip
427 452
428 453
def get_ip_addr(environ):
    """Extract the client IP from a WSGI *environ*.

    Proxy headers are preferred over the raw socket peer; the value is
    cleaned of proxy chains and ports before being returned.
    """
    def _clean(raw_ip):
        return _filter_port(_filter_proxy(raw_ip))

    # ordered by preference: X-Real-IP, then X-Forwarded-For, then peer addr
    for header in ('HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_FOR'):
        ip = environ.get(header)
        if ip:
            return _clean(ip)

    return _clean(environ.get('REMOTE_ADDR', '0.0.0.0'))
447 472
448 473
class RhodeCodeLogger(Logger):
    """
    Custom Logger that allows some customization that gunicorn doesn't allow
    """

    # timestamp format used by now(); milliseconds are appended separately
    datefmt = r"%Y-%m-%d %H:%M:%S"

    def __init__(self, cfg):
        Logger.__init__(self, cfg)

    def now(self):
        """ return date in RhodeCode Log format """
        now = time.time()
        msecs = int((now - int(now)) * 1000)
        return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)

    def atoms(self, resp, req, environ, request_time):
        """ Gets atoms for log formatting.

        Overrides the base atoms to use the proxy-aware client IP and the
        millisecond timestamp from now(); these atoms are the `%(x)s`
        placeholders substituted into access_log_format.
        """
        status = resp.status
        if isinstance(status, str):
            # e.g. "200 OK" -> "200"
            status = status.split(None, 1)[0]
        atoms = {
            'h': get_ip_addr(environ),  # client ip, proxy-header aware
            'l': '-',
            'u': self._get_user(environ) or '-',
            't': self.now(),  # RhodeCode-format timestamp with msecs
            'r': "%s %s %s" % (environ['REQUEST_METHOD'],
                               environ['RAW_URI'],
                               environ["SERVER_PROTOCOL"]),
            's': status,
            'm': environ.get('REQUEST_METHOD'),
            'U': environ.get('PATH_INFO'),
            'q': environ.get('QUERY_STRING'),
            'H': environ.get('SERVER_PROTOCOL'),
            'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
            'B': getattr(resp, 'sent', None),
            'f': environ.get('HTTP_REFERER', '-'),
            'a': environ.get('HTTP_USER_AGENT', '-'),
            'T': request_time.seconds,
            'D': (request_time.seconds * 1000000) + request_time.microseconds,
            'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
            'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
            'p': "<%s>" % os.getpid()
        }

        # add request headers
        if hasattr(req, 'headers'):
            req_headers = req.headers
        else:
            req_headers = req

        if hasattr(req_headers, "items"):
            req_headers = req_headers.items()

        atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})

        resp_headers = resp.headers
        if hasattr(resp_headers, "items"):
            resp_headers = resp_headers.items()

        # add response headers
        atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})

        # add environ variables
        environ_variables = environ.items()
        atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})

        return atoms
518 543
519 544
520 545 logger_class = RhodeCodeLogger
General Comments 0
You need to be logged in to leave comments. Login now