##// END OF EJS Templates
metrics: use new statsd client logic, and start gathering new metrics
super-admin -
r1005:d51a7b83 default
parent child Browse files
Show More
@@ -0,0 +1,49 b''
1 from vcsserver.lib._vendor.statsd import client_from_config
2
3
4 class StatsdClientNotInitialised(Exception):
5 pass
6
7
8 class _Singleton(type):
9 """A metaclass that creates a Singleton base class when called."""
10
11 _instances = {}
12
13 def __call__(cls, *args, **kwargs):
14 if cls not in cls._instances:
15 cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
16 return cls._instances[cls]
17
18
19 class Singleton(_Singleton("SingletonMeta", (object,), {})):
20 pass
21
22
23 class StatsdClientClass(Singleton):
24 setup_run = False
25 statsd_client = None
26 statsd = None
27
28 def __getattribute__(self, name):
29
30 if name.startswith("statsd"):
31 if self.setup_run:
32 return super(StatsdClientClass, self).__getattribute__(name)
33 else:
34 return None
35 #raise StatsdClientNotInitialised("requested key was %s" % name)
36
37 return super(StatsdClientClass, self).__getattribute__(name)
38
39 def setup(self, settings):
40 """
41 Initialize the client
42 """
43 statsd = client_from_config(settings)
44 self.statsd = statsd
45 self.statsd_client = statsd
46 self.setup_run = True
47
48
49 StatsdClient = StatsdClientClass()
@@ -1,705 +1,718 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import os
18 import os
19 import sys
19 import sys
20 import base64
20 import base64
21 import locale
21 import locale
22 import logging
22 import logging
23 import uuid
23 import uuid
24 import wsgiref.util
24 import wsgiref.util
25 import traceback
25 import traceback
26 import tempfile
26 import tempfile
27 import psutil
27 import psutil
28 from itertools import chain
28 from itertools import chain
29 from cStringIO import StringIO
29 from cStringIO import StringIO
30
30
31 import simplejson as json
31 import simplejson as json
32 import msgpack
32 import msgpack
33 from pyramid.config import Configurator
33 from pyramid.config import Configurator
34 from pyramid.settings import asbool, aslist
34 from pyramid.settings import asbool, aslist
35 from pyramid.wsgi import wsgiapp
35 from pyramid.wsgi import wsgiapp
36 from pyramid.compat import configparser
36 from pyramid.compat import configparser
37 from pyramid.response import Response
37 from pyramid.response import Response
38
38
39 from vcsserver.utils import safe_int
39 from vcsserver.utils import safe_int
40 from vcsserver.lib.statsd_client import StatsdClient
40
41
41 log = logging.getLogger(__name__)
42 log = logging.getLogger(__name__)
42
43
43 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
44 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
44 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
45 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
45
46
46 try:
47 try:
47 locale.setlocale(locale.LC_ALL, '')
48 locale.setlocale(locale.LC_ALL, '')
48 except locale.Error as e:
49 except locale.Error as e:
49 log.error(
50 log.error(
50 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
51 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
51 os.environ['LC_ALL'] = 'C'
52 os.environ['LC_ALL'] = 'C'
52
53
53 import vcsserver
54 import vcsserver
54 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
55 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
55 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
56 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
56 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
57 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
57 from vcsserver.echo_stub.echo_app import EchoApp
58 from vcsserver.echo_stub.echo_app import EchoApp
58 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
59 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
59 from vcsserver.lib.exc_tracking import store_exception
60 from vcsserver.lib.exc_tracking import store_exception
60 from vcsserver.server import VcsServer
61 from vcsserver.server import VcsServer
61
62
62 try:
63 try:
63 from vcsserver.git import GitFactory, GitRemote
64 from vcsserver.git import GitFactory, GitRemote
64 except ImportError:
65 except ImportError:
65 GitFactory = None
66 GitFactory = None
66 GitRemote = None
67 GitRemote = None
67
68
68 try:
69 try:
69 from vcsserver.hg import MercurialFactory, HgRemote
70 from vcsserver.hg import MercurialFactory, HgRemote
70 except ImportError:
71 except ImportError:
71 MercurialFactory = None
72 MercurialFactory = None
72 HgRemote = None
73 HgRemote = None
73
74
74 try:
75 try:
75 from vcsserver.svn import SubversionFactory, SvnRemote
76 from vcsserver.svn import SubversionFactory, SvnRemote
76 except ImportError:
77 except ImportError:
77 SubversionFactory = None
78 SubversionFactory = None
78 SvnRemote = None
79 SvnRemote = None
79
80
80
81
81 def _is_request_chunked(environ):
82 def _is_request_chunked(environ):
82 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
83 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
83 return stream
84 return stream
84
85
85
86
86 def _int_setting(settings, name, default):
87 def _int_setting(settings, name, default):
87 settings[name] = int(settings.get(name, default))
88 settings[name] = int(settings.get(name, default))
88 return settings[name]
89 return settings[name]
89
90
90
91
91 def _bool_setting(settings, name, default):
92 def _bool_setting(settings, name, default):
92 input_val = settings.get(name, default)
93 input_val = settings.get(name, default)
93 if isinstance(input_val, unicode):
94 if isinstance(input_val, unicode):
94 input_val = input_val.encode('utf8')
95 input_val = input_val.encode('utf8')
95 settings[name] = asbool(input_val)
96 settings[name] = asbool(input_val)
96 return settings[name]
97 return settings[name]
97
98
98
99
99 def _list_setting(settings, name, default):
100 def _list_setting(settings, name, default):
100 raw_value = settings.get(name, default)
101 raw_value = settings.get(name, default)
101
102
102 # Otherwise we assume it uses pyramids space/newline separation.
103 # Otherwise we assume it uses pyramids space/newline separation.
103 settings[name] = aslist(raw_value)
104 settings[name] = aslist(raw_value)
104 return settings[name]
105 return settings[name]
105
106
106
107
107 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
108 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
108 value = settings.get(name, default)
109 value = settings.get(name, default)
109
110
110 if default_when_empty and not value:
111 if default_when_empty and not value:
111 # use default value when value is empty
112 # use default value when value is empty
112 value = default
113 value = default
113
114
114 if lower:
115 if lower:
115 value = value.lower()
116 value = value.lower()
116 settings[name] = value
117 settings[name] = value
117 return settings[name]
118 return settings[name]
118
119
119
120
120 def log_max_fd():
121 def log_max_fd():
121 try:
122 try:
122 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
123 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
123 log.info('Max file descriptors value: %s', maxfd)
124 log.info('Max file descriptors value: %s', maxfd)
124 except Exception:
125 except Exception:
125 pass
126 pass
126
127
127
128
128 class VCS(object):
129 class VCS(object):
129 def __init__(self, locale_conf=None, cache_config=None):
130 def __init__(self, locale_conf=None, cache_config=None):
130 self.locale = locale_conf
131 self.locale = locale_conf
131 self.cache_config = cache_config
132 self.cache_config = cache_config
132 self._configure_locale()
133 self._configure_locale()
133
134
134 log_max_fd()
135 log_max_fd()
135
136
136 if GitFactory and GitRemote:
137 if GitFactory and GitRemote:
137 git_factory = GitFactory()
138 git_factory = GitFactory()
138 self._git_remote = GitRemote(git_factory)
139 self._git_remote = GitRemote(git_factory)
139 else:
140 else:
140 log.info("Git client import failed")
141 log.info("Git client import failed")
141
142
142 if MercurialFactory and HgRemote:
143 if MercurialFactory and HgRemote:
143 hg_factory = MercurialFactory()
144 hg_factory = MercurialFactory()
144 self._hg_remote = HgRemote(hg_factory)
145 self._hg_remote = HgRemote(hg_factory)
145 else:
146 else:
146 log.info("Mercurial client import failed")
147 log.info("Mercurial client import failed")
147
148
148 if SubversionFactory and SvnRemote:
149 if SubversionFactory and SvnRemote:
149 svn_factory = SubversionFactory()
150 svn_factory = SubversionFactory()
150
151
151 # hg factory is used for svn url validation
152 # hg factory is used for svn url validation
152 hg_factory = MercurialFactory()
153 hg_factory = MercurialFactory()
153 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
154 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
154 else:
155 else:
155 log.info("Subversion client import failed")
156 log.info("Subversion client import failed")
156
157
157 self._vcsserver = VcsServer()
158 self._vcsserver = VcsServer()
158
159
159 def _configure_locale(self):
160 def _configure_locale(self):
160 if self.locale:
161 if self.locale:
161 log.info('Settings locale: `LC_ALL` to %s', self.locale)
162 log.info('Settings locale: `LC_ALL` to %s', self.locale)
162 else:
163 else:
163 log.info(
164 log.info(
164 'Configuring locale subsystem based on environment variables')
165 'Configuring locale subsystem based on environment variables')
165 try:
166 try:
166 # If self.locale is the empty string, then the locale
167 # If self.locale is the empty string, then the locale
167 # module will use the environment variables. See the
168 # module will use the environment variables. See the
168 # documentation of the package `locale`.
169 # documentation of the package `locale`.
169 locale.setlocale(locale.LC_ALL, self.locale)
170 locale.setlocale(locale.LC_ALL, self.locale)
170
171
171 language_code, encoding = locale.getlocale()
172 language_code, encoding = locale.getlocale()
172 log.info(
173 log.info(
173 'Locale set to language code "%s" with encoding "%s".',
174 'Locale set to language code "%s" with encoding "%s".',
174 language_code, encoding)
175 language_code, encoding)
175 except locale.Error:
176 except locale.Error:
176 log.exception(
177 log.exception(
177 'Cannot set locale, not configuring the locale system')
178 'Cannot set locale, not configuring the locale system')
178
179
179
180
180 class WsgiProxy(object):
181 class WsgiProxy(object):
181 def __init__(self, wsgi):
182 def __init__(self, wsgi):
182 self.wsgi = wsgi
183 self.wsgi = wsgi
183
184
184 def __call__(self, environ, start_response):
185 def __call__(self, environ, start_response):
185 input_data = environ['wsgi.input'].read()
186 input_data = environ['wsgi.input'].read()
186 input_data = msgpack.unpackb(input_data)
187 input_data = msgpack.unpackb(input_data)
187
188
188 error = None
189 error = None
189 try:
190 try:
190 data, status, headers = self.wsgi.handle(
191 data, status, headers = self.wsgi.handle(
191 input_data['environment'], input_data['input_data'],
192 input_data['environment'], input_data['input_data'],
192 *input_data['args'], **input_data['kwargs'])
193 *input_data['args'], **input_data['kwargs'])
193 except Exception as e:
194 except Exception as e:
194 data, status, headers = [], None, None
195 data, status, headers = [], None, None
195 error = {
196 error = {
196 'message': str(e),
197 'message': str(e),
197 '_vcs_kind': getattr(e, '_vcs_kind', None)
198 '_vcs_kind': getattr(e, '_vcs_kind', None)
198 }
199 }
199
200
200 start_response(200, {})
201 start_response(200, {})
201 return self._iterator(error, status, headers, data)
202 return self._iterator(error, status, headers, data)
202
203
203 def _iterator(self, error, status, headers, data):
204 def _iterator(self, error, status, headers, data):
204 initial_data = [
205 initial_data = [
205 error,
206 error,
206 status,
207 status,
207 headers,
208 headers,
208 ]
209 ]
209
210
210 for d in chain(initial_data, data):
211 for d in chain(initial_data, data):
211 yield msgpack.packb(d)
212 yield msgpack.packb(d)
212
213
213
214
214 def not_found(request):
215 def not_found(request):
215 return {'status': '404 NOT FOUND'}
216 return {'status': '404 NOT FOUND'}
216
217
217
218
218 class VCSViewPredicate(object):
219 class VCSViewPredicate(object):
219 def __init__(self, val, config):
220 def __init__(self, val, config):
220 self.remotes = val
221 self.remotes = val
221
222
222 def text(self):
223 def text(self):
223 return 'vcs view method = %s' % (self.remotes.keys(),)
224 return 'vcs view method = %s' % (self.remotes.keys(),)
224
225
225 phash = text
226 phash = text
226
227
227 def __call__(self, context, request):
228 def __call__(self, context, request):
228 """
229 """
229 View predicate that returns true if given backend is supported by
230 View predicate that returns true if given backend is supported by
230 defined remotes.
231 defined remotes.
231 """
232 """
232 backend = request.matchdict.get('backend')
233 backend = request.matchdict.get('backend')
233 return backend in self.remotes
234 return backend in self.remotes
234
235
235
236
236 class HTTPApplication(object):
237 class HTTPApplication(object):
237 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
238 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
238
239
239 remote_wsgi = remote_wsgi
240 remote_wsgi = remote_wsgi
240 _use_echo_app = False
241 _use_echo_app = False
241
242
242 def __init__(self, settings=None, global_config=None):
243 def __init__(self, settings=None, global_config=None):
243 self._sanitize_settings_and_apply_defaults(settings)
244 self._sanitize_settings_and_apply_defaults(settings)
244
245
245 self.config = Configurator(settings=settings)
246 self.config = Configurator(settings=settings)
247 # Init our statsd at very start
248 self.config.registry.statsd = StatsdClient.statsd
249
246 self.global_config = global_config
250 self.global_config = global_config
247 self.config.include('vcsserver.lib.rc_cache')
251 self.config.include('vcsserver.lib.rc_cache')
248
252
249 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
253 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
250 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
254 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
251 self._remotes = {
255 self._remotes = {
252 'hg': vcs._hg_remote,
256 'hg': vcs._hg_remote,
253 'git': vcs._git_remote,
257 'git': vcs._git_remote,
254 'svn': vcs._svn_remote,
258 'svn': vcs._svn_remote,
255 'server': vcs._vcsserver,
259 'server': vcs._vcsserver,
256 }
260 }
257 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
261 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
258 self._use_echo_app = True
262 self._use_echo_app = True
259 log.warning("Using EchoApp for VCS operations.")
263 log.warning("Using EchoApp for VCS operations.")
260 self.remote_wsgi = remote_wsgi_stub
264 self.remote_wsgi = remote_wsgi_stub
261
265
262 self._configure_settings(global_config, settings)
266 self._configure_settings(global_config, settings)
263
267
264 self._configure()
268 self._configure()
265
269
266 def _configure_settings(self, global_config, app_settings):
270 def _configure_settings(self, global_config, app_settings):
267 """
271 """
268 Configure the settings module.
272 Configure the settings module.
269 """
273 """
270 settings_merged = global_config.copy()
274 settings_merged = global_config.copy()
271 settings_merged.update(app_settings)
275 settings_merged.update(app_settings)
272
276
273 git_path = app_settings.get('git_path', None)
277 git_path = app_settings.get('git_path', None)
274 if git_path:
278 if git_path:
275 settings.GIT_EXECUTABLE = git_path
279 settings.GIT_EXECUTABLE = git_path
276 binary_dir = app_settings.get('core.binary_dir', None)
280 binary_dir = app_settings.get('core.binary_dir', None)
277 if binary_dir:
281 if binary_dir:
278 settings.BINARY_DIR = binary_dir
282 settings.BINARY_DIR = binary_dir
279
283
280 # Store the settings to make them available to other modules.
284 # Store the settings to make them available to other modules.
281 vcsserver.PYRAMID_SETTINGS = settings_merged
285 vcsserver.PYRAMID_SETTINGS = settings_merged
282 vcsserver.CONFIG = settings_merged
286 vcsserver.CONFIG = settings_merged
283
287
284 def _sanitize_settings_and_apply_defaults(self, settings):
288 def _sanitize_settings_and_apply_defaults(self, settings):
285 temp_store = tempfile.gettempdir()
289 temp_store = tempfile.gettempdir()
286 default_cache_dir = os.path.join(temp_store, 'rc_cache')
290 default_cache_dir = os.path.join(temp_store, 'rc_cache')
287
291
288 # save default, cache dir, and use it for all backends later.
292 # save default, cache dir, and use it for all backends later.
289 default_cache_dir = _string_setting(
293 default_cache_dir = _string_setting(
290 settings,
294 settings,
291 'cache_dir',
295 'cache_dir',
292 default_cache_dir, lower=False, default_when_empty=True)
296 default_cache_dir, lower=False, default_when_empty=True)
293
297
294 # ensure we have our dir created
298 # ensure we have our dir created
295 if not os.path.isdir(default_cache_dir):
299 if not os.path.isdir(default_cache_dir):
296 os.makedirs(default_cache_dir, mode=0o755)
300 os.makedirs(default_cache_dir, mode=0o755)
297
301
298 # exception store cache
302 # exception store cache
299 _string_setting(
303 _string_setting(
300 settings,
304 settings,
301 'exception_tracker.store_path',
305 'exception_tracker.store_path',
302 temp_store, lower=False, default_when_empty=True)
306 temp_store, lower=False, default_when_empty=True)
303
307
304 # repo_object cache
308 # repo_object cache
305 _string_setting(
309 _string_setting(
306 settings,
310 settings,
307 'rc_cache.repo_object.backend',
311 'rc_cache.repo_object.backend',
308 'dogpile.cache.rc.file_namespace', lower=False)
312 'dogpile.cache.rc.file_namespace', lower=False)
309 _int_setting(
313 _int_setting(
310 settings,
314 settings,
311 'rc_cache.repo_object.expiration_time',
315 'rc_cache.repo_object.expiration_time',
312 30 * 24 * 60 * 60)
316 30 * 24 * 60 * 60)
313 _string_setting(
317 _string_setting(
314 settings,
318 settings,
315 'rc_cache.repo_object.arguments.filename',
319 'rc_cache.repo_object.arguments.filename',
316 os.path.join(default_cache_dir, 'vcsserver_cache_1'), lower=False)
320 os.path.join(default_cache_dir, 'vcsserver_cache_1'), lower=False)
317
321
318 def _configure(self):
322 def _configure(self):
319 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
323 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
320
324
321 self.config.add_route('service', '/_service')
325 self.config.add_route('service', '/_service')
322 self.config.add_route('status', '/status')
326 self.config.add_route('status', '/status')
323 self.config.add_route('hg_proxy', '/proxy/hg')
327 self.config.add_route('hg_proxy', '/proxy/hg')
324 self.config.add_route('git_proxy', '/proxy/git')
328 self.config.add_route('git_proxy', '/proxy/git')
325
329
326 # rpc methods
330 # rpc methods
327 self.config.add_route('vcs', '/{backend}')
331 self.config.add_route('vcs', '/{backend}')
328
332
329 # streaming rpc remote methods
333 # streaming rpc remote methods
330 self.config.add_route('vcs_stream', '/{backend}/stream')
334 self.config.add_route('vcs_stream', '/{backend}/stream')
331
335
332 # vcs operations clone/push as streaming
336 # vcs operations clone/push as streaming
333 self.config.add_route('stream_git', '/stream/git/*repo_name')
337 self.config.add_route('stream_git', '/stream/git/*repo_name')
334 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
338 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
335
339
336 self.config.add_view(self.status_view, route_name='status', renderer='json')
340 self.config.add_view(self.status_view, route_name='status', renderer='json')
337 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
341 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
338
342
339 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
343 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
340 self.config.add_view(self.git_proxy(), route_name='git_proxy')
344 self.config.add_view(self.git_proxy(), route_name='git_proxy')
341 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
345 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
342 vcs_view=self._remotes)
346 vcs_view=self._remotes)
343 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
347 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
344 vcs_view=self._remotes)
348 vcs_view=self._remotes)
345
349
346 self.config.add_view(self.hg_stream(), route_name='stream_hg')
350 self.config.add_view(self.hg_stream(), route_name='stream_hg')
347 self.config.add_view(self.git_stream(), route_name='stream_git')
351 self.config.add_view(self.git_stream(), route_name='stream_git')
348
352
349 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
353 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
350
354
351 self.config.add_notfound_view(not_found, renderer='json')
355 self.config.add_notfound_view(not_found, renderer='json')
352
356
353 self.config.add_view(self.handle_vcs_exception, context=Exception)
357 self.config.add_view(self.handle_vcs_exception, context=Exception)
354
358
355 self.config.add_tween(
359 self.config.add_tween(
356 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
360 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
357 )
361 )
358 self.config.add_request_method(
362 self.config.add_request_method(
359 'vcsserver.lib.request_counter.get_request_counter',
363 'vcsserver.lib.request_counter.get_request_counter',
360 'request_count')
364 'request_count')
361
365
362 self.config.add_request_method(
363 'vcsserver.lib._vendor.statsd.get_statsd_client',
364 'statsd', reify=True)
365
366 def wsgi_app(self):
366 def wsgi_app(self):
367 return self.config.make_wsgi_app()
367 return self.config.make_wsgi_app()
368
368
369 def _vcs_view_params(self, request):
369 def _vcs_view_params(self, request):
370 remote = self._remotes[request.matchdict['backend']]
370 remote = self._remotes[request.matchdict['backend']]
371 payload = msgpack.unpackb(request.body, use_list=True)
371 payload = msgpack.unpackb(request.body, use_list=True)
372 method = payload.get('method')
372 method = payload.get('method')
373 params = payload['params']
373 params = payload['params']
374 wire = params.get('wire')
374 wire = params.get('wire')
375 args = params.get('args')
375 args = params.get('args')
376 kwargs = params.get('kwargs')
376 kwargs = params.get('kwargs')
377 context_uid = None
377 context_uid = None
378
378
379 if wire:
379 if wire:
380 try:
380 try:
381 wire['context'] = context_uid = uuid.UUID(wire['context'])
381 wire['context'] = context_uid = uuid.UUID(wire['context'])
382 except KeyError:
382 except KeyError:
383 pass
383 pass
384 args.insert(0, wire)
384 args.insert(0, wire)
385 repo_state_uid = wire.get('repo_state_uid') if wire else None
385 repo_state_uid = wire.get('repo_state_uid') if wire else None
386
386
387 # NOTE(marcink): trading complexity for slight performance
387 # NOTE(marcink): trading complexity for slight performance
388 if log.isEnabledFor(logging.DEBUG):
388 if log.isEnabledFor(logging.DEBUG):
389 no_args_methods = [
389 no_args_methods = [
390
390
391 ]
391 ]
392 if method in no_args_methods:
392 if method in no_args_methods:
393 call_args = ''
393 call_args = ''
394 else:
394 else:
395 call_args = args[1:]
395 call_args = args[1:]
396
396
397 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
397 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
398 method, call_args, kwargs, context_uid, repo_state_uid)
398 method, call_args, kwargs, context_uid, repo_state_uid)
399
399
400 statsd = request.registry.statsd
401 if statsd:
402 statsd.incr(
403 'vcsserver_method_count', tags=[
404 "method:{}".format(method),
405 ])
400 return payload, remote, method, args, kwargs
406 return payload, remote, method, args, kwargs
401
407
402 def vcs_view(self, request):
408 def vcs_view(self, request):
403
409
404 payload, remote, method, args, kwargs = self._vcs_view_params(request)
410 payload, remote, method, args, kwargs = self._vcs_view_params(request)
405 payload_id = payload.get('id')
411 payload_id = payload.get('id')
406
412
407 try:
413 try:
408 resp = getattr(remote, method)(*args, **kwargs)
414 resp = getattr(remote, method)(*args, **kwargs)
409 except Exception as e:
415 except Exception as e:
410 exc_info = list(sys.exc_info())
416 exc_info = list(sys.exc_info())
411 exc_type, exc_value, exc_traceback = exc_info
417 exc_type, exc_value, exc_traceback = exc_info
412
418
413 org_exc = getattr(e, '_org_exc', None)
419 org_exc = getattr(e, '_org_exc', None)
414 org_exc_name = None
420 org_exc_name = None
415 org_exc_tb = ''
421 org_exc_tb = ''
416 if org_exc:
422 if org_exc:
417 org_exc_name = org_exc.__class__.__name__
423 org_exc_name = org_exc.__class__.__name__
418 org_exc_tb = getattr(e, '_org_exc_tb', '')
424 org_exc_tb = getattr(e, '_org_exc_tb', '')
419 # replace our "faked" exception with our org
425 # replace our "faked" exception with our org
420 exc_info[0] = org_exc.__class__
426 exc_info[0] = org_exc.__class__
421 exc_info[1] = org_exc
427 exc_info[1] = org_exc
422
428
423 should_store_exc = True
429 should_store_exc = True
424 if org_exc:
430 if org_exc:
425 def get_exc_fqn(_exc_obj):
431 def get_exc_fqn(_exc_obj):
426 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
432 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
427 return module_name + '.' + org_exc_name
433 return module_name + '.' + org_exc_name
428
434
429 exc_fqn = get_exc_fqn(org_exc)
435 exc_fqn = get_exc_fqn(org_exc)
430
436
431 if exc_fqn in ['mercurial.error.RepoLookupError',
437 if exc_fqn in ['mercurial.error.RepoLookupError',
432 'vcsserver.exceptions.RefNotFoundException']:
438 'vcsserver.exceptions.RefNotFoundException']:
433 should_store_exc = False
439 should_store_exc = False
434
440
435 if should_store_exc:
441 if should_store_exc:
436 store_exception(id(exc_info), exc_info, request_path=request.path)
442 store_exception(id(exc_info), exc_info, request_path=request.path)
437
443
438 tb_info = ''.join(
444 tb_info = ''.join(
439 traceback.format_exception(exc_type, exc_value, exc_traceback))
445 traceback.format_exception(exc_type, exc_value, exc_traceback))
440
446
441 type_ = e.__class__.__name__
447 type_ = e.__class__.__name__
442 if type_ not in self.ALLOWED_EXCEPTIONS:
448 if type_ not in self.ALLOWED_EXCEPTIONS:
443 type_ = None
449 type_ = None
444
450
445 resp = {
451 resp = {
446 'id': payload_id,
452 'id': payload_id,
447 'error': {
453 'error': {
448 'message': e.message,
454 'message': e.message,
449 'traceback': tb_info,
455 'traceback': tb_info,
450 'org_exc': org_exc_name,
456 'org_exc': org_exc_name,
451 'org_exc_tb': org_exc_tb,
457 'org_exc_tb': org_exc_tb,
452 'type': type_
458 'type': type_
453 }
459 }
454 }
460 }
455
461
456 try:
462 try:
457 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
463 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
458 except AttributeError:
464 except AttributeError:
459 pass
465 pass
460 else:
466 else:
461 resp = {
467 resp = {
462 'id': payload_id,
468 'id': payload_id,
463 'result': resp
469 'result': resp
464 }
470 }
465
471
466 return resp
472 return resp
467
473
468 def vcs_stream_view(self, request):
474 def vcs_stream_view(self, request):
469 payload, remote, method, args, kwargs = self._vcs_view_params(request)
475 payload, remote, method, args, kwargs = self._vcs_view_params(request)
470 # this method has a stream: marker we remove it here
476 # this method has a stream: marker we remove it here
471 method = method.split('stream:')[-1]
477 method = method.split('stream:')[-1]
472 chunk_size = safe_int(payload.get('chunk_size')) or 4096
478 chunk_size = safe_int(payload.get('chunk_size')) or 4096
473
479
474 try:
480 try:
475 resp = getattr(remote, method)(*args, **kwargs)
481 resp = getattr(remote, method)(*args, **kwargs)
476 except Exception as e:
482 except Exception as e:
477 raise
483 raise
478
484
479 def get_chunked_data(method_resp):
485 def get_chunked_data(method_resp):
480 stream = StringIO(method_resp)
486 stream = StringIO(method_resp)
481 while 1:
487 while 1:
482 chunk = stream.read(chunk_size)
488 chunk = stream.read(chunk_size)
483 if not chunk:
489 if not chunk:
484 break
490 break
485 yield chunk
491 yield chunk
486
492
487 response = Response(app_iter=get_chunked_data(resp))
493 response = Response(app_iter=get_chunked_data(resp))
488 response.content_type = 'application/octet-stream'
494 response.content_type = 'application/octet-stream'
489
495
490 return response
496 return response
491
497
492 def status_view(self, request):
498 def status_view(self, request):
493 import vcsserver
499 import vcsserver
494 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
500 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
495 'pid': os.getpid()}
501 'pid': os.getpid()}
496
502
497 def service_view(self, request):
503 def service_view(self, request):
498 import vcsserver
504 import vcsserver
499
505
500 payload = msgpack.unpackb(request.body, use_list=True)
506 payload = msgpack.unpackb(request.body, use_list=True)
501 server_config, app_config = {}, {}
507 server_config, app_config = {}, {}
502
508
503 try:
509 try:
504 path = self.global_config['__file__']
510 path = self.global_config['__file__']
505 config = configparser.RawConfigParser()
511 config = configparser.RawConfigParser()
506
512
507 config.read(path)
513 config.read(path)
508
514
509 if config.has_section('server:main'):
515 if config.has_section('server:main'):
510 server_config = dict(config.items('server:main'))
516 server_config = dict(config.items('server:main'))
511 if config.has_section('app:main'):
517 if config.has_section('app:main'):
512 app_config = dict(config.items('app:main'))
518 app_config = dict(config.items('app:main'))
513
519
514 except Exception:
520 except Exception:
515 log.exception('Failed to read .ini file for display')
521 log.exception('Failed to read .ini file for display')
516
522
517 environ = os.environ.items()
523 environ = os.environ.items()
518
524
519 resp = {
525 resp = {
520 'id': payload.get('id'),
526 'id': payload.get('id'),
521 'result': dict(
527 'result': dict(
522 version=vcsserver.__version__,
528 version=vcsserver.__version__,
523 config=server_config,
529 config=server_config,
524 app_config=app_config,
530 app_config=app_config,
525 environ=environ,
531 environ=environ,
526 payload=payload,
532 payload=payload,
527 )
533 )
528 }
534 }
529 return resp
535 return resp
530
536
531 def _msgpack_renderer_factory(self, info):
537 def _msgpack_renderer_factory(self, info):
532 def _render(value, system):
538 def _render(value, system):
533 request = system.get('request')
539 request = system.get('request')
534 if request is not None:
540 if request is not None:
535 response = request.response
541 response = request.response
536 ct = response.content_type
542 ct = response.content_type
537 if ct == response.default_content_type:
543 if ct == response.default_content_type:
538 response.content_type = 'application/x-msgpack'
544 response.content_type = 'application/x-msgpack'
539 return msgpack.packb(value)
545 return msgpack.packb(value)
540 return _render
546 return _render
541
547
542 def set_env_from_config(self, environ, config):
548 def set_env_from_config(self, environ, config):
543 dict_conf = {}
549 dict_conf = {}
544 try:
550 try:
545 for elem in config:
551 for elem in config:
546 if elem[0] == 'rhodecode':
552 if elem[0] == 'rhodecode':
547 dict_conf = json.loads(elem[2])
553 dict_conf = json.loads(elem[2])
548 break
554 break
549 except Exception:
555 except Exception:
550 log.exception('Failed to fetch SCM CONFIG')
556 log.exception('Failed to fetch SCM CONFIG')
551 return
557 return
552
558
553 username = dict_conf.get('username')
559 username = dict_conf.get('username')
554 if username:
560 if username:
555 environ['REMOTE_USER'] = username
561 environ['REMOTE_USER'] = username
556 # mercurial specific, some extension api rely on this
562 # mercurial specific, some extension api rely on this
557 environ['HGUSER'] = username
563 environ['HGUSER'] = username
558
564
559 ip = dict_conf.get('ip')
565 ip = dict_conf.get('ip')
560 if ip:
566 if ip:
561 environ['REMOTE_HOST'] = ip
567 environ['REMOTE_HOST'] = ip
562
568
563 if _is_request_chunked(environ):
569 if _is_request_chunked(environ):
564 # set the compatibility flag for webob
570 # set the compatibility flag for webob
565 environ['wsgi.input_terminated'] = True
571 environ['wsgi.input_terminated'] = True
566
572
567 def hg_proxy(self):
573 def hg_proxy(self):
568 @wsgiapp
574 @wsgiapp
569 def _hg_proxy(environ, start_response):
575 def _hg_proxy(environ, start_response):
570 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
576 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
571 return app(environ, start_response)
577 return app(environ, start_response)
572 return _hg_proxy
578 return _hg_proxy
573
579
574 def git_proxy(self):
580 def git_proxy(self):
575 @wsgiapp
581 @wsgiapp
576 def _git_proxy(environ, start_response):
582 def _git_proxy(environ, start_response):
577 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
583 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
578 return app(environ, start_response)
584 return app(environ, start_response)
579 return _git_proxy
585 return _git_proxy
580
586
581 def hg_stream(self):
587 def hg_stream(self):
582 if self._use_echo_app:
588 if self._use_echo_app:
583 @wsgiapp
589 @wsgiapp
584 def _hg_stream(environ, start_response):
590 def _hg_stream(environ, start_response):
585 app = EchoApp('fake_path', 'fake_name', None)
591 app = EchoApp('fake_path', 'fake_name', None)
586 return app(environ, start_response)
592 return app(environ, start_response)
587 return _hg_stream
593 return _hg_stream
588 else:
594 else:
589 @wsgiapp
595 @wsgiapp
590 def _hg_stream(environ, start_response):
596 def _hg_stream(environ, start_response):
591 log.debug('http-app: handling hg stream')
597 log.debug('http-app: handling hg stream')
592 repo_path = environ['HTTP_X_RC_REPO_PATH']
598 repo_path = environ['HTTP_X_RC_REPO_PATH']
593 repo_name = environ['HTTP_X_RC_REPO_NAME']
599 repo_name = environ['HTTP_X_RC_REPO_NAME']
594 packed_config = base64.b64decode(
600 packed_config = base64.b64decode(
595 environ['HTTP_X_RC_REPO_CONFIG'])
601 environ['HTTP_X_RC_REPO_CONFIG'])
596 config = msgpack.unpackb(packed_config)
602 config = msgpack.unpackb(packed_config)
597 app = scm_app.create_hg_wsgi_app(
603 app = scm_app.create_hg_wsgi_app(
598 repo_path, repo_name, config)
604 repo_path, repo_name, config)
599
605
600 # Consistent path information for hgweb
606 # Consistent path information for hgweb
601 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
607 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
602 environ['REPO_NAME'] = repo_name
608 environ['REPO_NAME'] = repo_name
603 self.set_env_from_config(environ, config)
609 self.set_env_from_config(environ, config)
604
610
605 log.debug('http-app: starting app handler '
611 log.debug('http-app: starting app handler '
606 'with %s and process request', app)
612 'with %s and process request', app)
607 return app(environ, ResponseFilter(start_response))
613 return app(environ, ResponseFilter(start_response))
608 return _hg_stream
614 return _hg_stream
609
615
610 def git_stream(self):
616 def git_stream(self):
611 if self._use_echo_app:
617 if self._use_echo_app:
612 @wsgiapp
618 @wsgiapp
613 def _git_stream(environ, start_response):
619 def _git_stream(environ, start_response):
614 app = EchoApp('fake_path', 'fake_name', None)
620 app = EchoApp('fake_path', 'fake_name', None)
615 return app(environ, start_response)
621 return app(environ, start_response)
616 return _git_stream
622 return _git_stream
617 else:
623 else:
618 @wsgiapp
624 @wsgiapp
619 def _git_stream(environ, start_response):
625 def _git_stream(environ, start_response):
620 log.debug('http-app: handling git stream')
626 log.debug('http-app: handling git stream')
621 repo_path = environ['HTTP_X_RC_REPO_PATH']
627 repo_path = environ['HTTP_X_RC_REPO_PATH']
622 repo_name = environ['HTTP_X_RC_REPO_NAME']
628 repo_name = environ['HTTP_X_RC_REPO_NAME']
623 packed_config = base64.b64decode(
629 packed_config = base64.b64decode(
624 environ['HTTP_X_RC_REPO_CONFIG'])
630 environ['HTTP_X_RC_REPO_CONFIG'])
625 config = msgpack.unpackb(packed_config)
631 config = msgpack.unpackb(packed_config)
626
632
627 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
633 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
628 self.set_env_from_config(environ, config)
634 self.set_env_from_config(environ, config)
629
635
630 content_type = environ.get('CONTENT_TYPE', '')
636 content_type = environ.get('CONTENT_TYPE', '')
631
637
632 path = environ['PATH_INFO']
638 path = environ['PATH_INFO']
633 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
639 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
634 log.debug(
640 log.debug(
635 'LFS: Detecting if request `%s` is LFS server path based '
641 'LFS: Detecting if request `%s` is LFS server path based '
636 'on content type:`%s`, is_lfs:%s',
642 'on content type:`%s`, is_lfs:%s',
637 path, content_type, is_lfs_request)
643 path, content_type, is_lfs_request)
638
644
639 if not is_lfs_request:
645 if not is_lfs_request:
640 # fallback detection by path
646 # fallback detection by path
641 if GIT_LFS_PROTO_PAT.match(path):
647 if GIT_LFS_PROTO_PAT.match(path):
642 is_lfs_request = True
648 is_lfs_request = True
643 log.debug(
649 log.debug(
644 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
650 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
645 path, is_lfs_request)
651 path, is_lfs_request)
646
652
647 if is_lfs_request:
653 if is_lfs_request:
648 app = scm_app.create_git_lfs_wsgi_app(
654 app = scm_app.create_git_lfs_wsgi_app(
649 repo_path, repo_name, config)
655 repo_path, repo_name, config)
650 else:
656 else:
651 app = scm_app.create_git_wsgi_app(
657 app = scm_app.create_git_wsgi_app(
652 repo_path, repo_name, config)
658 repo_path, repo_name, config)
653
659
654 log.debug('http-app: starting app handler '
660 log.debug('http-app: starting app handler '
655 'with %s and process request', app)
661 'with %s and process request', app)
656
662
657 return app(environ, start_response)
663 return app(environ, start_response)
658
664
659 return _git_stream
665 return _git_stream
660
666
661 def handle_vcs_exception(self, exception, request):
667 def handle_vcs_exception(self, exception, request):
662 _vcs_kind = getattr(exception, '_vcs_kind', '')
668 _vcs_kind = getattr(exception, '_vcs_kind', '')
663 if _vcs_kind == 'repo_locked':
669 if _vcs_kind == 'repo_locked':
664 # Get custom repo-locked status code if present.
670 # Get custom repo-locked status code if present.
665 status_code = request.headers.get('X-RC-Locked-Status-Code')
671 status_code = request.headers.get('X-RC-Locked-Status-Code')
666 return HTTPRepoLocked(
672 return HTTPRepoLocked(
667 title=exception.message, status_code=status_code)
673 title=exception.message, status_code=status_code)
668
674
669 elif _vcs_kind == 'repo_branch_protected':
675 elif _vcs_kind == 'repo_branch_protected':
670 # Get custom repo-branch-protected status code if present.
676 # Get custom repo-branch-protected status code if present.
671 return HTTPRepoBranchProtected(title=exception.message)
677 return HTTPRepoBranchProtected(title=exception.message)
672
678
673 exc_info = request.exc_info
679 exc_info = request.exc_info
674 store_exception(id(exc_info), exc_info)
680 store_exception(id(exc_info), exc_info)
675
681
676 traceback_info = 'unavailable'
682 traceback_info = 'unavailable'
677 if request.exc_info:
683 if request.exc_info:
678 exc_type, exc_value, exc_tb = request.exc_info
684 exc_type, exc_value, exc_tb = request.exc_info
679 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
685 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
680
686
681 log.error(
687 log.error(
682 'error occurred handling this request for path: %s, \n tb: %s',
688 'error occurred handling this request for path: %s, \n tb: %s',
683 request.path, traceback_info)
689 request.path, traceback_info)
690
691 statsd = request.registry.statsd
692 if statsd:
693 statsd.incr('vcsserver_exception')
684 raise exception
694 raise exception
685
695
686
696
687 class ResponseFilter(object):
697 class ResponseFilter(object):
688
698
689 def __init__(self, start_response):
699 def __init__(self, start_response):
690 self._start_response = start_response
700 self._start_response = start_response
691
701
692 def __call__(self, status, response_headers, exc_info=None):
702 def __call__(self, status, response_headers, exc_info=None):
693 headers = tuple(
703 headers = tuple(
694 (h, v) for h, v in response_headers
704 (h, v) for h, v in response_headers
695 if not wsgiref.util.is_hop_by_hop(h))
705 if not wsgiref.util.is_hop_by_hop(h))
696 return self._start_response(status, headers, exc_info)
706 return self._start_response(status, headers, exc_info)
697
707
698
708
699 def main(global_config, **settings):
709 def main(global_config, **settings):
700 if MercurialFactory:
710 if MercurialFactory:
701 hgpatches.patch_largefiles_capabilities()
711 hgpatches.patch_largefiles_capabilities()
702 hgpatches.patch_subrepo_type_mapping()
712 hgpatches.patch_subrepo_type_mapping()
703
713
714 # init and bootstrap StatsdClient
715 StatsdClient.setup(settings)
716
704 app = HTTPApplication(settings=settings, global_config=global_config)
717 app = HTTPApplication(settings=settings, global_config=global_config)
705 return app.wsgi_app()
718 return app.wsgi_app()
@@ -1,107 +1,128 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import re
3 import random
4 import random
4 from collections import deque
5 from collections import deque
5 from datetime import timedelta
6 from datetime import timedelta
7 from repoze.lru import lru_cache
6
8
7 from .timer import Timer
9 from .timer import Timer
8
10
11 TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
12 TAG_INVALID_CHARS_SUBS = "_"
13
14
15 @lru_cache(maxsize=500)
16 def _normalize_tags_with_cache(tag_list):
17 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
18
19
20 def normalize_tags(tag_list):
21 # We have to turn our input tag list into a non-mutable tuple for it to
22 # be hashable (and thus usable) by the @lru_cache decorator.
23 return _normalize_tags_with_cache(tuple(tag_list))
24
9
25
10 class StatsClientBase(object):
26 class StatsClientBase(object):
11 """A Base class for various statsd clients."""
27 """A Base class for various statsd clients."""
12
28
13 def close(self):
29 def close(self):
14 """Used to close and clean up any underlying resources."""
30 """Used to close and clean up any underlying resources."""
15 raise NotImplementedError()
31 raise NotImplementedError()
16
32
17 def _send(self):
33 def _send(self):
18 raise NotImplementedError()
34 raise NotImplementedError()
19
35
20 def pipeline(self):
36 def pipeline(self):
21 raise NotImplementedError()
37 raise NotImplementedError()
22
38
23 def timer(self, stat, rate=1):
39 def timer(self, stat, rate=1, tags=None):
24 return Timer(self, stat, rate)
40 return Timer(self, stat, rate, tags)
25
41
26 def timing(self, stat, delta, rate=1):
42 def timing(self, stat, delta, rate=1, tags=None):
27 """
43 """
28 Send new timing information.
44 Send new timing information.
29
45
30 `delta` can be either a number of milliseconds or a timedelta.
46 `delta` can be either a number of milliseconds or a timedelta.
31 """
47 """
32 if isinstance(delta, timedelta):
48 if isinstance(delta, timedelta):
33 # Convert timedelta to number of milliseconds.
49 # Convert timedelta to number of milliseconds.
34 delta = delta.total_seconds() * 1000.
50 delta = delta.total_seconds() * 1000.
35 self._send_stat(stat, '%0.6f|ms' % delta, rate)
51 self._send_stat(stat, '%0.6f|ms' % delta, rate, tags)
36
52
37 def incr(self, stat, count=1, rate=1):
53 def incr(self, stat, count=1, rate=1, tags=None):
38 """Increment a stat by `count`."""
54 """Increment a stat by `count`."""
39 self._send_stat(stat, '%s|c' % count, rate)
55 self._send_stat(stat, '%s|c' % count, rate, tags)
40
56
41 def decr(self, stat, count=1, rate=1):
57 def decr(self, stat, count=1, rate=1, tags=None):
42 """Decrement a stat by `count`."""
58 """Decrement a stat by `count`."""
43 self.incr(stat, -count, rate)
59 self.incr(stat, -count, rate, tags)
44
60
45 def gauge(self, stat, value, rate=1, delta=False):
61 def gauge(self, stat, value, rate=1, delta=False, tags=None):
46 """Set a gauge value."""
62 """Set a gauge value."""
47 if value < 0 and not delta:
63 if value < 0 and not delta:
48 if rate < 1:
64 if rate < 1:
49 if random.random() > rate:
65 if random.random() > rate:
50 return
66 return
51 with self.pipeline() as pipe:
67 with self.pipeline() as pipe:
52 pipe._send_stat(stat, '0|g', 1)
68 pipe._send_stat(stat, '0|g', 1)
53 pipe._send_stat(stat, '%s|g' % value, 1)
69 pipe._send_stat(stat, '%s|g' % value, 1)
54 else:
70 else:
55 prefix = '+' if delta and value >= 0 else ''
71 prefix = '+' if delta and value >= 0 else ''
56 self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
72 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
57
73
58 def set(self, stat, value, rate=1):
74 def set(self, stat, value, rate=1):
59 """Set a set value."""
75 """Set a set value."""
60 self._send_stat(stat, '%s|s' % value, rate)
76 self._send_stat(stat, '%s|s' % value, rate)
61
77
62 def _send_stat(self, stat, value, rate):
78 def _send_stat(self, stat, value, rate, tags=None):
63 self._after(self._prepare(stat, value, rate))
79 self._after(self._prepare(stat, value, rate, tags))
64
80
65 def _prepare(self, stat, value, rate):
81 def _prepare(self, stat, value, rate, tags=None):
66 if rate < 1:
82 if rate < 1:
67 if random.random() > rate:
83 if random.random() > rate:
68 return
84 return
69 value = '%s|@%s' % (value, rate)
85 value = '%s|@%s' % (value, rate)
70
86
71 if self._prefix:
87 if self._prefix:
72 stat = '%s.%s' % (self._prefix, stat)
88 stat = '%s.%s' % (self._prefix, stat)
73
89
74 return '%s:%s' % (stat, value)
90 res = '%s:%s%s' % (
91 stat,
92 value,
93 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
94 )
95 return res
75
96
76 def _after(self, data):
97 def _after(self, data):
77 if data:
98 if data:
78 self._send(data)
99 self._send(data)
79
100
80
101
81 class PipelineBase(StatsClientBase):
102 class PipelineBase(StatsClientBase):
82
103
83 def __init__(self, client):
104 def __init__(self, client):
84 self._client = client
105 self._client = client
85 self._prefix = client._prefix
106 self._prefix = client._prefix
86 self._stats = deque()
107 self._stats = deque()
87
108
88 def _send(self):
109 def _send(self):
89 raise NotImplementedError()
110 raise NotImplementedError()
90
111
91 def _after(self, data):
112 def _after(self, data):
92 if data is not None:
113 if data is not None:
93 self._stats.append(data)
114 self._stats.append(data)
94
115
95 def __enter__(self):
116 def __enter__(self):
96 return self
117 return self
97
118
98 def __exit__(self, typ, value, tb):
119 def __exit__(self, typ, value, tb):
99 self.send()
120 self.send()
100
121
101 def send(self):
122 def send(self):
102 if not self._stats:
123 if not self._stats:
103 return
124 return
104 self._send()
125 self._send()
105
126
106 def pipeline(self):
127 def pipeline(self):
107 return self.__class__(self)
128 return self.__class__(self)
@@ -1,71 +1,72 b''
1 from __future__ import absolute_import, division, unicode_literals
1 from __future__ import absolute_import, division, unicode_literals
2
2
3 import functools
3 import functools
4
4
5 # Use timer that's not susceptible to time of day adjustments.
5 # Use timer that's not susceptible to time of day adjustments.
6 try:
6 try:
7 # perf_counter is only present on Py3.3+
7 # perf_counter is only present on Py3.3+
8 from time import perf_counter as time_now
8 from time import perf_counter as time_now
9 except ImportError:
9 except ImportError:
10 # fall back to using time
10 # fall back to using time
11 from time import time as time_now
11 from time import time as time_now
12
12
13
13
14 def safe_wraps(wrapper, *args, **kwargs):
14 def safe_wraps(wrapper, *args, **kwargs):
15 """Safely wraps partial functions."""
15 """Safely wraps partial functions."""
16 while isinstance(wrapper, functools.partial):
16 while isinstance(wrapper, functools.partial):
17 wrapper = wrapper.func
17 wrapper = wrapper.func
18 return functools.wraps(wrapper, *args, **kwargs)
18 return functools.wraps(wrapper, *args, **kwargs)
19
19
20
20
21 class Timer(object):
21 class Timer(object):
22 """A context manager/decorator for statsd.timing()."""
22 """A context manager/decorator for statsd.timing()."""
23
23
24 def __init__(self, client, stat, rate=1):
24 def __init__(self, client, stat, rate=1, tags=None):
25 self.client = client
25 self.client = client
26 self.stat = stat
26 self.stat = stat
27 self.rate = rate
27 self.rate = rate
28 self.tags = tags
28 self.ms = None
29 self.ms = None
29 self._sent = False
30 self._sent = False
30 self._start_time = None
31 self._start_time = None
31
32
32 def __call__(self, f):
33 def __call__(self, f):
33 """Thread-safe timing function decorator."""
34 """Thread-safe timing function decorator."""
34 @safe_wraps(f)
35 @safe_wraps(f)
35 def _wrapped(*args, **kwargs):
36 def _wrapped(*args, **kwargs):
36 start_time = time_now()
37 start_time = time_now()
37 try:
38 try:
38 return f(*args, **kwargs)
39 return f(*args, **kwargs)
39 finally:
40 finally:
40 elapsed_time_ms = 1000.0 * (time_now() - start_time)
41 elapsed_time_ms = 1000.0 * (time_now() - start_time)
41 self.client.timing(self.stat, elapsed_time_ms, self.rate)
42 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags)
42 return _wrapped
43 return _wrapped
43
44
44 def __enter__(self):
45 def __enter__(self):
45 return self.start()
46 return self.start()
46
47
47 def __exit__(self, typ, value, tb):
48 def __exit__(self, typ, value, tb):
48 self.stop()
49 self.stop()
49
50
50 def start(self):
51 def start(self):
51 self.ms = None
52 self.ms = None
52 self._sent = False
53 self._sent = False
53 self._start_time = time_now()
54 self._start_time = time_now()
54 return self
55 return self
55
56
56 def stop(self, send=True):
57 def stop(self, send=True):
57 if self._start_time is None:
58 if self._start_time is None:
58 raise RuntimeError('Timer has not started.')
59 raise RuntimeError('Timer has not started.')
59 dt = time_now() - self._start_time
60 dt = time_now() - self._start_time
60 self.ms = 1000.0 * dt # Convert to milliseconds.
61 self.ms = 1000.0 * dt # Convert to milliseconds.
61 if send:
62 if send:
62 self.send()
63 self.send()
63 return self
64 return self
64
65
65 def send(self):
66 def send(self):
66 if self.ms is None:
67 if self.ms is None:
67 raise RuntimeError('No data recorded.')
68 raise RuntimeError('No data recorded.')
68 if self._sent:
69 if self._sent:
69 raise RuntimeError('Already sent data.')
70 raise RuntimeError('Already sent data.')
70 self._sent = True
71 self._sent = True
71 self.client.timing(self.stat, self.ms, self.rate)
72 self.client.timing(self.stat, self.ms, self.rate)
@@ -1,70 +1,80 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import time
18 import time
19 import logging
19 import logging
20
20
21 import vcsserver
21 import vcsserver
22 from vcsserver.utils import safe_str
22 from vcsserver.utils import safe_str
23
23
24
24
25 log = logging.getLogger(__name__)
25 log = logging.getLogger(__name__)
26
26
27
27
28 def get_access_path(request):
28 def get_access_path(environ):
29 environ = request.environ
29 path = environ.get('PATH_INFO')
30 return environ.get('PATH_INFO')
30 return path
31
31
32
32
33 def get_user_agent(environ):
33 def get_user_agent(environ):
34 return environ.get('HTTP_USER_AGENT')
34 return environ.get('HTTP_USER_AGENT')
35
35
36
36
37 class RequestWrapperTween(object):
37 class RequestWrapperTween(object):
38 def __init__(self, handler, registry):
38 def __init__(self, handler, registry):
39 self.handler = handler
39 self.handler = handler
40 self.registry = registry
40 self.registry = registry
41
41
42 # one-time configuration code goes here
42 # one-time configuration code goes here
43
43
44 def __call__(self, request):
44 def __call__(self, request):
45 start = time.time()
45 start = time.time()
46 log.debug('Starting request time measurement')
46 log.debug('Starting request time measurement')
47 try:
47 try:
48 response = self.handler(request)
48 response = self.handler(request)
49 finally:
49 finally:
50 count = request.request_count()
50 count = request.request_count()
51 _ver_ = vcsserver.__version__
51 _ver_ = vcsserver.__version__
52 statsd = request.statsd
52 _path = safe_str(get_access_path(request.environ))
53
53 total = time.time() - start
54 total = time.time() - start
54 if statsd:
55 statsd.timing('vcsserver.req.timing', total)
56 statsd.incr('vcsserver.req.count')
57 log.info(
55 log.info(
58 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
56 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
59 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
57 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
60 safe_str(get_access_path(request)), total,
58 _path, total, get_user_agent(request.environ), _ver_
61 get_user_agent(request.environ), _ver_
62 )
59 )
63
60
61 statsd = request.registry.statsd
62 if statsd:
63 elapsed_time_ms = 1000.0 * total
64 statsd.timing(
65 'vcsserver_req_timing', elapsed_time_ms,
66 tags=[
67 "path:{}".format(_path),
68 ]
69 )
70 statsd.incr(
71 'vcsserver_req_count', tags=[
72 "path:{}".format(_path),
73 ])
64 return response
74 return response
65
75
66
76
67 def includeme(config):
77 def includeme(config):
68 config.add_tween(
78 config.add_tween(
69 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
79 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
70 )
80 )
General Comments 0
You need to be logged in to leave comments. Login now