application: added statsd client for sending usage statistics.
milka -
r920:1ce34849 stable
@@ -0,0 +1,26 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 # This package contains non-RhodeCode-licensed packages that are
19 # vendored for various reasons
20
21 import os
22 import sys
23
24 vendor_dir = os.path.abspath(os.path.dirname(__file__))
25
26 sys.path.append(vendor_dir)
@@ -0,0 +1,46 b''
1 from __future__ import absolute_import, division, unicode_literals
2
3 import logging
4
5 from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
6 from .udp import StatsClient # noqa
7
8 HOST = 'localhost'
9 PORT = 8125
10 IPV6 = False
11 PREFIX = None
12 MAXUDPSIZE = 512
13
14 log = logging.getLogger('rhodecode.statsd')
15
16
17 def statsd_config(config, prefix='statsd.'):
18 _config = {}
19 for key in config.keys():
20 if key.startswith(prefix):
21 _config[key[len(prefix):]] = config[key]
22 return _config
23
24
25 def client_from_config(configuration, prefix='statsd.', **kwargs):
26 from pyramid.settings import asbool
27
28 _config = statsd_config(configuration, prefix)
29 statsd_enabled = asbool(_config.pop('enabled', False))
30 if not statsd_enabled:
31 log.debug('statsd client not enabled by the statsd.enabled flag, skipping...')
32 return
33
34 host = _config.pop('statsd_host', HOST)
35 port = _config.pop('statsd_port', PORT)
36 prefix = _config.pop('statsd_prefix', PREFIX)
37 maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE)
38 ipv6 = asbool(_config.pop('statsd_ipv6', IPV6))
39 log.debug('configured statsd client %s:%s', host, port)
40
41 return StatsClient(
42 host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6)
43
44
45 def get_statsd_client(request):
46 return client_from_config(request.registry.settings)
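The factory above only builds a UDP client when `statsd.enabled` resolves to true; otherwise it returns None. A minimal sketch of driving it directly from a settings dict (the keys mirror the `statsd.*` options parsed above; host/port and metric name are illustrative):

```python
# Sketch only: the settings dict stands in for Pyramid's parsed .ini settings.
from vcsserver.lib._vendor.statsd import client_from_config

settings = {
    'statsd.enabled': 'true',
    'statsd.statsd_host': '127.0.0.1',
    'statsd.statsd_port': '8125',
    'statsd.statsd_prefix': 'vcsserver',
}

statsd = client_from_config(settings)  # returns None when statsd.enabled is false/missing
if statsd:
    statsd.incr('startup.count')  # illustrative metric name
```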
@@ -0,0 +1,107 b''
1 from __future__ import absolute_import, division, unicode_literals
2
3 import random
4 from collections import deque
5 from datetime import timedelta
6
7 from .timer import Timer
8
9
10 class StatsClientBase(object):
11 """A Base class for various statsd clients."""
12
13 def close(self):
14 """Used to close and clean up any underlying resources."""
15 raise NotImplementedError()
16
17 def _send(self):
18 raise NotImplementedError()
19
20 def pipeline(self):
21 raise NotImplementedError()
22
23 def timer(self, stat, rate=1):
24 return Timer(self, stat, rate)
25
26 def timing(self, stat, delta, rate=1):
27 """
28 Send new timing information.
29
30 `delta` can be either a number of milliseconds or a timedelta.
31 """
32 if isinstance(delta, timedelta):
33 # Convert timedelta to number of milliseconds.
34 delta = delta.total_seconds() * 1000.
35 self._send_stat(stat, '%0.6f|ms' % delta, rate)
36
37 def incr(self, stat, count=1, rate=1):
38 """Increment a stat by `count`."""
39 self._send_stat(stat, '%s|c' % count, rate)
40
41 def decr(self, stat, count=1, rate=1):
42 """Decrement a stat by `count`."""
43 self.incr(stat, -count, rate)
44
45 def gauge(self, stat, value, rate=1, delta=False):
46 """Set a gauge value."""
47 if value < 0 and not delta:
48 if rate < 1:
49 if random.random() > rate:
50 return
51 with self.pipeline() as pipe:
52 pipe._send_stat(stat, '0|g', 1)
53 pipe._send_stat(stat, '%s|g' % value, 1)
54 else:
55 prefix = '+' if delta and value >= 0 else ''
56 self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
57
58 def set(self, stat, value, rate=1):
59 """Set a set value."""
60 self._send_stat(stat, '%s|s' % value, rate)
61
62 def _send_stat(self, stat, value, rate):
63 self._after(self._prepare(stat, value, rate))
64
65 def _prepare(self, stat, value, rate):
66 if rate < 1:
67 if random.random() > rate:
68 return
69 value = '%s|@%s' % (value, rate)
70
71 if self._prefix:
72 stat = '%s.%s' % (self._prefix, stat)
73
74 return '%s:%s' % (stat, value)
75
76 def _after(self, data):
77 if data:
78 self._send(data)
79
80
81 class PipelineBase(StatsClientBase):
82
83 def __init__(self, client):
84 self._client = client
85 self._prefix = client._prefix
86 self._stats = deque()
87
88 def _send(self):
89 raise NotImplementedError()
90
91 def _after(self, data):
92 if data is not None:
93 self._stats.append(data)
94
95 def __enter__(self):
96 return self
97
98 def __exit__(self, typ, value, tb):
99 self.send()
100
101 def send(self):
102 if not self._stats:
103 return
104 self._send()
105
106 def pipeline(self):
107 return self.__class__(self)
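`StatsClientBase` defines the whole metric API (counters, gauges, timers, sets); concrete transports only implement `_send`. A short sketch of that API using the UDP `StatsClient`, with the wire format each call produces per `_prepare` (metric names are illustrative):

```python
from vcsserver.lib._vendor.statsd import StatsClient

client = StatsClient(host='localhost', port=8125, prefix='vcsserver')

client.incr('req.count')              # vcsserver.req.count:1|c
client.decr('conn.open')              # vcsserver.conn.open:-1|c
client.gauge('workers.busy', 3)       # vcsserver.workers.busy:3|g
client.timing('req.timing', 12.5)     # vcsserver.req.timing:12.500000|ms
client.set('users.unique', 'user-1')  # vcsserver.users.unique:user-1|s
client.incr('req.count', rate=0.1)    # sampled: sent as vcsserver.req.count:1|c|@0.1 about 10% of the time
```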
@@ -0,0 +1,75 b''
1 from __future__ import absolute_import, division, unicode_literals
2
3 import socket
4
5 from .base import StatsClientBase, PipelineBase
6
7
8 class StreamPipeline(PipelineBase):
9 def _send(self):
10 self._client._after('\n'.join(self._stats))
11 self._stats.clear()
12
13
14 class StreamClientBase(StatsClientBase):
15 def connect(self):
16 raise NotImplementedError()
17
18 def close(self):
19 if self._sock and hasattr(self._sock, 'close'):
20 self._sock.close()
21 self._sock = None
22
23 def reconnect(self):
24 self.close()
25 self.connect()
26
27 def pipeline(self):
28 return StreamPipeline(self)
29
30 def _send(self, data):
31 """Send data to statsd."""
32 if not self._sock:
33 self.connect()
34 self._do_send(data)
35
36 def _do_send(self, data):
37 self._sock.sendall(data.encode('ascii') + b'\n')
38
39
40 class TCPStatsClient(StreamClientBase):
41 """TCP version of StatsClient."""
42
43 def __init__(self, host='localhost', port=8125, prefix=None,
44 timeout=None, ipv6=False):
45 """Create a new client."""
46 self._host = host
47 self._port = port
48 self._ipv6 = ipv6
49 self._timeout = timeout
50 self._prefix = prefix
51 self._sock = None
52
53 def connect(self):
54 fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
55 family, _, _, _, addr = socket.getaddrinfo(
56 self._host, self._port, fam, socket.SOCK_STREAM)[0]
57 self._sock = socket.socket(family, socket.SOCK_STREAM)
58 self._sock.settimeout(self._timeout)
59 self._sock.connect(addr)
60
61
62 class UnixSocketStatsClient(StreamClientBase):
63 """Unix domain socket version of StatsClient."""
64
65 def __init__(self, socket_path, prefix=None, timeout=None):
66 """Create a new client."""
67 self._socket_path = socket_path
68 self._timeout = timeout
69 self._prefix = prefix
70 self._sock = None
71
72 def connect(self):
73 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
74 self._sock.settimeout(self._timeout)
75 self._sock.connect(self._socket_path)
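Unlike the UDP client, the stream clients keep a socket open: `connect()` happens lazily on the first send, and `close()`/`reconnect()` are left to the caller. A rough sketch (assumes a statsd daemon is actually listening on the TCP port, otherwise the connect raises):

```python
from vcsserver.lib._vendor.statsd import TCPStatsClient

client = TCPStatsClient(host='localhost', port=8125, prefix='vcsserver', timeout=2.0)
try:
    client.incr('req.count')  # connects on first use, then writes "vcsserver.req.count:1|c\n" over TCP
finally:
    client.close()            # drops the socket; reconnect() would reopen it
```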
@@ -0,0 +1,71 b''
1 from __future__ import absolute_import, division, unicode_literals
2
3 import functools
4
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
9 except ImportError:
10 # fall back to using time
11 from time import time as time_now
12
13
14 def safe_wraps(wrapper, *args, **kwargs):
15 """Safely wraps partial functions."""
16 while isinstance(wrapper, functools.partial):
17 wrapper = wrapper.func
18 return functools.wraps(wrapper, *args, **kwargs)
19
20
21 class Timer(object):
22 """A context manager/decorator for statsd.timing()."""
23
24 def __init__(self, client, stat, rate=1):
25 self.client = client
26 self.stat = stat
27 self.rate = rate
28 self.ms = None
29 self._sent = False
30 self._start_time = None
31
32 def __call__(self, f):
33 """Thread-safe timing function decorator."""
34 @safe_wraps(f)
35 def _wrapped(*args, **kwargs):
36 start_time = time_now()
37 try:
38 return f(*args, **kwargs)
39 finally:
40 elapsed_time_ms = 1000.0 * (time_now() - start_time)
41 self.client.timing(self.stat, elapsed_time_ms, self.rate)
42 return _wrapped
43
44 def __enter__(self):
45 return self.start()
46
47 def __exit__(self, typ, value, tb):
48 self.stop()
49
50 def start(self):
51 self.ms = None
52 self._sent = False
53 self._start_time = time_now()
54 return self
55
56 def stop(self, send=True):
57 if self._start_time is None:
58 raise RuntimeError('Timer has not started.')
59 dt = time_now() - self._start_time
60 self.ms = 1000.0 * dt # Convert to milliseconds.
61 if send:
62 self.send()
63 return self
64
65 def send(self):
66 if self.ms is None:
67 raise RuntimeError('No data recorded.')
68 if self._sent:
69 raise RuntimeError('Already sent data.')
70 self._sent = True
71 self.client.timing(self.stat, self.ms, self.rate)
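`Timer` works both as a decorator and as a context manager, sending the elapsed milliseconds through `client.timing()`; it is normally obtained via `StatsClientBase.timer()`. A small sketch (metric names illustrative):

```python
from vcsserver.lib._vendor.statsd import StatsClient

client = StatsClient(prefix='vcsserver')

@client.timer('example.render')      # each call emits vcsserver.example.render:<ms>|ms
def render():
    return sum(range(1000))

with client.timer('example.block'):  # timing is sent when the block exits
    render()
```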
@@ -0,0 +1,55 b''
1 from __future__ import absolute_import, division, unicode_literals
2
3 import socket
4
5 from .base import StatsClientBase, PipelineBase
6
7
8 class Pipeline(PipelineBase):
9
10 def __init__(self, client):
11 super(Pipeline, self).__init__(client)
12 self._maxudpsize = client._maxudpsize
13
14 def _send(self):
15 data = self._stats.popleft()
16 while self._stats:
17 # Use popleft to preserve the order of the stats.
18 stat = self._stats.popleft()
19 if len(stat) + len(data) + 1 >= self._maxudpsize:
20 self._client._after(data)
21 data = stat
22 else:
23 data += '\n' + stat
24 self._client._after(data)
25
26
27 class StatsClient(StatsClientBase):
28 """A client for statsd."""
29
30 def __init__(self, host='localhost', port=8125, prefix=None,
31 maxudpsize=512, ipv6=False):
32 """Create a new client."""
33 fam = socket.AF_INET6 if ipv6 else socket.AF_INET
34 family, _, _, _, addr = socket.getaddrinfo(
35 host, port, fam, socket.SOCK_DGRAM)[0]
36 self._addr = addr
37 self._sock = socket.socket(family, socket.SOCK_DGRAM)
38 self._prefix = prefix
39 self._maxudpsize = maxudpsize
40
41 def _send(self, data):
42 """Send data to statsd."""
43 try:
44 self._sock.sendto(data.encode('ascii'), self._addr)
45 except (socket.error, RuntimeError):
46 # No time for love, Dr. Jones!
47 pass
48
49 def close(self):
50 if self._sock and hasattr(self._sock, 'close'):
51 self._sock.close()
52 self._sock = None
53
54 def pipeline(self):
55 return Pipeline(self)
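The UDP pipeline buffers stats in a deque and, on flush, packs as many newline-joined entries as fit under `maxudpsize` into each datagram. A sketch of batching several metrics in one context manager (names illustrative):

```python
from vcsserver.lib._vendor.statsd import StatsClient

client = StatsClient(maxudpsize=512)
with client.pipeline() as pipe:
    pipe.incr('req.count')
    pipe.timing('req.timing', 3.2)
    pipe.gauge('workers.busy', 4)
# everything buffered above is sent in one (or a few) UDP packets on exit
```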
@@ -1,237 +1,243 b''
1 1 ## -*- coding: utf-8 -*-
2 2
3 3 ; #################################
4 4 ; RHODECODE VCSSERVER CONFIGURATION
5 5 ; #################################
6 6
7 7 [server:main]
8 8 ; COMMON HOST/IP CONFIG
9 9 host = 0.0.0.0
10 10 port = 9900
11 11
12 12 ; ##################################################
13 13 ; WAITRESS WSGI SERVER - Recommended for Development
14 14 ; ##################################################
15 15
16 16 ; use server type
17 17 use = egg:waitress#main
18 18
19 19 ; number of worker threads
20 20 threads = 5
21 21
22 22 ; MAX BODY SIZE 100GB
23 23 max_request_body_size = 107374182400
24 24
25 25 ; Use poll instead of select; fixes file descriptor limit problems.
26 26 ; May not work on old Windows systems.
27 27 asyncore_use_poll = true
28 28
29 29
30 30 ; ###########################
31 31 ; GUNICORN APPLICATION SERVER
32 32 ; ###########################
33 33
34 34 ; run with gunicorn --log-config rhodecode.ini --paste rhodecode.ini
35 35
36 36 ; Module to use, this setting shouldn't be changed
37 37 #use = egg:gunicorn#main
38 38
39 39 ; Sets the number of process workers. More workers means more concurrent connections
40 40 ; RhodeCode can handle at the same time. Each additional worker also increases
41 41 ; memory usage as each has its own set of caches.
42 42 ; Recommended value is (2 * NUMBER_OF_CPUS + 1), e.g. 2 CPUs = 5 workers, but no more
43 43 ; than 8-10 unless for really big deployments, e.g. 700-1000 users.
44 44 ; `instance_id = *` must be set in the [app:main] section below (which is the default)
45 45 ; when using more than 1 worker.
46 46 #workers = 2
47 47
48 48 ; Gunicorn access log level
49 49 #loglevel = info
50 50
51 51 ; Process name visible in process list
52 52 #proc_name = rhodecode_vcsserver
53 53
54 54 ; Type of worker class, one of `sync`, `gevent`
55 55 ; currently `sync` is the only option allowed.
56 56 #worker_class = sync
57 57
58 58 ; The maximum number of simultaneous clients. Valid only for gevent
59 59 #worker_connections = 10
60 60
61 61 ; Max number of requests that worker will handle before being gracefully restarted.
62 62 ; Prevents memory leaks, jitter adds variability so not all workers are restarted at once.
63 63 #max_requests = 1000
64 64 #max_requests_jitter = 30
65 65
66 66 ; Amount of time a worker can spend handling a request before it
67 67 ; gets killed and restarted. By default set to 21600 (6hrs)
68 68 ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
69 69 #timeout = 21600
70 70
71 71 ; The maximum size of HTTP request line in bytes.
72 72 ; 0 for unlimited
73 73 #limit_request_line = 0
74 74
75 75 ; Limit the number of HTTP headers fields in a request.
76 76 ; By default this value is 100 and can't be larger than 32768.
77 77 #limit_request_fields = 32768
78 78
79 79 ; Limit the allowed size of an HTTP request header field.
80 80 ; Value is a positive number or 0.
81 81 ; Setting it to 0 will allow unlimited header field sizes.
82 82 #limit_request_field_size = 0
83 83
84 84 ; Timeout for graceful workers restart.
85 85 ; After receiving a restart signal, workers have this much time to finish
86 86 ; serving requests. Workers still alive after the timeout (starting from the
87 87 ; receipt of the restart signal) are force killed.
88 88 ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
89 89 #graceful_timeout = 3600
90 90
91 91 # The number of seconds to wait for requests on a Keep-Alive connection.
92 92 # Generally set in the 1-5 seconds range.
93 93 #keepalive = 2
94 94
95 95 ; Maximum memory usage that each worker can use before it will receive a
96 96 ; graceful restart signal 0 = memory monitoring is disabled
97 97 ; Examples: 268435456 (256MB), 536870912 (512MB)
98 98 ; 1073741824 (1GB), 2147483648 (2GB), 4294967296 (4GB)
99 99 #memory_max_usage = 0
100 100
101 101 ; How often in seconds to check for memory usage for each gunicorn worker
102 102 #memory_usage_check_interval = 60
103 103
104 104 ; Threshold value for which we don't recycle a worker if GarbageCollection
105 105 ; frees up enough resources. Before each restart we try to run GC on the worker;
106 106 ; if we get enough free memory after that, the restart will not happen.
107 107 #memory_usage_recovery_threshold = 0.8
108 108
109 109
110 110 [app:main]
111 111 ; The %(here)s variable will be replaced with the absolute path of parent directory
112 112 ; of this file
113 113 use = egg:rhodecode-vcsserver
114 114
115 115
116 116 ; #############
117 117 ; DEBUG OPTIONS
118 118 ; #############
119 119
120 120 # During development we want to have the debug toolbar enabled
121 121 pyramid.includes =
122 122 pyramid_debugtoolbar
123 123
124 124 debugtoolbar.hosts = 0.0.0.0/0
125 125 debugtoolbar.exclude_prefixes =
126 126 /css
127 127 /fonts
128 128 /images
129 129 /js
130 130
131 131 ; #################
132 132 ; END DEBUG OPTIONS
133 133 ; #################
134 134
135 135 ; Pyramid default locales, we need this to be set
136 136 pyramid.default_locale_name = en
137 137
138 138 ; default locale used by VCS systems
139 139 locale = en_US.UTF-8
140 140
141 141 ; path to binaries for vcsserver, it should be set by the installer
142 142 ; at installation time, e.g /home/user/vcsserver-1/profile/bin
143 143 ; it can also be a path to nix-build output in case of development
144 144 core.binary_dir = ""
145 145
146 146 ; Custom exception store path, defaults to TMPDIR
147 147 ; This is used to store exception from RhodeCode in shared directory
148 148 #exception_tracker.store_path =
149 149
150 150 ; #############
151 151 ; DOGPILE CACHE
152 152 ; #############
153 153
154 154 ; Default cache dir for caches. Putting this into a ramdisk can boost performance.
155 155 ; e.g. /tmpfs/data_ramdisk; however, this directory might require a large amount of space
156 156 cache_dir = %(here)s/data
157 157
158 158 ; ***************************************
159 159 ; `repo_object` cache, default file based
160 160 ; ***************************************
161 161
162 162 ; `repo_object` cache settings for vcs methods for repositories
163 163 rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace
164 164
165 165 ; cache auto-expires after N seconds
166 166 ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days)
167 167 rc_cache.repo_object.expiration_time = 2592000
168 168
169 169 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
170 170 #rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache.db
171 171
172 172 ; ***********************************************************
173 173 ; `repo_object` cache with redis backend
174 174 ; recommended for larger instances and for better performance
175 175 ; ***********************************************************
176 176
177 177 ; `repo_object` cache settings for vcs methods for repositories
178 178 #rc_cache.repo_object.backend = dogpile.cache.rc.redis_msgpack
179 179
180 180 ; cache auto-expires after N seconds
181 181 ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days)
182 182 #rc_cache.repo_object.expiration_time = 2592000
183 183
184 184 ; redis_expiration_time needs to be greater than expiration_time
185 185 #rc_cache.repo_object.arguments.redis_expiration_time = 3592000
186 186
187 187 #rc_cache.repo_object.arguments.host = localhost
188 188 #rc_cache.repo_object.arguments.port = 6379
189 189 #rc_cache.repo_object.arguments.db = 5
190 190 #rc_cache.repo_object.arguments.socket_timeout = 30
191 191 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends
192 192 #rc_cache.repo_object.arguments.distributed_lock = true
193 193
194 ; Statsd client config
195 #statsd.enabled = false
196 #statsd.statsd_host = 0.0.0.0
197 #statsd.statsd_port = 8125
198 #statsd.statsd_prefix =
199 #statsd.statsd_ipv6 = false
194 200
195 201 ; #####################
196 202 ; LOGGING CONFIGURATION
197 203 ; #####################
198 204 [loggers]
199 205 keys = root, vcsserver
200 206
201 207 [handlers]
202 208 keys = console
203 209
204 210 [formatters]
205 211 keys = generic
206 212
207 213 ; #######
208 214 ; LOGGERS
209 215 ; #######
210 216 [logger_root]
211 217 level = NOTSET
212 218 handlers = console
213 219
214 220 [logger_vcsserver]
215 221 level = DEBUG
216 222 handlers =
217 223 qualname = vcsserver
218 224 propagate = 1
219 225
220 226
221 227 ; ########
222 228 ; HANDLERS
223 229 ; ########
224 230
225 231 [handler_console]
226 232 class = StreamHandler
227 233 args = (sys.stderr, )
228 234 level = DEBUG
229 235 formatter = generic
230 236
231 237 ; ##########
232 238 ; FORMATTERS
233 239 ; ##########
234 240
235 241 [formatter_generic]
236 242 format = %(asctime)s.%(msecs)03d [%(process)d] %(levelname)-5.5s [%(name)s] %(message)s
237 243 datefmt = %Y-%m-%d %H:%M:%S
@@ -1,200 +1,206 b''
1 1 ## -*- coding: utf-8 -*-
2 2
3 3 ; #################################
4 4 ; RHODECODE VCSSERVER CONFIGURATION
5 5 ; #################################
6 6
7 7 [server:main]
8 8 ; COMMON HOST/IP CONFIG
9 9 host = 127.0.0.1
10 10 port = 9900
11 11
12 12
13 13 ; ###########################
14 14 ; GUNICORN APPLICATION SERVER
15 15 ; ###########################
16 16
17 17 ; run with gunicorn --log-config rhodecode.ini --paste rhodecode.ini
18 18
19 19 ; Module to use, this setting shouldn't be changed
20 20 use = egg:gunicorn#main
21 21
22 22 ; Sets the number of process workers. More workers means more concurrent connections
23 23 ; RhodeCode can handle at the same time. Each additional worker also increases
24 24 ; memory usage as each has its own set of caches.
25 25 ; Recommended value is (2 * NUMBER_OF_CPUS + 1), e.g. 2 CPUs = 5 workers, but no more
26 26 ; than 8-10 unless for really big deployments, e.g. 700-1000 users.
27 27 ; `instance_id = *` must be set in the [app:main] section below (which is the default)
28 28 ; when using more than 1 worker.
29 29 workers = 2
30 30
31 31 ; Gunicorn access log level
32 32 loglevel = info
33 33
34 34 ; Process name visible in process list
35 35 proc_name = rhodecode_vcsserver
36 36
37 37 ; Type of worker class, one of `sync`, `gevent`
38 38 ; currently `sync` is the only option allowed.
39 39 worker_class = sync
40 40
41 41 ; The maximum number of simultaneous clients. Valid only for gevent
42 42 worker_connections = 10
43 43
44 44 ; Max number of requests that worker will handle before being gracefully restarted.
45 45 ; Prevents memory leaks, jitter adds variability so not all workers are restarted at once.
46 46 max_requests = 1000
47 47 max_requests_jitter = 30
48 48
49 49 ; Amount of time a worker can spend handling a request before it
50 50 ; gets killed and restarted. By default set to 21600 (6hrs)
51 51 ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
52 52 timeout = 21600
53 53
54 54 ; The maximum size of HTTP request line in bytes.
55 55 ; 0 for unlimited
56 56 limit_request_line = 0
57 57
58 58 ; Limit the number of HTTP headers fields in a request.
59 59 ; By default this value is 100 and can't be larger than 32768.
60 60 limit_request_fields = 32768
61 61
62 62 ; Limit the allowed size of an HTTP request header field.
63 63 ; Value is a positive number or 0.
64 64 ; Setting it to 0 will allow unlimited header field sizes.
65 65 limit_request_field_size = 0
66 66
67 67 ; Timeout for graceful workers restart.
68 68 ; After receiving a restart signal, workers have this much time to finish
69 69 ; serving requests. Workers still alive after the timeout (starting from the
70 70 ; receipt of the restart signal) are force killed.
71 71 ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
72 72 graceful_timeout = 3600
73 73
74 74 # The number of seconds to wait for requests on a Keep-Alive connection.
75 75 # Generally set in the 1-5 seconds range.
76 76 keepalive = 2
77 77
78 78 ; Maximum memory usage that each worker can use before it will receive a
79 79 ; graceful restart signal 0 = memory monitoring is disabled
80 80 ; Examples: 268435456 (256MB), 536870912 (512MB)
81 81 ; 1073741824 (1GB), 2147483648 (2GB), 4294967296 (4GB)
82 82 memory_max_usage = 0
83 83
84 84 ; How often in seconds to check for memory usage for each gunicorn worker
85 85 memory_usage_check_interval = 60
86 86
87 87 ; Threshold value for which we don't recycle a worker if GarbageCollection
88 88 ; frees up enough resources. Before each restart we try to run GC on the worker;
89 89 ; if we get enough free memory after that, the restart will not happen.
90 90 memory_usage_recovery_threshold = 0.8
91 91
92 92
93 93 [app:main]
94 94 ; The %(here)s variable will be replaced with the absolute path of parent directory
95 95 ; of this file
96 96 use = egg:rhodecode-vcsserver
97 97
98 98 ; Pyramid default locales, we need this to be set
99 99 pyramid.default_locale_name = en
100 100
101 101 ; default locale used by VCS systems
102 102 locale = en_US.UTF-8
103 103
104 104 ; path to binaries for vcsserver, it should be set by the installer
105 105 ; at installation time, e.g /home/user/vcsserver-1/profile/bin
106 106 ; it can also be a path to nix-build output in case of development
107 107 core.binary_dir = ""
108 108
109 109 ; Custom exception store path, defaults to TMPDIR
110 110 ; This is used to store exception from RhodeCode in shared directory
111 111 #exception_tracker.store_path =
112 112
113 113 ; #############
114 114 ; DOGPILE CACHE
115 115 ; #############
116 116
117 117 ; Default cache dir for caches. Putting this into a ramdisk can boost performance.
118 118 ; e.g. /tmpfs/data_ramdisk; however, this directory might require a large amount of space
119 119 cache_dir = %(here)s/data
120 120
121 121 ; ***************************************
122 122 ; `repo_object` cache, default file based
123 123 ; ***************************************
124 124
125 125 ; `repo_object` cache settings for vcs methods for repositories
126 126 rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace
127 127
128 128 ; cache auto-expires after N seconds
129 129 ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days)
130 130 rc_cache.repo_object.expiration_time = 2592000
131 131
132 132 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set
133 133 #rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache.db
134 134
135 135 ; ***********************************************************
136 136 ; `repo_object` cache with redis backend
137 137 ; recommended for larger instances and for better performance
138 138 ; ***********************************************************
139 139
140 140 ; `repo_object` cache settings for vcs methods for repositories
141 141 #rc_cache.repo_object.backend = dogpile.cache.rc.redis_msgpack
142 142
143 143 ; cache auto-expires after N seconds
144 144 ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days)
145 145 #rc_cache.repo_object.expiration_time = 2592000
146 146
147 147 ; redis_expiration_time needs to be greater than expiration_time
148 148 #rc_cache.repo_object.arguments.redis_expiration_time = 3592000
149 149
150 150 #rc_cache.repo_object.arguments.host = localhost
151 151 #rc_cache.repo_object.arguments.port = 6379
152 152 #rc_cache.repo_object.arguments.db = 5
153 153 #rc_cache.repo_object.arguments.socket_timeout = 30
154 154 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends
155 155 #rc_cache.repo_object.arguments.distributed_lock = true
156 156
157 ; Statsd client config
158 #statsd.enabled = false
159 #statsd.statsd_host = 0.0.0.0
160 #statsd.statsd_port = 8125
161 #statsd.statsd_prefix =
162 #statsd.statsd_ipv6 = false
157 163
158 164 ; #####################
159 165 ; LOGGING CONFIGURATION
160 166 ; #####################
161 167 [loggers]
162 168 keys = root, vcsserver
163 169
164 170 [handlers]
165 171 keys = console
166 172
167 173 [formatters]
168 174 keys = generic
169 175
170 176 ; #######
171 177 ; LOGGERS
172 178 ; #######
173 179 [logger_root]
174 180 level = NOTSET
175 181 handlers = console
176 182
177 183 [logger_vcsserver]
178 184 level = DEBUG
179 185 handlers =
180 186 qualname = vcsserver
181 187 propagate = 1
182 188
183 189
184 190 ; ########
185 191 ; HANDLERS
186 192 ; ########
187 193
188 194 [handler_console]
189 195 class = StreamHandler
190 196 args = (sys.stderr, )
191 197 level = INFO
192 198 formatter = generic
193 199
194 200 ; ##########
195 201 ; FORMATTERS
196 202 ; ##########
197 203
198 204 [formatter_generic]
199 205 format = %(asctime)s.%(msecs)03d [%(process)d] %(levelname)-5.5s [%(name)s] %(message)s
200 206 datefmt = %Y-%m-%d %H:%M:%S
@@ -1,700 +1,704 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import os
19 19 import sys
20 20 import base64
21 21 import locale
22 22 import logging
23 23 import uuid
24 24 import wsgiref.util
25 25 import traceback
26 26 import tempfile
27 import resource
28 27 import psutil
29 28 from itertools import chain
30 29 from cStringIO import StringIO
31 30
32 31 import simplejson as json
33 32 import msgpack
34 33 from pyramid.config import Configurator
35 34 from pyramid.settings import asbool, aslist
36 35 from pyramid.wsgi import wsgiapp
37 36 from pyramid.compat import configparser
38 37 from pyramid.response import Response
39 38
40 39 from vcsserver.utils import safe_int
41 40
42 41 log = logging.getLogger(__name__)
43 42
44 43 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
45 44 # causing problems and "fix" it in case they do, falling back to LC_ALL = C
46 45
47 46 try:
48 47 locale.setlocale(locale.LC_ALL, '')
49 48 except locale.Error as e:
50 49 log.error(
51 50 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
52 51 os.environ['LC_ALL'] = 'C'
53 52
54 53 import vcsserver
55 54 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
56 55 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
57 56 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
58 57 from vcsserver.echo_stub.echo_app import EchoApp
59 58 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
60 59 from vcsserver.lib.exc_tracking import store_exception
61 60 from vcsserver.server import VcsServer
62 61
63 62 try:
64 63 from vcsserver.git import GitFactory, GitRemote
65 64 except ImportError:
66 65 GitFactory = None
67 66 GitRemote = None
68 67
69 68 try:
70 69 from vcsserver.hg import MercurialFactory, HgRemote
71 70 except ImportError:
72 71 MercurialFactory = None
73 72 HgRemote = None
74 73
75 74 try:
76 75 from vcsserver.svn import SubversionFactory, SvnRemote
77 76 except ImportError:
78 77 SubversionFactory = None
79 78 SvnRemote = None
80 79
81 80
82 81 def _is_request_chunked(environ):
83 82 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
84 83 return stream
85 84
86 85
87 86 def _int_setting(settings, name, default):
88 87 settings[name] = int(settings.get(name, default))
89 88 return settings[name]
90 89
91 90
92 91 def _bool_setting(settings, name, default):
93 92 input_val = settings.get(name, default)
94 93 if isinstance(input_val, unicode):
95 94 input_val = input_val.encode('utf8')
96 95 settings[name] = asbool(input_val)
97 96 return settings[name]
98 97
99 98
100 99 def _list_setting(settings, name, default):
101 100 raw_value = settings.get(name, default)
102 101
103 102 # Otherwise we assume it uses pyramids space/newline separation.
104 103 settings[name] = aslist(raw_value)
105 104 return settings[name]
106 105
107 106
108 107 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
109 108 value = settings.get(name, default)
110 109
111 110 if default_when_empty and not value:
112 111 # use default value when value is empty
113 112 value = default
114 113
115 114 if lower:
116 115 value = value.lower()
117 116 settings[name] = value
118 117 return settings[name]
119 118
120 119
121 120 def log_max_fd():
122 121 try:
123 122 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
124 123 log.info('Max file descriptors value: %s', maxfd)
125 124 except Exception:
126 125 pass
127 126
128 127
129 128 class VCS(object):
130 129 def __init__(self, locale_conf=None, cache_config=None):
131 130 self.locale = locale_conf
132 131 self.cache_config = cache_config
133 132 self._configure_locale()
134 133
135 134 log_max_fd()
136 135
137 136 if GitFactory and GitRemote:
138 137 git_factory = GitFactory()
139 138 self._git_remote = GitRemote(git_factory)
140 139 else:
141 140 log.info("Git client import failed")
142 141
143 142 if MercurialFactory and HgRemote:
144 143 hg_factory = MercurialFactory()
145 144 self._hg_remote = HgRemote(hg_factory)
146 145 else:
147 146 log.info("Mercurial client import failed")
148 147
149 148 if SubversionFactory and SvnRemote:
150 149 svn_factory = SubversionFactory()
151 150
152 151 # hg factory is used for svn url validation
153 152 hg_factory = MercurialFactory()
154 153 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
155 154 else:
156 155 log.info("Subversion client import failed")
157 156
158 157 self._vcsserver = VcsServer()
159 158
160 159 def _configure_locale(self):
161 160 if self.locale:
162 161 log.info('Setting locale: `LC_ALL` to %s', self.locale)
163 162 else:
164 163 log.info(
165 164 'Configuring locale subsystem based on environment variables')
166 165 try:
167 166 # If self.locale is the empty string, then the locale
168 167 # module will use the environment variables. See the
169 168 # documentation of the package `locale`.
170 169 locale.setlocale(locale.LC_ALL, self.locale)
171 170
172 171 language_code, encoding = locale.getlocale()
173 172 log.info(
174 173 'Locale set to language code "%s" with encoding "%s".',
175 174 language_code, encoding)
176 175 except locale.Error:
177 176 log.exception(
178 177 'Cannot set locale, not configuring the locale system')
179 178
180 179
181 180 class WsgiProxy(object):
182 181 def __init__(self, wsgi):
183 182 self.wsgi = wsgi
184 183
185 184 def __call__(self, environ, start_response):
186 185 input_data = environ['wsgi.input'].read()
187 186 input_data = msgpack.unpackb(input_data)
188 187
189 188 error = None
190 189 try:
191 190 data, status, headers = self.wsgi.handle(
192 191 input_data['environment'], input_data['input_data'],
193 192 *input_data['args'], **input_data['kwargs'])
194 193 except Exception as e:
195 194 data, status, headers = [], None, None
196 195 error = {
197 196 'message': str(e),
198 197 '_vcs_kind': getattr(e, '_vcs_kind', None)
199 198 }
200 199
201 200 start_response(200, {})
202 201 return self._iterator(error, status, headers, data)
203 202
204 203 def _iterator(self, error, status, headers, data):
205 204 initial_data = [
206 205 error,
207 206 status,
208 207 headers,
209 208 ]
210 209
211 210 for d in chain(initial_data, data):
212 211 yield msgpack.packb(d)
213 212
214 213
215 214 def not_found(request):
216 215 return {'status': '404 NOT FOUND'}
217 216
218 217
219 218 class VCSViewPredicate(object):
220 219 def __init__(self, val, config):
221 220 self.remotes = val
222 221
223 222 def text(self):
224 223 return 'vcs view method = %s' % (self.remotes.keys(),)
225 224
226 225 phash = text
227 226
228 227 def __call__(self, context, request):
229 228 """
230 229 View predicate that returns true if given backend is supported by
231 230 defined remotes.
232 231 """
233 232 backend = request.matchdict.get('backend')
234 233 return backend in self.remotes
235 234
236 235
237 236 class HTTPApplication(object):
238 237 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
239 238
240 239 remote_wsgi = remote_wsgi
241 240 _use_echo_app = False
242 241
243 242 def __init__(self, settings=None, global_config=None):
244 243 self._sanitize_settings_and_apply_defaults(settings)
245 244
246 245 self.config = Configurator(settings=settings)
247 246 self.global_config = global_config
248 247 self.config.include('vcsserver.lib.rc_cache')
249 248
250 249 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
251 250 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
252 251 self._remotes = {
253 252 'hg': vcs._hg_remote,
254 253 'git': vcs._git_remote,
255 254 'svn': vcs._svn_remote,
256 255 'server': vcs._vcsserver,
257 256 }
258 257 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
259 258 self._use_echo_app = True
260 259 log.warning("Using EchoApp for VCS operations.")
261 260 self.remote_wsgi = remote_wsgi_stub
262 261
263 262 self._configure_settings(global_config, settings)
263
264 264 self._configure()
265 265
266 266 def _configure_settings(self, global_config, app_settings):
267 267 """
268 268 Configure the settings module.
269 269 """
270 270 settings_merged = global_config.copy()
271 271 settings_merged.update(app_settings)
272 272
273 273 git_path = app_settings.get('git_path', None)
274 274 if git_path:
275 275 settings.GIT_EXECUTABLE = git_path
276 276 binary_dir = app_settings.get('core.binary_dir', None)
277 277 if binary_dir:
278 278 settings.BINARY_DIR = binary_dir
279 279
280 280 # Store the settings to make them available to other modules.
281 281 vcsserver.PYRAMID_SETTINGS = settings_merged
282 282 vcsserver.CONFIG = settings_merged
283 283
284 284 def _sanitize_settings_and_apply_defaults(self, settings):
285 285 temp_store = tempfile.gettempdir()
286 286 default_cache_dir = os.path.join(temp_store, 'rc_cache')
287 287
288 288 # save default, cache dir, and use it for all backends later.
289 289 default_cache_dir = _string_setting(
290 290 settings,
291 291 'cache_dir',
292 292 default_cache_dir, lower=False, default_when_empty=True)
293 293
294 294 # ensure we have our dir created
295 295 if not os.path.isdir(default_cache_dir):
296 296 os.makedirs(default_cache_dir, mode=0o755)
297 297
298 298 # exception store cache
299 299 _string_setting(
300 300 settings,
301 301 'exception_tracker.store_path',
302 302 temp_store, lower=False, default_when_empty=True)
303 303
304 304 # repo_object cache
305 305 _string_setting(
306 306 settings,
307 307 'rc_cache.repo_object.backend',
308 308 'dogpile.cache.rc.file_namespace', lower=False)
309 309 _int_setting(
310 310 settings,
311 311 'rc_cache.repo_object.expiration_time',
312 312 30 * 24 * 60 * 60)
313 313 _string_setting(
314 314 settings,
315 315 'rc_cache.repo_object.arguments.filename',
316 316 os.path.join(default_cache_dir, 'vcsserver_cache_1'), lower=False)
317 317
318 318 def _configure(self):
319 319 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
320 320
321 321 self.config.add_route('service', '/_service')
322 322 self.config.add_route('status', '/status')
323 323 self.config.add_route('hg_proxy', '/proxy/hg')
324 324 self.config.add_route('git_proxy', '/proxy/git')
325 325
326 326 # rpc methods
327 327 self.config.add_route('vcs', '/{backend}')
328 328
329 329 # streaming rpc remote methods
330 330 self.config.add_route('vcs_stream', '/{backend}/stream')
331 331
332 332 # vcs operations clone/push as streaming
333 333 self.config.add_route('stream_git', '/stream/git/*repo_name')
334 334 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
335 335
336 336 self.config.add_view(self.status_view, route_name='status', renderer='json')
337 337 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
338 338
339 339 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
340 340 self.config.add_view(self.git_proxy(), route_name='git_proxy')
341 341 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
342 342 vcs_view=self._remotes)
343 343 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
344 344 vcs_view=self._remotes)
345 345
346 346 self.config.add_view(self.hg_stream(), route_name='stream_hg')
347 347 self.config.add_view(self.git_stream(), route_name='stream_git')
348 348
349 349 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
350 350
351 351 self.config.add_notfound_view(not_found, renderer='json')
352 352
353 353 self.config.add_view(self.handle_vcs_exception, context=Exception)
354 354
355 355 self.config.add_tween(
356 356 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
357 357 )
358 358 self.config.add_request_method(
359 359 'vcsserver.lib.request_counter.get_request_counter',
360 360 'request_count')
361 361
362 self.config.add_request_method(
363 'vcsserver.lib._vendor.statsd.get_statsd_client',
364 'statsd', reify=True)
365
362 366 def wsgi_app(self):
363 367 return self.config.make_wsgi_app()
364 368
365 369 def _vcs_view_params(self, request):
366 370 remote = self._remotes[request.matchdict['backend']]
367 371 payload = msgpack.unpackb(request.body, use_list=True)
368 372 method = payload.get('method')
369 373 params = payload['params']
370 374 wire = params.get('wire')
371 375 args = params.get('args')
372 376 kwargs = params.get('kwargs')
373 377 context_uid = None
374 378
375 379 if wire:
376 380 try:
377 381 wire['context'] = context_uid = uuid.UUID(wire['context'])
378 382 except KeyError:
379 383 pass
380 384 args.insert(0, wire)
381 385 repo_state_uid = wire.get('repo_state_uid') if wire else None
382 386
383 387 # NOTE(marcink): trading complexity for slight performance
384 388 if log.isEnabledFor(logging.DEBUG):
385 389 no_args_methods = [
386 390
387 391 ]
388 392 if method in no_args_methods:
389 393 call_args = ''
390 394 else:
391 395 call_args = args[1:]
392 396
393 397 log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
394 398 method, call_args, kwargs, context_uid, repo_state_uid)
395 399
396 400 return payload, remote, method, args, kwargs
397 401
398 402 def vcs_view(self, request):
399 403
400 404 payload, remote, method, args, kwargs = self._vcs_view_params(request)
401 405 payload_id = payload.get('id')
402 406
403 407 try:
404 408 resp = getattr(remote, method)(*args, **kwargs)
405 409 except Exception as e:
406 410 exc_info = list(sys.exc_info())
407 411 exc_type, exc_value, exc_traceback = exc_info
408 412
409 413 org_exc = getattr(e, '_org_exc', None)
410 414 org_exc_name = None
411 415 org_exc_tb = ''
412 416 if org_exc:
413 417 org_exc_name = org_exc.__class__.__name__
414 418 org_exc_tb = getattr(e, '_org_exc_tb', '')
415 419 # replace our "faked" exception with our org
416 420 exc_info[0] = org_exc.__class__
417 421 exc_info[1] = org_exc
418 422
419 423 should_store_exc = True
420 424 if org_exc:
421 425 def get_exc_fqn(_exc_obj):
422 426 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
423 427 return module_name + '.' + org_exc_name
424 428
425 429 exc_fqn = get_exc_fqn(org_exc)
426 430
427 431 if exc_fqn in ['mercurial.error.RepoLookupError',
428 432 'vcsserver.exceptions.RefNotFoundException']:
429 433 should_store_exc = False
430 434
431 435 if should_store_exc:
432 436 store_exception(id(exc_info), exc_info)
433 437
434 438 tb_info = ''.join(
435 439 traceback.format_exception(exc_type, exc_value, exc_traceback))
436 440
437 441 type_ = e.__class__.__name__
438 442 if type_ not in self.ALLOWED_EXCEPTIONS:
439 443 type_ = None
440 444
441 445 resp = {
442 446 'id': payload_id,
443 447 'error': {
444 448 'message': e.message,
445 449 'traceback': tb_info,
446 450 'org_exc': org_exc_name,
447 451 'org_exc_tb': org_exc_tb,
448 452 'type': type_
449 453 }
450 454 }
451 455 try:
452 456 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
453 457 except AttributeError:
454 458 pass
455 459 else:
456 460 resp = {
457 461 'id': payload_id,
458 462 'result': resp
459 463 }
460 464
461 465 return resp
462 466
463 467 def vcs_stream_view(self, request):
464 468 payload, remote, method, args, kwargs = self._vcs_view_params(request)
465 469 # this method has a `stream:` marker; we remove it here
466 470 method = method.split('stream:')[-1]
467 471 chunk_size = safe_int(payload.get('chunk_size')) or 4096
468 472
469 473 try:
470 474 resp = getattr(remote, method)(*args, **kwargs)
471 475 except Exception as e:
472 476 raise
473 477
474 478 def get_chunked_data(method_resp):
475 479 stream = StringIO(method_resp)
476 480 while 1:
477 481 chunk = stream.read(chunk_size)
478 482 if not chunk:
479 483 break
480 484 yield chunk
481 485
482 486 response = Response(app_iter=get_chunked_data(resp))
483 487 response.content_type = 'application/octet-stream'
484 488
485 489 return response
486 490
487 491 def status_view(self, request):
488 492 import vcsserver
489 493 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
490 494 'pid': os.getpid()}
491 495
492 496 def service_view(self, request):
493 497 import vcsserver
494 498
495 499 payload = msgpack.unpackb(request.body, use_list=True)
496 500 server_config, app_config = {}, {}
497 501
498 502 try:
499 503 path = self.global_config['__file__']
500 504 config = configparser.RawConfigParser()
501 505
502 506 config.read(path)
503 507
504 508 if config.has_section('server:main'):
505 509 server_config = dict(config.items('server:main'))
506 510 if config.has_section('app:main'):
507 511 app_config = dict(config.items('app:main'))
508 512
509 513 except Exception:
510 514 log.exception('Failed to read .ini file for display')
511 515
512 516 environ = os.environ.items()
513 517
514 518 resp = {
515 519 'id': payload.get('id'),
516 520 'result': dict(
517 521 version=vcsserver.__version__,
518 522 config=server_config,
519 523 app_config=app_config,
520 524 environ=environ,
521 525 payload=payload,
522 526 )
523 527 }
524 528 return resp
525 529
526 530 def _msgpack_renderer_factory(self, info):
527 531 def _render(value, system):
528 532 request = system.get('request')
529 533 if request is not None:
530 534 response = request.response
531 535 ct = response.content_type
532 536 if ct == response.default_content_type:
533 537 response.content_type = 'application/x-msgpack'
534 538 return msgpack.packb(value)
535 539 return _render
536 540
537 541 def set_env_from_config(self, environ, config):
538 542 dict_conf = {}
539 543 try:
540 544 for elem in config:
541 545 if elem[0] == 'rhodecode':
542 546 dict_conf = json.loads(elem[2])
543 547 break
544 548 except Exception:
545 549 log.exception('Failed to fetch SCM CONFIG')
546 550 return
547 551
548 552 username = dict_conf.get('username')
549 553 if username:
550 554 environ['REMOTE_USER'] = username
551 555 # mercurial specific, some extension api rely on this
552 556 environ['HGUSER'] = username
553 557
554 558 ip = dict_conf.get('ip')
555 559 if ip:
556 560 environ['REMOTE_HOST'] = ip
557 561
558 562 if _is_request_chunked(environ):
559 563 # set the compatibility flag for webob
560 564 environ['wsgi.input_terminated'] = True
561 565
562 566 def hg_proxy(self):
563 567 @wsgiapp
564 568 def _hg_proxy(environ, start_response):
565 569 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
566 570 return app(environ, start_response)
567 571 return _hg_proxy
568 572
569 573 def git_proxy(self):
570 574 @wsgiapp
571 575 def _git_proxy(environ, start_response):
572 576 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
573 577 return app(environ, start_response)
574 578 return _git_proxy
575 579
576 580 def hg_stream(self):
577 581 if self._use_echo_app:
578 582 @wsgiapp
579 583 def _hg_stream(environ, start_response):
580 584 app = EchoApp('fake_path', 'fake_name', None)
581 585 return app(environ, start_response)
582 586 return _hg_stream
583 587 else:
584 588 @wsgiapp
585 589 def _hg_stream(environ, start_response):
586 590 log.debug('http-app: handling hg stream')
587 591 repo_path = environ['HTTP_X_RC_REPO_PATH']
588 592 repo_name = environ['HTTP_X_RC_REPO_NAME']
589 593 packed_config = base64.b64decode(
590 594 environ['HTTP_X_RC_REPO_CONFIG'])
591 595 config = msgpack.unpackb(packed_config)
592 596 app = scm_app.create_hg_wsgi_app(
593 597 repo_path, repo_name, config)
594 598
595 599 # Consistent path information for hgweb
596 600 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
597 601 environ['REPO_NAME'] = repo_name
598 602 self.set_env_from_config(environ, config)
599 603
600 604 log.debug('http-app: starting app handler '
601 605 'with %s and process request', app)
602 606 return app(environ, ResponseFilter(start_response))
603 607 return _hg_stream
604 608
605 609 def git_stream(self):
606 610 if self._use_echo_app:
607 611 @wsgiapp
608 612 def _git_stream(environ, start_response):
609 613 app = EchoApp('fake_path', 'fake_name', None)
610 614 return app(environ, start_response)
611 615 return _git_stream
612 616 else:
613 617 @wsgiapp
614 618 def _git_stream(environ, start_response):
615 619 log.debug('http-app: handling git stream')
616 620 repo_path = environ['HTTP_X_RC_REPO_PATH']
617 621 repo_name = environ['HTTP_X_RC_REPO_NAME']
618 622 packed_config = base64.b64decode(
619 623 environ['HTTP_X_RC_REPO_CONFIG'])
620 624 config = msgpack.unpackb(packed_config)
621 625
622 626 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
623 627 self.set_env_from_config(environ, config)
624 628
625 629 content_type = environ.get('CONTENT_TYPE', '')
626 630
627 631 path = environ['PATH_INFO']
628 632 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
629 633 log.debug(
630 634 'LFS: Detecting if request `%s` is LFS server path based '
631 635 'on content type:`%s`, is_lfs:%s',
632 636 path, content_type, is_lfs_request)
633 637
634 638 if not is_lfs_request:
635 639 # fallback detection by path
636 640 if GIT_LFS_PROTO_PAT.match(path):
637 641 is_lfs_request = True
638 642 log.debug(
639 643 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
640 644 path, is_lfs_request)
641 645
642 646 if is_lfs_request:
643 647 app = scm_app.create_git_lfs_wsgi_app(
644 648 repo_path, repo_name, config)
645 649 else:
646 650 app = scm_app.create_git_wsgi_app(
647 651 repo_path, repo_name, config)
648 652
649 653 log.debug('http-app: starting app handler '
650 654 'with %s and process request', app)
651 655
652 656 return app(environ, start_response)
653 657
654 658 return _git_stream
655 659
656 660 def handle_vcs_exception(self, exception, request):
657 661 _vcs_kind = getattr(exception, '_vcs_kind', '')
658 662 if _vcs_kind == 'repo_locked':
659 663 # Get custom repo-locked status code if present.
660 664 status_code = request.headers.get('X-RC-Locked-Status-Code')
661 665 return HTTPRepoLocked(
662 666 title=exception.message, status_code=status_code)
663 667
664 668 elif _vcs_kind == 'repo_branch_protected':
665 669 # Get custom repo-branch-protected status code if present.
666 670 return HTTPRepoBranchProtected(title=exception.message)
667 671
668 672 exc_info = request.exc_info
669 673 store_exception(id(exc_info), exc_info)
670 674
671 675 traceback_info = 'unavailable'
672 676 if request.exc_info:
673 677 exc_type, exc_value, exc_tb = request.exc_info
674 678 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
675 679
676 680 log.error(
677 681 'error occurred handling this request for path: %s, \n tb: %s',
678 682 request.path, traceback_info)
679 683 raise exception
680 684
681 685
682 686 class ResponseFilter(object):
683 687
684 688 def __init__(self, start_response):
685 689 self._start_response = start_response
686 690
687 691 def __call__(self, status, response_headers, exc_info=None):
688 692 headers = tuple(
689 693 (h, v) for h, v in response_headers
690 694 if not wsgiref.util.is_hop_by_hop(h))
691 695 return self._start_response(status, headers, exc_info)
692 696
693 697
694 698 def main(global_config, **settings):
695 699 if MercurialFactory:
696 700 hgpatches.patch_largefiles_capabilities()
697 701 hgpatches.patch_subrepo_type_mapping()
698 702
699 703 app = HTTPApplication(settings=settings, global_config=global_config)
700 704 return app.wsgi_app()
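With the `add_request_method` registration added above, `request.statsd` lazily resolves (and, because of `reify=True`, caches) the client built from the registry settings; it is None when `statsd.enabled` is off. A hypothetical view showing the guarded usage pattern (the view itself is not part of this change):

```python
def example_view(request):
    # request.statsd is None unless statsd.enabled = true in the .ini
    statsd = request.statsd
    if statsd:
        statsd.incr('vcsserver.example_view.count')  # illustrative metric name
    return {'status': 'OK'}
```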
@@ -1,64 +1,70 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import logging
20 20
21 21 import vcsserver
22 22 from vcsserver.utils import safe_str
23 23
24 24
25 25 log = logging.getLogger(__name__)
26 26
27 27
28 28 def get_access_path(request):
29 29 environ = request.environ
30 30 return environ.get('PATH_INFO')
31 31
32 32
33 33 def get_user_agent(environ):
34 34 return environ.get('HTTP_USER_AGENT')
35 35
36 36
37 37 class RequestWrapperTween(object):
38 38 def __init__(self, handler, registry):
39 39 self.handler = handler
40 40 self.registry = registry
41 41
42 42 # one-time configuration code goes here
43 43
44 44 def __call__(self, request):
45 45 start = time.time()
46 log.debug('Starting request time measurement')
46 47 try:
47 48 response = self.handler(request)
48 49 finally:
49 end = time.time()
50 total = end - start
51 50 count = request.request_count()
52 51 _ver_ = vcsserver.__version__
52 statsd = request.statsd
53 total = time.time() - start
54 if statsd:
55 statsd.timing('vcsserver.req.timing', total)
56 statsd.incr('vcsserver.req.count')
53 57 log.info(
54 58 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
55 59 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
56 safe_str(get_access_path(request)), total, get_user_agent(request.environ), _ver_)
60 safe_str(get_access_path(request)), total,
61 get_user_agent(request.environ), _ver_
62 )
57 63
58 64 return response
59 65
60 66
61 67 def includeme(config):
62 68 config.add_tween(
63 69 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
64 70 )