##// END OF EJS Templates
gunicorn: fixed python3 compat
super-admin -
r1056:7e87d29b python3
parent child Browse files
Show More
@@ -1,393 +1,393 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 14 import socket
15 15 from gunicorn.glogging import Logger
16 16
17 17
def get_workers():
    """Return the recommended gunicorn worker count: 2 * CPUs + 1."""
    from multiprocessing import cpu_count
    return 2 * cpu_count() + 1
21 21
# GLOBAL
# '-' sends gunicorn error and access logs to stdout/stderr
errorlog = '-'
accesslog = '-'


# SERVER MECHANICS
# None == system temp dir
# worker_tmp_dir is recommended to be set to some tmpfs
worker_tmp_dir = None
tmp_upload_dir = None

#reuse_port = True

# Custom log format
#access_log_format = (
#    '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')

# loki format for easier parsing in grafana
access_log_format = (
    'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')

# self adjust workers based on CPU count
# workers = get_workers()
46 46
47 47 def _get_process_rss(pid=None):
48 48 try:
49 49 import psutil
50 50 if pid:
51 51 proc = psutil.Process(pid)
52 52 else:
53 53 proc = psutil.Process()
54 54 return proc.memory_info().rss
55 55 except Exception:
56 56 return None
57 57
58 58
59 59 def _get_config(ini_path):
60 60 import configparser
61 61
62 62 try:
63 63 config = configparser.RawConfigParser()
64 64 config.read(ini_path)
65 65 return config
66 66 except Exception:
67 67 return None
68 68
69 69
70 70 def _time_with_offset(memory_usage_check_interval):
71 71 return time.time() - random.randint(0, memory_usage_check_interval/2.0)
72 72
73 73
def pre_fork(server, worker):
    """Gunicorn pre-fork hook; intentionally a no-op for RhodeCode."""
76 76
77 77
def post_fork(server, worker):
    """Configure per-worker memory management right after fork.

    Reads the ``[server:main]`` section of the paste ini (if present),
    lets ``RC_GUNICORN_*`` environment variables override it, and stores
    the resulting limits as private attributes on *worker* for
    ``_check_memory_usage`` to use.
    """

    # defaults used when neither the ini file nor env vars provide values
    max_usage = 0                 # 0 disables the memory cap
    check_interval = 60           # seconds between RSS checks
    recovery_threshold = 0.8      # fraction of max considered "recovered"

    conf = _get_config(os.path.abspath(server.cfg.paste))
    section = 'server:main'
    if conf and conf.has_section(section):
        if conf.has_option(section, 'memory_max_usage'):
            max_usage = conf.getint(section, 'memory_max_usage')
        if conf.has_option(section, 'memory_usage_check_interval'):
            check_interval = conf.getint(section, 'memory_usage_check_interval')
        if conf.has_option(section, 'memory_usage_recovery_threshold'):
            recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')

    # environment variables take precedence over ini settings; an empty
    # string is falsy and falls back to the ini/default value
    worker._memory_max_usage = int(
        os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '') or max_usage)
    worker._memory_usage_check_interval = int(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') or check_interval)
    worker._memory_usage_recovery_threshold = float(
        os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') or recovery_threshold)

    # register memory last check time, with some random offset so we don't
    # recycle all workers at once
    worker._last_memory_check_time = _time_with_offset(check_interval)

    if max_usage:
        server.log.info("[%-10s] WORKER spawned with max memory set at %s",
                        worker.pid, _format_data_size(max_usage))
    else:
        server.log.info("[%-10s] WORKER spawned", worker.pid)
117 117
def pre_exec(server):
    """Log that gunicorn is about to re-exec itself (e.g. on USR2)."""
    server.log.info("Forked child, re-executing.")
120 120
121 121
def on_starting(server):
    """Log the server name and bind address as the master starts up."""
    label = f'{server.proc_name} {server.address}'
    server.log.info("Server %s is starting.", label)
125 125
126 126
def when_ready(server):
    """Log once the master is bound and about to spawn its workers."""
    server.log.info("Server %s is ready. Spawning workers", server)
129 129
130 130
def on_reload(server):
    """Gunicorn reload hook; intentionally a no-op for RhodeCode."""
133 133
134 134
135 135 def _format_data_size(size, unit="B", precision=1, binary=True):
136 136 """Format a number using SI units (kilo, mega, etc.).
137 137
138 138 ``size``: The number as a float or int.
139 139
140 140 ``unit``: The unit name in plural form. Examples: "bytes", "B".
141 141
142 142 ``precision``: How many digits to the right of the decimal point. Default
143 143 is 1. 0 suppresses the decimal point.
144 144
145 145 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
146 146 If true, use base-2 binary prefixes (kibi = Ki = 1024).
147 147
148 148 ``full_name``: If false (default), use the prefix abbreviation ("k" or
149 149 "Ki"). If true, use the full prefix ("kilo" or "kibi"). If false,
150 150 use abbreviation ("k" or "Ki").
151 151
152 152 """
153 153
154 154 if not binary:
155 155 base = 1000
156 156 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
157 157 else:
158 158 base = 1024
159 159 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
160 160
161 161 sign = ""
162 162 if size > 0:
163 163 m = int(math.log(size, base))
164 164 elif size < 0:
165 165 sign = "-"
166 166 size = -size
167 167 m = int(math.log(size, base))
168 168 else:
169 169 m = 0
170 170 if m > 8:
171 171 m = 8
172 172
173 173 if m == 0:
174 174 precision = '%.0f'
175 175 else:
176 176 precision = '%%.%df' % precision
177 177
178 178 size = precision % (size / math.pow(base, m))
179 179
180 180 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
181 181
182 182
def _check_memory_usage(worker):
    """Recycle the worker when its RSS memory exceeds the configured cap.

    Called after each request. Every ``_memory_usage_check_interval``
    seconds the worker RSS is sampled; when it exceeds
    ``_memory_max_usage`` a full gc pass is attempted first, and only if
    usage stays above the recovery threshold is the worker flagged to
    restart (by clearing ``worker.alive``).
    """
    memory_max_usage = worker._memory_max_usage
    if not memory_max_usage:
        # 0/unset disables the memory cap entirely
        return

    memory_usage_check_interval = worker._memory_usage_check_interval
    memory_usage_recovery_threshold = memory_max_usage * worker._memory_usage_recovery_threshold

    elapsed = time.time() - worker._last_memory_check_time
    if elapsed > memory_usage_check_interval:
        mem_usage = _get_process_rss()
        if mem_usage and mem_usage > memory_max_usage:
            worker.log.info(
                "memory usage %s > %s, forcing gc",
                _format_data_size(mem_usage), _format_data_size(memory_max_usage))
            # Try to clean it up by forcing a full collection.
            gc.collect()
            mem_usage = _get_process_rss()
            # guard against None: _get_process_rss() returns None when the
            # RSS can't be read, and `None > float` raises TypeError on
            # Python 3
            if mem_usage and mem_usage > memory_usage_recovery_threshold:
                # Didn't clean up enough, we'll have to terminate.
                worker.log.warning(
                    "memory usage %s > %s after gc, quitting",
                    _format_data_size(mem_usage), _format_data_size(memory_max_usage))
                # This will cause worker to auto-restart itself
                worker.alive = False
        worker._last_memory_check_time = time.time()
209 209
210 210
def worker_int(worker):
    """Log a stack dump of every thread when a worker receives INT/QUIT."""
    worker.log.info("[%-10s] worker received INT or QUIT signal", worker.pid)

    # map thread ids to names so the dump is readable
    thread_names = {th.ident: th.name for th in threading.enumerate()}
    dump = []
    for thread_id, frame in sys._current_frames().items():
        dump.append(
            "\n# Thread: %s(%d)" % (thread_names.get(thread_id, ""), thread_id))
        for fname, lineno, func, line in traceback.extract_stack(frame):
            dump.append('File: "%s", line %d, in %s' % (fname, lineno, func))
            if line:
                dump.append(" %s" % (line.strip()))
    worker.log.debug("\n".join(dump))
225 225
226 226
def worker_abort(worker):
    """Log that the worker was aborted (e.g. killed on timeout)."""
    worker.log.info("[%-10s] worker received SIGABRT signal", worker.pid)
229 229
230 230
def worker_exit(server, worker):
    """Log a clean worker shutdown."""
    worker.log.info("[%-10s] worker exit", worker.pid)
233 233
234 234
def child_exit(server, worker):
    """Log that the master reaped an exited worker child."""
    worker.log.info("[%-10s] worker child exit", worker.pid)
237 237
238 238
def pre_request(worker, req):
    """Record request start time on the worker and log the incoming call."""
    worker.start_time = time.time()
    worker.log.debug(
        "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
243 243
244 244
def post_request(worker, req, environ, resp):
    """Log the completed request with its timing, then run the memory check."""
    # Gunicorn sometimes has problems with reading the status_code
    status = getattr(resp, 'status_code', '')
    elapsed = time.time() - worker.start_time
    worker.log.debug(
        "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
        worker.nr, req.method, req.path, status, elapsed)
    _check_memory_usage(worker)
253 253
254 254
255 255 def _filter_proxy(ip):
256 256 """
257 257 Passed in IP addresses in HEADERS can be in a special format of multiple
258 258 ips. Those comma separated IPs are passed from various proxies in the
259 259 chain of request processing. The left-most being the original client.
260 260 We only care about the first IP which came from the org. client.
261 261
262 262 :param ip: ip string from headers
263 263 """
264 264 if ',' in ip:
265 265 _ips = ip.split(',')
266 266 _first_ip = _ips[0].strip()
267 267 return _first_ip
268 268 return ip
269 269
270 270
271 271 def _filter_port(ip):
272 272 """
273 273 Removes a port from ip, there are 4 main cases to handle here.
274 274 - ipv4 eg. 127.0.0.1
275 275 - ipv6 eg. ::1
276 276 - ipv4+port eg. 127.0.0.1:8080
277 277 - ipv6+port eg. [::1]:8080
278 278
279 279 :param ip:
280 280 """
281 281 def is_ipv6(ip_addr):
282 282 if hasattr(socket, 'inet_pton'):
283 283 try:
284 284 socket.inet_pton(socket.AF_INET6, ip_addr)
285 285 except socket.error:
286 286 return False
287 287 else:
288 288 return False
289 289 return True
290 290
291 291 if ':' not in ip: # must be ipv4 pure ip
292 292 return ip
293 293
294 294 if '[' in ip and ']' in ip: # ipv6 with port
295 295 return ip.split(']')[0][1:].lower()
296 296
297 297 # must be ipv6 or ipv4 with port
298 298 if is_ipv6(ip):
299 299 return ip
300 300 else:
301 301 ip, _port = ip.split(':')[:2] # means ipv4+port
302 302 return ip
303 303
304 304
def get_ip_addr(environ):
    """Extract the client IP from a WSGI *environ*, preferring proxy
    headers over the raw socket address, with proxy-chain and port
    filtering applied.
    """
    def _filters(value):
        return _filter_port(_filter_proxy(value))

    # ordered by trust: explicit real-ip header, then forwarded-for chain
    for header in ('HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_FOR'):
        ip = environ.get(header)
        if ip:
            return _filters(ip)

    return _filters(environ.get('REMOTE_ADDR', '0.0.0.0'))
321 321
322 322
class RhodeCodeLogger(Logger):
    """
    Custom Logger that allows some customization that gunicorn doesn't allow
    """

    # RhodeCode log timestamp format (milliseconds appended in now())
    datefmt = r"%Y-%m-%d %H:%M:%S"

    def __init__(self, cfg):
        Logger.__init__(self, cfg)

    def now(self):
        """ return date in RhodeCode Log format """
        now = time.time()
        # int() truncation replaces the Python2-only long(); the source had
        # both the old and new line left over from a diff, and `long` is a
        # NameError on Python 3
        msecs = int((now - int(now)) * 1000)
        return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)

    def atoms(self, resp, req, environ, request_time):
        """ Gets atoms for log formatting.
        """
        status = resp.status
        if isinstance(status, str):
            # keep only the numeric code from e.g. "200 OK"
            status = status.split(None, 1)[0]
        atoms = {
            'h': get_ip_addr(environ),
            'l': '-',
            'u': self._get_user(environ) or '-',
            't': self.now(),
            'r': "%s %s %s" % (environ['REQUEST_METHOD'],
                               environ['RAW_URI'],
                               environ["SERVER_PROTOCOL"]),
            's': status,
            'm': environ.get('REQUEST_METHOD'),
            'U': environ.get('PATH_INFO'),
            'q': environ.get('QUERY_STRING'),
            'H': environ.get('SERVER_PROTOCOL'),
            'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
            'B': getattr(resp, 'sent', None),
            'f': environ.get('HTTP_REFERER', '-'),
            'a': environ.get('HTTP_USER_AGENT', '-'),
            'T': request_time.seconds,
            'D': (request_time.seconds * 1000000) + request_time.microseconds,
            'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
            'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
            'p': "<%s>" % os.getpid()
        }

        # add request headers
        if hasattr(req, 'headers'):
            req_headers = req.headers
        else:
            req_headers = req

        if hasattr(req_headers, "items"):
            req_headers = req_headers.items()

        atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})

        resp_headers = resp.headers
        if hasattr(resp_headers, "items"):
            resp_headers = resp_headers.items()

        # add response headers
        atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})

        # add environ variables
        environ_variables = environ.items()
        atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})

        return atoms
392 392
# tell gunicorn to use our custom logger implementation
logger_class = RhodeCodeLogger
General Comments 0
You need to be logged in to leave comments. Login now