##// END OF EJS Templates
configs: improve gunicorn config on worker_recycle logic
super-admin -
r5174:60fffd9c default
parent child Browse files
Show More
@@ -1,510 +1,510 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 14 import socket
15 15 import dataclasses
16 16 from gunicorn.glogging import Logger
17 17
18 18
def get_workers():
    """Return a worker count derived from the CPU count: two per CPU, plus one."""
    # Local import keeps multiprocessing out of the module namespace.
    from multiprocessing import cpu_count

    cpus = cpu_count()
    return 2 * cpus + 1
22 22
23 23
bind = "127.0.0.1:10020"


# Error logging output for gunicorn (-) is stderr
errorlog = '-'

# Access logging output for gunicorn (-) is stdout
accesslog = '-'


# SERVER MECHANICS
# None == system temp dir
# worker_tmp_dir is recommended to be set to some tmpfs
worker_tmp_dir = None
tmp_upload_dir = None

# use re-use port logic
#reuse_port = True

# Custom log format
#access_log_format = (
#    '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')

# loki format for easier parsing in grafana
access_log_format = (
    'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')


# Sets the number of process workers. More workers means more concurrent connections
# RhodeCode can handle at the same time. Each additional worker also increases
# memory usage, as each has its own set of caches.
# Recommended value is (2 * NUMBER_OF_CPUS + 1), e.g. 2 CPUs = 5 workers, but no more
# than 8-10 unless for huge deployments, e.g. 700-1000 users.
# `instance_id = *` must be set in the [app:main] section below (which is the default)
# when using more than 1 worker.
workers = 4

# self adjust workers based on CPU count, to use maximum of CPU and not overquota the resources
# workers = get_workers()

# Granularity of gunicorn's error log output
loglevel = 'info'

# Process name visible in a process list
proc_name = 'rhodecode_enterprise'

# Type of worker class, one of `sync`, `gevent`
# NOTE(review): this comment used to say `sync` is the only option allowed, yet
# `gevent` is what is configured here -- confirm which worker classes are supported.
worker_class = 'gevent'

# The maximum number of simultaneous clients. Valid only for gevent
worker_connections = 10

# Max number of requests that worker will handle before being gracefully restarted.
# Prevents memory leaks, jitter adds variability so not all workers are restarted at once.
max_requests = 2000
max_requests_jitter = int(max_requests * 0.2)  # 20% of max_requests

# The maximum number of pending connections.
# Exceeding this number results in the client getting an error when attempting to connect.
backlog = 64

# The amount of time a worker can spend handling a request before it
# gets killed and restarted. By default, set to 21600 (6hrs)
# Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
timeout = 21600

# The maximum size of HTTP request line in bytes.
# 0 for unlimited
limit_request_line = 0

# Limit the number of HTTP headers fields in a request.
# By default this value is 100 and can't be larger than 32768.
limit_request_fields = 32768

# Limit the allowed size of an HTTP request header field.
# Value is a positive number or 0.
# Setting it to 0 will allow unlimited header field sizes.
limit_request_field_size = 0

# Timeout for graceful workers restart.
# After receiving a restart signal, workers have this much time to finish
# serving requests. Workers still alive after the timeout (starting from the
# receipt of the restart signal) are force killed.
# Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
graceful_timeout = 21600

# The number of seconds to wait for requests on a Keep-Alive connection.
# Generally set in the 1-5 seconds range.
keepalive = 2

# Maximum memory usage that each worker can use before it will receive a
# graceful restart signal. 0 = memory monitoring is disabled
# Examples: 268435456 (256MB), 536870912 (512MB)
# 1073741824 (1GB), 2147483648 (2GB), 4294967296 (4GB)
# Dynamic formula 1024 * 1024 * 256 == 256MBs
memory_max_usage = 0

# How often in seconds to check for memory usage for each gunicorn worker
memory_usage_check_interval = 60

# Recovery threshold, used as a multiplier of the max memory usage.
# Before a worker over the limit is recycled we force a garbage collection on it;
# if that frees up enough memory to get back under the threshold, the restart
# will not happen.
memory_usage_recovery_threshold = 0.8
129 129
130 130
@dataclasses.dataclass
class MemoryCheckConfig:
    """Resolved settings for the per-worker memory check."""

    # Memory limit per worker; a falsy value (0) disables the check.
    max_usage: int
    # Minimum number of seconds between two memory checks.
    check_interval: int
    # Multiplier applied to the memory limit to get the post-GC recycle threshold.
    recovery_threshold: float
136 136
137 137
def _get_process_rss(pid=None):
    """
    Return the resident set size (RSS) of a process as reported by psutil.

    :param pid: process id to inspect; a falsy value selects the current process
    :return: the ``rss`` value from ``memory_info()``, or None when it cannot be
        read for any reason (psutil not importable, lookup failure, ...)
    """
    try:
        import psutil

        process = psutil.Process(pid) if pid else psutil.Process()
        return process.memory_info().rss
    except Exception:
        # best-effort lookup: every failure is reported as "unknown"
        return None
148 148
149 149
def _get_config(ini_path):
    """
    Read an .ini file into a ``RawConfigParser``.

    :param ini_path: path of the .ini file to read
    :return: the parser after reading, or None if reading/parsing raised
    """
    from configparser import RawConfigParser

    parser = RawConfigParser()
    try:
        parser.read(ini_path)
    except Exception:
        # unparsable file: signal "no config" to the caller
        return None
    return parser
159 159
160 160
def get_memory_usage_params(config=None):
    """
    Resolve the memory-check settings.

    Values are taken, in increasing order of precedence, from the module-level
    defaults, the ``[server:main]`` section of the optional .ini file, and the
    ``RC_GUNICORN_MEMORY_*`` environment variables.

    :param config: optional path to an .ini file
    :return: a MemoryCheckConfig holding the resolved values
    """
    section = 'server:main'

    # start from the module-level defaults
    max_usage = memory_max_usage
    check_interval = memory_usage_check_interval
    recovery_threshold = memory_usage_recovery_threshold

    parsed = _get_config(os.path.abspath(config)) if config else None
    if parsed and parsed.has_section(section):
        if parsed.has_option(section, 'memory_max_usage'):
            max_usage = parsed.getint(section, 'memory_max_usage')

        if parsed.has_option(section, 'memory_usage_check_interval'):
            check_interval = parsed.getint(section, 'memory_usage_check_interval')

        if parsed.has_option(section, 'memory_usage_recovery_threshold'):
            recovery_threshold = parsed.getfloat(section, 'memory_usage_recovery_threshold')

    # an unset or empty environment variable falls back to the value resolved so far
    from_env = os.environ.get
    max_usage = int(
        from_env('RC_GUNICORN_MEMORY_MAX_USAGE', '') or max_usage)
    check_interval = int(
        from_env('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') or check_interval)
    recovery_threshold = float(
        from_env('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') or recovery_threshold)

    return MemoryCheckConfig(max_usage, check_interval, recovery_threshold)
191 191
192 192
def _time_with_offset(check_interval):
    """
    Return the current time moved back by a random whole number of seconds.

    The offset is drawn uniformly from ``0 .. check_interval // 2``.

    :param check_interval: memory check interval in seconds
    :return: ``time.time()`` minus the random offset
    """
    # random.randint() needs integer bounds: a float bound raises ValueError for
    # odd intervals (61 / 2.0 == 30.5) and TypeError for any float on Python 3.12+.
    # Negative intervals are clamped to a zero offset instead of raising.
    max_offset = max(0, int(check_interval) // 2)
    return time.time() - random.randint(0, max_offset)
195 195
196 196
def pre_fork(server, worker):
    """Gunicorn ``pre_fork`` hook; currently a no-op."""
199 199
200 200
def post_fork(server, worker):
    """
    Gunicorn ``post_fork`` hook: attach the memory-check settings to the worker.

    Stores the limit, check interval and recovery threshold on the worker, seeds
    its last-check timestamp and logs that the worker was spawned.

    :param server: object exposing ``log``
    :param worker: worker object that receives the ``_memory_*`` attributes
    """
    params = get_memory_usage_params()
    max_usage = params.max_usage
    check_interval = params.check_interval
    recovery_threshold = params.recovery_threshold

    from_env = os.environ.get
    worker._memory_max_usage = int(
        from_env('RC_GUNICORN_MEMORY_MAX_USAGE', '') or max_usage)
    worker._memory_usage_check_interval = int(
        from_env('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') or check_interval)
    worker._memory_usage_recovery_threshold = float(
        from_env('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') or recovery_threshold)

    # register memory last check time, with some random offset so we don't recycle all
    # at once
    worker._last_memory_check_time = _time_with_offset(check_interval)

    if not max_usage:
        server.log.info("pid=[%-10s] WORKER spawned", worker.pid)
        return

    server.log.info(
        "pid=[%-10s] WORKER spawned with max memory set at %s", worker.pid,
        _format_data_size(max_usage))
224 224
225 225
def pre_exec(server):
    """Gunicorn ``pre_exec`` hook: log that the forked child is re-executing."""
    message = "Forked child, re-executing."
    server.log.info(message)
228 228
229 229
def on_starting(server):
    """
    Gunicorn ``on_starting`` hook: log the server label, its config and the
    resolved memory-check settings.

    :param server: object exposing ``proc_name``, ``address``, ``cfg`` and ``log``
    """
    log = server.log
    label = f'{server.proc_name} {server.address}'

    log.info("Server %s is starting.", label)
    log.info('Config:')
    log.info(f"\n{server.cfg}")
    log.info(get_memory_usage_params())
236 236
237 237
def when_ready(server):
    """Gunicorn ``when_ready`` hook: log that the server is ready."""
    log = server.log
    log.info("Server %s is ready. Spawning workers", server)
240 240
241 241
def on_reload(server):
    """Gunicorn ``on_reload`` hook; currently a no-op."""
244 244
245 245
def _format_data_size(size, unit="B", precision=1, binary=True):
    """Format a number using SI or binary unit prefixes (kilo, mega, etc.).

    ``size``: The number as a float or int.

    ``unit``: The unit name in plural form. Examples: "bytes", "B".

    ``precision``: How many digits to the right of the decimal point. Default
        is 1. 0 suppresses the decimal point. Values that need no prefix
        are always printed without decimals.

    ``binary``: If false, use base-10 decimal prefixes (kilo = k = 1000).
        If true, use base-2 binary prefixes (kibi = Ki = 1024).

    Returns the formatted string, e.g. ``"256.0 MiB"`` or ``"-2.0 KiB"``.
    """

    if binary:
        base = 1024
        multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
    else:
        base = 1000
        multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')

    sign = ""
    if size < 0:
        sign = "-"
        size = -size

    largest = len(multiples) - 1
    if size > 0:
        # Clamp the exponent to the prefix table. Sizes below 1 yield a negative
        # exponent, which would otherwise index the table from its end
        # (0.0001 used to come out as "... YiB").
        m = max(0, min(int(math.log(size, base)), largest))
        # math.log() is inexact and may land a hair on the wrong side of an
        # exact power of the base; correct the exponent by comparing directly.
        if m < largest and size >= base ** (m + 1):
            m += 1
        elif m > 0 and size < base ** m:
            m -= 1
    else:
        m = 0

    # plain values (no prefix) are printed without decimals
    number_format = '%.0f' if m == 0 else '%%.%df' % precision
    number = number_format % (size / math.pow(base, m))

    return '%s%s %s%s' % (sign, number.strip(), multiples[m], unit)
292 292
293 293
def _check_memory_usage(worker):
    """
    Recycle the worker when its memory usage stays above the configured limit.

    At most once per check interval the process RSS is compared with the
    worker's limit. When over the limit, a full garbage collection is forced;
    if usage is still above ``limit * recovery_threshold`` afterwards,
    ``worker.alive`` is set to False.

    :param worker: worker carrying the ``_memory_*`` settings, the
        ``_last_memory_check_time`` timestamp, ``log`` and ``alive``
    """
    max_usage = worker._memory_max_usage
    if not max_usage:
        # memory monitoring is disabled
        return

    check_interval = worker._memory_usage_check_interval
    # Derive the threshold from the limit in effect for *this* worker. The
    # module-level default is 0, which made the threshold 0 and therefore
    # recycled every worker that went over the limit, whatever GC freed.
    recovery_threshold = max_usage * worker._memory_usage_recovery_threshold

    elapsed = time.time() - worker._last_memory_check_time
    if elapsed <= check_interval:
        return

    mem_usage = _get_process_rss()
    if mem_usage and mem_usage > max_usage:
        worker.log.info(
            "memory usage %s > %s, forcing gc",
            _format_data_size(mem_usage), _format_data_size(max_usage))
        # Try to clean it up by forcing a full collection.
        gc.collect()
        mem_usage = _get_process_rss()
        # The second reading may fail (None); with no reading we cannot tell,
        # so the worker is left alone until the next check.
        if mem_usage is not None and mem_usage > recovery_threshold:
            # Didn't clean up enough, we'll have to terminate.
            worker.log.warning(
                "memory usage %s > %s after gc, quitting",
                _format_data_size(mem_usage), _format_data_size(recovery_threshold))
            # This will cause worker to auto-restart itself
            worker.alive = False

    # NOTE(review): SOURCE lost its indentation; the timestamp is refreshed once
    # per performed check, the only placement under which the interval is meaningful.
    worker._last_memory_check_time = time.time()
320 320
321 321
def worker_int(worker):
    """
    Gunicorn ``worker_int`` hook: log the signal, then dump every thread's stack.

    The signal is logged at INFO level; the per-thread stack listing is emitted
    as a single DEBUG message.

    :param worker: worker object exposing ``pid`` and ``log``
    """
    worker.log.info("pid=[%-10s] worker received INT or QUIT signal", worker.pid)

    # get traceback info, when a worker crashes
    # thread ident -> name, built once instead of once per reported thread
    thread_names = {th.ident: th.name for th in threading.enumerate()}

    code = []
    for thread_id, stack in sys._current_frames().items():  # noqa
        thread_name = thread_names.get(thread_id, "unknown_thread_id")
        code.append("\n# Thread: %s(%d)" % (thread_name, thread_id))
        for fname, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    worker.log.debug("\n".join(code))
339 339
340 340
def worker_abort(worker):
    """Gunicorn ``worker_abort`` hook: log that the worker received SIGABRT."""
    log = worker.log
    log.info("pid=[%-10s] worker received SIGABRT signal", worker.pid)
343 343
344 344
def worker_exit(server, worker):
    """Gunicorn ``worker_exit`` hook: log the worker exit through the worker's log."""
    log = worker.log
    log.info("pid=[%-10s] worker exit", worker.pid)
347 347
348 348
def child_exit(server, worker):
    """Gunicorn ``child_exit`` hook: log the child exit through the worker's log."""
    log = worker.log
    log.info("pid=[%-10s] worker child exit", worker.pid)
351 351
352 352
def pre_request(worker, req):
    """
    Gunicorn ``pre_request`` hook: stamp the request start time on the worker
    and log the incoming request at DEBUG level.

    :param worker: worker object exposing ``nr`` and ``log``
    :param req: request object exposing ``method`` and ``path``
    """
    started_at = time.time()
    worker.start_time = started_at

    log = worker.log
    log.debug("GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
357 357
358 358
def post_request(worker, req, environ, resp):
    """
    Gunicorn ``post_request`` hook: log the request timing at DEBUG level, then
    run the worker memory check.

    :param worker: worker object exposing ``start_time``, ``nr`` and ``log``
    :param req: request object exposing ``method`` and ``path``
    :param environ: unused here
    :param resp: response object; ``status_code`` is read when present
    """
    load_time = time.time() - worker.start_time
    # Gunicorn sometimes has problems with reading the status_code
    response_status = getattr(resp, 'status_code', '')

    log = worker.log
    log.debug(
        "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
        worker.nr, req.method, req.path, response_status, load_time)

    _check_memory_usage(worker)
367 367
368 368
def _filter_proxy(ip):
    """
    Reduce a proxy-chain header value to the originating client IP.

    Headers may carry several comma separated IPs, appended by the proxies a
    request passed through, the left-most one being the original client. Only
    that first entry is returned; a value without a comma is returned as-is.

    :param ip: ip string from headers
    """
    if ',' not in ip:
        return ip

    first_ip, _sep, _others = ip.partition(',')
    return first_ip.strip()
383 383
384 384
def _filter_port(ip):
    """
    Strip the port part from an address string. Four shapes are handled:
     - ipv4 eg. 127.0.0.1
     - ipv6 eg. ::1
     - ipv4+port eg. 127.0.0.1:8080
     - ipv6+port eg. [::1]:8080 (returned lower-cased, without brackets)

    :param ip: address string, with or without a port
    """
    def is_ipv6(ip_addr):
        # without inet_pton there is no way to validate, treat as "not ipv6"
        if not hasattr(socket, 'inet_pton'):
            return False
        try:
            socket.inet_pton(socket.AF_INET6, ip_addr)
        except socket.error:
            return False
        return True

    # no colon at all: must be a pure ipv4 address
    if ':' not in ip:
        return ip

    # bracketed form: ipv6 with port
    if '[' in ip and ']' in ip:
        bracketed, _sep, _tail = ip.partition(']')
        return bracketed[1:].lower()

    # remaining cases: bare ipv6, or ipv4 with port
    if is_ipv6(ip):
        return ip

    host = ip.split(':')[0]
    return host
417 417
418 418
def get_ip_addr(environ):
    """
    Pick the client IP out of a WSGI-style environ mapping.

    ``HTTP_X_REAL_IP`` is preferred, then ``HTTP_X_FORWARDED_FOR``, then
    ``REMOTE_ADDR`` (defaulting to ``0.0.0.0``). The chosen value is reduced
    to its first proxy entry and has its port removed.

    :param environ: mapping holding the request headers/variables
    """
    def _clean(raw_ip):
        return _filter_port(_filter_proxy(raw_ip))

    for header_key in ('HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_FOR'):
        candidate = environ.get(header_key)
        if candidate:
            return _clean(candidate)

    return _clean(environ.get('REMOTE_ADDR', '0.0.0.0'))
437 437
438 438
class RhodeCodeLogger(Logger):
    """
    Gunicorn ``Logger`` subclass providing a custom timestamp format and a
    custom set of access-log atoms
    """

    datefmt = r"%Y-%m-%d %H:%M:%S"

    def __init__(self, cfg):
        super().__init__(cfg)

    def now(self):
        """ return the current local time in ``datefmt`` plus milliseconds """
        timestamp = time.time()
        millis = int((timestamp - int(timestamp)) * 1000)
        formatted = time.strftime(self.datefmt, time.localtime(timestamp))
        return f'{formatted}.{millis:03d}'

    def atoms(self, resp, req, environ, request_time):
        """ Build the mapping of atoms available to the access log format.
        """
        status = resp.status
        if isinstance(status, str):
            # keep only the leading status token
            status = status.split(None, 1)[0]

        # textual byte count, '-' when the response exposes no usable value
        sent_text = '-'
        if getattr(resp, 'sent', None) is not None:
            sent_text = str(resp.sent) or '-'

        seconds = request_time.seconds
        microseconds = request_time.microseconds

        atoms = {
            'h': get_ip_addr(environ),
            'l': '-',
            'u': self._get_user(environ) or '-',
            't': self.now(),
            'r': "%s %s %s" % (environ['REQUEST_METHOD'],
                               environ['RAW_URI'],
                               environ["SERVER_PROTOCOL"]),
            's': status,
            'm': environ.get('REQUEST_METHOD'),
            'U': environ.get('PATH_INFO'),
            'q': environ.get('QUERY_STRING'),
            'H': environ.get('SERVER_PROTOCOL'),
            'b': sent_text,
            'B': getattr(resp, 'sent', None),
            'f': environ.get('HTTP_REFERER', '-'),
            'a': environ.get('HTTP_USER_AGENT', '-'),
            'T': seconds,
            'D': (seconds * 1000000) + microseconds,
            'M': (seconds * 1000) + int(microseconds/1000),
            'L': "%d.%06d" % (seconds, microseconds),
            'p': "<%s>" % os.getpid()
        }

        # add request headers
        req_headers = req.headers if hasattr(req, 'headers') else req
        if hasattr(req_headers, "items"):
            req_headers = req_headers.items()
        for header, value in req_headers:
            atoms["{%s}i" % header.lower()] = value

        # add response headers
        resp_headers = resp.headers
        if hasattr(resp_headers, "items"):
            resp_headers = resp_headers.items()
        for header, value in resp_headers:
            atoms["{%s}o" % header.lower()] = value

        # add environ variables
        for variable, value in environ.items():
            atoms["{%s}e" % variable.lower()] = value

        return atoms


logger_class = RhodeCodeLogger
General Comments 0
You need to be logged in to leave comments. Login now