##// END OF EJS Templates
metrics: use new statsd client logic, and start gathering new metrics
super-admin -
r1005:d51a7b83 default
parent child Browse files
Show More
@@ -0,0 +1,49 b''
1 from vcsserver.lib._vendor.statsd import client_from_config
2
3
4 class StatsdClientNotInitialised(Exception):
5 pass
6
7
8 class _Singleton(type):
9 """A metaclass that creates a Singleton base class when called."""
10
11 _instances = {}
12
13 def __call__(cls, *args, **kwargs):
14 if cls not in cls._instances:
15 cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
16 return cls._instances[cls]
17
18
19 class Singleton(_Singleton("SingletonMeta", (object,), {})):
20 pass
21
22
23 class StatsdClientClass(Singleton):
24 setup_run = False
25 statsd_client = None
26 statsd = None
27
28 def __getattribute__(self, name):
29
30 if name.startswith("statsd"):
31 if self.setup_run:
32 return super(StatsdClientClass, self).__getattribute__(name)
33 else:
34 return None
35 #raise StatsdClientNotInitialised("requested key was %s" % name)
36
37 return super(StatsdClientClass, self).__getattribute__(name)
38
39 def setup(self, settings):
40 """
41 Initialize the client
42 """
43 statsd = client_from_config(settings)
44 self.statsd = statsd
45 self.statsd_client = statsd
46 self.setup_run = True
47
48
49 StatsdClient = StatsdClientClass()
@@ -1,705 +1,718 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import os
19 19 import sys
20 20 import base64
21 21 import locale
22 22 import logging
23 23 import uuid
24 24 import wsgiref.util
25 25 import traceback
26 26 import tempfile
27 27 import psutil
28 28 from itertools import chain
29 29 from cStringIO import StringIO
30 30
31 31 import simplejson as json
32 32 import msgpack
33 33 from pyramid.config import Configurator
34 34 from pyramid.settings import asbool, aslist
35 35 from pyramid.wsgi import wsgiapp
36 36 from pyramid.compat import configparser
37 37 from pyramid.response import Response
38 38
39 39 from vcsserver.utils import safe_int
40 from vcsserver.lib.statsd_client import StatsdClient
40 41
41 42 log = logging.getLogger(__name__)
42 43
43 44 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
44 45 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
45 46
46 47 try:
47 48 locale.setlocale(locale.LC_ALL, '')
48 49 except locale.Error as e:
49 50 log.error(
50 51 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
51 52 os.environ['LC_ALL'] = 'C'
52 53
53 54 import vcsserver
54 55 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
55 56 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
56 57 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
57 58 from vcsserver.echo_stub.echo_app import EchoApp
58 59 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
59 60 from vcsserver.lib.exc_tracking import store_exception
60 61 from vcsserver.server import VcsServer
61 62
62 63 try:
63 64 from vcsserver.git import GitFactory, GitRemote
64 65 except ImportError:
65 66 GitFactory = None
66 67 GitRemote = None
67 68
68 69 try:
69 70 from vcsserver.hg import MercurialFactory, HgRemote
70 71 except ImportError:
71 72 MercurialFactory = None
72 73 HgRemote = None
73 74
74 75 try:
75 76 from vcsserver.svn import SubversionFactory, SvnRemote
76 77 except ImportError:
77 78 SubversionFactory = None
78 79 SvnRemote = None
79 80
80 81
81 82 def _is_request_chunked(environ):
82 83 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
83 84 return stream
84 85
85 86
86 87 def _int_setting(settings, name, default):
87 88 settings[name] = int(settings.get(name, default))
88 89 return settings[name]
89 90
90 91
91 92 def _bool_setting(settings, name, default):
92 93 input_val = settings.get(name, default)
93 94 if isinstance(input_val, unicode):
94 95 input_val = input_val.encode('utf8')
95 96 settings[name] = asbool(input_val)
96 97 return settings[name]
97 98
98 99
99 100 def _list_setting(settings, name, default):
100 101 raw_value = settings.get(name, default)
101 102
102 103 # Otherwise we assume it uses pyramids space/newline separation.
103 104 settings[name] = aslist(raw_value)
104 105 return settings[name]
105 106
106 107
107 108 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
108 109 value = settings.get(name, default)
109 110
110 111 if default_when_empty and not value:
111 112 # use default value when value is empty
112 113 value = default
113 114
114 115 if lower:
115 116 value = value.lower()
116 117 settings[name] = value
117 118 return settings[name]
118 119
119 120
120 121 def log_max_fd():
121 122 try:
122 123 maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1]
123 124 log.info('Max file descriptors value: %s', maxfd)
124 125 except Exception:
125 126 pass
126 127
127 128
128 129 class VCS(object):
129 130 def __init__(self, locale_conf=None, cache_config=None):
130 131 self.locale = locale_conf
131 132 self.cache_config = cache_config
132 133 self._configure_locale()
133 134
134 135 log_max_fd()
135 136
136 137 if GitFactory and GitRemote:
137 138 git_factory = GitFactory()
138 139 self._git_remote = GitRemote(git_factory)
139 140 else:
140 141 log.info("Git client import failed")
141 142
142 143 if MercurialFactory and HgRemote:
143 144 hg_factory = MercurialFactory()
144 145 self._hg_remote = HgRemote(hg_factory)
145 146 else:
146 147 log.info("Mercurial client import failed")
147 148
148 149 if SubversionFactory and SvnRemote:
149 150 svn_factory = SubversionFactory()
150 151
151 152 # hg factory is used for svn url validation
152 153 hg_factory = MercurialFactory()
153 154 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
154 155 else:
155 156 log.info("Subversion client import failed")
156 157
157 158 self._vcsserver = VcsServer()
158 159
159 160 def _configure_locale(self):
160 161 if self.locale:
161 162 log.info('Settings locale: `LC_ALL` to %s', self.locale)
162 163 else:
163 164 log.info(
164 165 'Configuring locale subsystem based on environment variables')
165 166 try:
166 167 # If self.locale is the empty string, then the locale
167 168 # module will use the environment variables. See the
168 169 # documentation of the package `locale`.
169 170 locale.setlocale(locale.LC_ALL, self.locale)
170 171
171 172 language_code, encoding = locale.getlocale()
172 173 log.info(
173 174 'Locale set to language code "%s" with encoding "%s".',
174 175 language_code, encoding)
175 176 except locale.Error:
176 177 log.exception(
177 178 'Cannot set locale, not configuring the locale system')
178 179
179 180
180 181 class WsgiProxy(object):
181 182 def __init__(self, wsgi):
182 183 self.wsgi = wsgi
183 184
184 185 def __call__(self, environ, start_response):
185 186 input_data = environ['wsgi.input'].read()
186 187 input_data = msgpack.unpackb(input_data)
187 188
188 189 error = None
189 190 try:
190 191 data, status, headers = self.wsgi.handle(
191 192 input_data['environment'], input_data['input_data'],
192 193 *input_data['args'], **input_data['kwargs'])
193 194 except Exception as e:
194 195 data, status, headers = [], None, None
195 196 error = {
196 197 'message': str(e),
197 198 '_vcs_kind': getattr(e, '_vcs_kind', None)
198 199 }
199 200
200 201 start_response(200, {})
201 202 return self._iterator(error, status, headers, data)
202 203
203 204 def _iterator(self, error, status, headers, data):
204 205 initial_data = [
205 206 error,
206 207 status,
207 208 headers,
208 209 ]
209 210
210 211 for d in chain(initial_data, data):
211 212 yield msgpack.packb(d)
212 213
213 214
214 215 def not_found(request):
215 216 return {'status': '404 NOT FOUND'}
216 217
217 218
218 219 class VCSViewPredicate(object):
219 220 def __init__(self, val, config):
220 221 self.remotes = val
221 222
222 223 def text(self):
223 224 return 'vcs view method = %s' % (self.remotes.keys(),)
224 225
225 226 phash = text
226 227
227 228 def __call__(self, context, request):
228 229 """
229 230 View predicate that returns true if given backend is supported by
230 231 defined remotes.
231 232 """
232 233 backend = request.matchdict.get('backend')
233 234 return backend in self.remotes
234 235
235 236
236 237 class HTTPApplication(object):
237 238 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
238 239
239 240 remote_wsgi = remote_wsgi
240 241 _use_echo_app = False
241 242
242 243 def __init__(self, settings=None, global_config=None):
243 244 self._sanitize_settings_and_apply_defaults(settings)
244 245
245 246 self.config = Configurator(settings=settings)
247 # Init our statsd at very start
248 self.config.registry.statsd = StatsdClient.statsd
249
246 250 self.global_config = global_config
247 251 self.config.include('vcsserver.lib.rc_cache')
248 252
249 253 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
250 254 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
251 255 self._remotes = {
252 256 'hg': vcs._hg_remote,
253 257 'git': vcs._git_remote,
254 258 'svn': vcs._svn_remote,
255 259 'server': vcs._vcsserver,
256 260 }
257 261 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
258 262 self._use_echo_app = True
259 263 log.warning("Using EchoApp for VCS operations.")
260 264 self.remote_wsgi = remote_wsgi_stub
261 265
262 266 self._configure_settings(global_config, settings)
263 267
264 268 self._configure()
265 269
266 270 def _configure_settings(self, global_config, app_settings):
267 271 """
268 272 Configure the settings module.
269 273 """
270 274 settings_merged = global_config.copy()
271 275 settings_merged.update(app_settings)
272 276
273 277 git_path = app_settings.get('git_path', None)
274 278 if git_path:
275 279 settings.GIT_EXECUTABLE = git_path
276 280 binary_dir = app_settings.get('core.binary_dir', None)
277 281 if binary_dir:
278 282 settings.BINARY_DIR = binary_dir
279 283
280 284 # Store the settings to make them available to other modules.
281 285 vcsserver.PYRAMID_SETTINGS = settings_merged
282 286 vcsserver.CONFIG = settings_merged
283 287
284 288 def _sanitize_settings_and_apply_defaults(self, settings):
285 289 temp_store = tempfile.gettempdir()
286 290 default_cache_dir = os.path.join(temp_store, 'rc_cache')
287 291
288 292 # save default, cache dir, and use it for all backends later.
289 293 default_cache_dir = _string_setting(
290 294 settings,
291 295 'cache_dir',
292 296 default_cache_dir, lower=False, default_when_empty=True)
293 297
294 298 # ensure we have our dir created
295 299 if not os.path.isdir(default_cache_dir):
296 300 os.makedirs(default_cache_dir, mode=0o755)
297 301
298 302 # exception store cache
299 303 _string_setting(
300 304 settings,
301 305 'exception_tracker.store_path',
302 306 temp_store, lower=False, default_when_empty=True)
303 307
304 308 # repo_object cache
305 309 _string_setting(
306 310 settings,
307 311 'rc_cache.repo_object.backend',
308 312 'dogpile.cache.rc.file_namespace', lower=False)
309 313 _int_setting(
310 314 settings,
311 315 'rc_cache.repo_object.expiration_time',
312 316 30 * 24 * 60 * 60)
313 317 _string_setting(
314 318 settings,
315 319 'rc_cache.repo_object.arguments.filename',
316 320 os.path.join(default_cache_dir, 'vcsserver_cache_1'), lower=False)
317 321
318 322 def _configure(self):
319 323 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
320 324
321 325 self.config.add_route('service', '/_service')
322 326 self.config.add_route('status', '/status')
323 327 self.config.add_route('hg_proxy', '/proxy/hg')
324 328 self.config.add_route('git_proxy', '/proxy/git')
325 329
326 330 # rpc methods
327 331 self.config.add_route('vcs', '/{backend}')
328 332
329 333 # streaming rpc remote methods
330 334 self.config.add_route('vcs_stream', '/{backend}/stream')
331 335
332 336 # vcs operations clone/push as streaming
333 337 self.config.add_route('stream_git', '/stream/git/*repo_name')
334 338 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
335 339
336 340 self.config.add_view(self.status_view, route_name='status', renderer='json')
337 341 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
338 342
339 343 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
340 344 self.config.add_view(self.git_proxy(), route_name='git_proxy')
341 345 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
342 346 vcs_view=self._remotes)
343 347 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
344 348 vcs_view=self._remotes)
345 349
346 350 self.config.add_view(self.hg_stream(), route_name='stream_hg')
347 351 self.config.add_view(self.git_stream(), route_name='stream_git')
348 352
349 353 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
350 354
351 355 self.config.add_notfound_view(not_found, renderer='json')
352 356
353 357 self.config.add_view(self.handle_vcs_exception, context=Exception)
354 358
355 359 self.config.add_tween(
356 360 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
357 361 )
358 362 self.config.add_request_method(
359 363 'vcsserver.lib.request_counter.get_request_counter',
360 364 'request_count')
361 365
362 self.config.add_request_method(
363 'vcsserver.lib._vendor.statsd.get_statsd_client',
364 'statsd', reify=True)
365
366 366 def wsgi_app(self):
367 367 return self.config.make_wsgi_app()
368 368
369 369 def _vcs_view_params(self, request):
370 370 remote = self._remotes[request.matchdict['backend']]
371 371 payload = msgpack.unpackb(request.body, use_list=True)
372 372 method = payload.get('method')
373 373 params = payload['params']
374 374 wire = params.get('wire')
375 375 args = params.get('args')
376 376 kwargs = params.get('kwargs')
377 377 context_uid = None
378 378
379 379 if wire:
380 380 try:
381 381 wire['context'] = context_uid = uuid.UUID(wire['context'])
382 382 except KeyError:
383 383 pass
384 384 args.insert(0, wire)
385 385 repo_state_uid = wire.get('repo_state_uid') if wire else None
386 386
387 387 # NOTE(marcink): trading complexity for slight performance
388 388 if log.isEnabledFor(logging.DEBUG):
389 389 no_args_methods = [
390 390
391 391 ]
392 392 if method in no_args_methods:
393 393 call_args = ''
394 394 else:
395 395 call_args = args[1:]
396 396
397 397 log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
398 398 method, call_args, kwargs, context_uid, repo_state_uid)
399 399
400 statsd = request.registry.statsd
401 if statsd:
402 statsd.incr(
403 'vcsserver_method_count', tags=[
404 "method:{}".format(method),
405 ])
400 406 return payload, remote, method, args, kwargs
401 407
402 408 def vcs_view(self, request):
403 409
404 410 payload, remote, method, args, kwargs = self._vcs_view_params(request)
405 411 payload_id = payload.get('id')
406 412
407 413 try:
408 414 resp = getattr(remote, method)(*args, **kwargs)
409 415 except Exception as e:
410 416 exc_info = list(sys.exc_info())
411 417 exc_type, exc_value, exc_traceback = exc_info
412 418
413 419 org_exc = getattr(e, '_org_exc', None)
414 420 org_exc_name = None
415 421 org_exc_tb = ''
416 422 if org_exc:
417 423 org_exc_name = org_exc.__class__.__name__
418 424 org_exc_tb = getattr(e, '_org_exc_tb', '')
419 425 # replace our "faked" exception with our org
420 426 exc_info[0] = org_exc.__class__
421 427 exc_info[1] = org_exc
422 428
423 429 should_store_exc = True
424 430 if org_exc:
425 431 def get_exc_fqn(_exc_obj):
426 432 module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
427 433 return module_name + '.' + org_exc_name
428 434
429 435 exc_fqn = get_exc_fqn(org_exc)
430 436
431 437 if exc_fqn in ['mercurial.error.RepoLookupError',
432 438 'vcsserver.exceptions.RefNotFoundException']:
433 439 should_store_exc = False
434 440
435 441 if should_store_exc:
436 442 store_exception(id(exc_info), exc_info, request_path=request.path)
437 443
438 444 tb_info = ''.join(
439 445 traceback.format_exception(exc_type, exc_value, exc_traceback))
440 446
441 447 type_ = e.__class__.__name__
442 448 if type_ not in self.ALLOWED_EXCEPTIONS:
443 449 type_ = None
444 450
445 451 resp = {
446 452 'id': payload_id,
447 453 'error': {
448 454 'message': e.message,
449 455 'traceback': tb_info,
450 456 'org_exc': org_exc_name,
451 457 'org_exc_tb': org_exc_tb,
452 458 'type': type_
453 459 }
454 460 }
455 461
456 462 try:
457 463 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
458 464 except AttributeError:
459 465 pass
460 466 else:
461 467 resp = {
462 468 'id': payload_id,
463 469 'result': resp
464 470 }
465 471
466 472 return resp
467 473
468 474 def vcs_stream_view(self, request):
469 475 payload, remote, method, args, kwargs = self._vcs_view_params(request)
470 476 # this method has a stream: marker we remove it here
471 477 method = method.split('stream:')[-1]
472 478 chunk_size = safe_int(payload.get('chunk_size')) or 4096
473 479
474 480 try:
475 481 resp = getattr(remote, method)(*args, **kwargs)
476 482 except Exception as e:
477 483 raise
478 484
479 485 def get_chunked_data(method_resp):
480 486 stream = StringIO(method_resp)
481 487 while 1:
482 488 chunk = stream.read(chunk_size)
483 489 if not chunk:
484 490 break
485 491 yield chunk
486 492
487 493 response = Response(app_iter=get_chunked_data(resp))
488 494 response.content_type = 'application/octet-stream'
489 495
490 496 return response
491 497
492 498 def status_view(self, request):
493 499 import vcsserver
494 500 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
495 501 'pid': os.getpid()}
496 502
497 503 def service_view(self, request):
498 504 import vcsserver
499 505
500 506 payload = msgpack.unpackb(request.body, use_list=True)
501 507 server_config, app_config = {}, {}
502 508
503 509 try:
504 510 path = self.global_config['__file__']
505 511 config = configparser.RawConfigParser()
506 512
507 513 config.read(path)
508 514
509 515 if config.has_section('server:main'):
510 516 server_config = dict(config.items('server:main'))
511 517 if config.has_section('app:main'):
512 518 app_config = dict(config.items('app:main'))
513 519
514 520 except Exception:
515 521 log.exception('Failed to read .ini file for display')
516 522
517 523 environ = os.environ.items()
518 524
519 525 resp = {
520 526 'id': payload.get('id'),
521 527 'result': dict(
522 528 version=vcsserver.__version__,
523 529 config=server_config,
524 530 app_config=app_config,
525 531 environ=environ,
526 532 payload=payload,
527 533 )
528 534 }
529 535 return resp
530 536
531 537 def _msgpack_renderer_factory(self, info):
532 538 def _render(value, system):
533 539 request = system.get('request')
534 540 if request is not None:
535 541 response = request.response
536 542 ct = response.content_type
537 543 if ct == response.default_content_type:
538 544 response.content_type = 'application/x-msgpack'
539 545 return msgpack.packb(value)
540 546 return _render
541 547
542 548 def set_env_from_config(self, environ, config):
543 549 dict_conf = {}
544 550 try:
545 551 for elem in config:
546 552 if elem[0] == 'rhodecode':
547 553 dict_conf = json.loads(elem[2])
548 554 break
549 555 except Exception:
550 556 log.exception('Failed to fetch SCM CONFIG')
551 557 return
552 558
553 559 username = dict_conf.get('username')
554 560 if username:
555 561 environ['REMOTE_USER'] = username
556 562 # mercurial specific, some extension api rely on this
557 563 environ['HGUSER'] = username
558 564
559 565 ip = dict_conf.get('ip')
560 566 if ip:
561 567 environ['REMOTE_HOST'] = ip
562 568
563 569 if _is_request_chunked(environ):
564 570 # set the compatibility flag for webob
565 571 environ['wsgi.input_terminated'] = True
566 572
567 573 def hg_proxy(self):
568 574 @wsgiapp
569 575 def _hg_proxy(environ, start_response):
570 576 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
571 577 return app(environ, start_response)
572 578 return _hg_proxy
573 579
574 580 def git_proxy(self):
575 581 @wsgiapp
576 582 def _git_proxy(environ, start_response):
577 583 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
578 584 return app(environ, start_response)
579 585 return _git_proxy
580 586
581 587 def hg_stream(self):
582 588 if self._use_echo_app:
583 589 @wsgiapp
584 590 def _hg_stream(environ, start_response):
585 591 app = EchoApp('fake_path', 'fake_name', None)
586 592 return app(environ, start_response)
587 593 return _hg_stream
588 594 else:
589 595 @wsgiapp
590 596 def _hg_stream(environ, start_response):
591 597 log.debug('http-app: handling hg stream')
592 598 repo_path = environ['HTTP_X_RC_REPO_PATH']
593 599 repo_name = environ['HTTP_X_RC_REPO_NAME']
594 600 packed_config = base64.b64decode(
595 601 environ['HTTP_X_RC_REPO_CONFIG'])
596 602 config = msgpack.unpackb(packed_config)
597 603 app = scm_app.create_hg_wsgi_app(
598 604 repo_path, repo_name, config)
599 605
600 606 # Consistent path information for hgweb
601 607 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
602 608 environ['REPO_NAME'] = repo_name
603 609 self.set_env_from_config(environ, config)
604 610
605 611 log.debug('http-app: starting app handler '
606 612 'with %s and process request', app)
607 613 return app(environ, ResponseFilter(start_response))
608 614 return _hg_stream
609 615
610 616 def git_stream(self):
611 617 if self._use_echo_app:
612 618 @wsgiapp
613 619 def _git_stream(environ, start_response):
614 620 app = EchoApp('fake_path', 'fake_name', None)
615 621 return app(environ, start_response)
616 622 return _git_stream
617 623 else:
618 624 @wsgiapp
619 625 def _git_stream(environ, start_response):
620 626 log.debug('http-app: handling git stream')
621 627 repo_path = environ['HTTP_X_RC_REPO_PATH']
622 628 repo_name = environ['HTTP_X_RC_REPO_NAME']
623 629 packed_config = base64.b64decode(
624 630 environ['HTTP_X_RC_REPO_CONFIG'])
625 631 config = msgpack.unpackb(packed_config)
626 632
627 633 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
628 634 self.set_env_from_config(environ, config)
629 635
630 636 content_type = environ.get('CONTENT_TYPE', '')
631 637
632 638 path = environ['PATH_INFO']
633 639 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
634 640 log.debug(
635 641 'LFS: Detecting if request `%s` is LFS server path based '
636 642 'on content type:`%s`, is_lfs:%s',
637 643 path, content_type, is_lfs_request)
638 644
639 645 if not is_lfs_request:
640 646 # fallback detection by path
641 647 if GIT_LFS_PROTO_PAT.match(path):
642 648 is_lfs_request = True
643 649 log.debug(
644 650 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
645 651 path, is_lfs_request)
646 652
647 653 if is_lfs_request:
648 654 app = scm_app.create_git_lfs_wsgi_app(
649 655 repo_path, repo_name, config)
650 656 else:
651 657 app = scm_app.create_git_wsgi_app(
652 658 repo_path, repo_name, config)
653 659
654 660 log.debug('http-app: starting app handler '
655 661 'with %s and process request', app)
656 662
657 663 return app(environ, start_response)
658 664
659 665 return _git_stream
660 666
661 667 def handle_vcs_exception(self, exception, request):
662 668 _vcs_kind = getattr(exception, '_vcs_kind', '')
663 669 if _vcs_kind == 'repo_locked':
664 670 # Get custom repo-locked status code if present.
665 671 status_code = request.headers.get('X-RC-Locked-Status-Code')
666 672 return HTTPRepoLocked(
667 673 title=exception.message, status_code=status_code)
668 674
669 675 elif _vcs_kind == 'repo_branch_protected':
670 676 # Get custom repo-branch-protected status code if present.
671 677 return HTTPRepoBranchProtected(title=exception.message)
672 678
673 679 exc_info = request.exc_info
674 680 store_exception(id(exc_info), exc_info)
675 681
676 682 traceback_info = 'unavailable'
677 683 if request.exc_info:
678 684 exc_type, exc_value, exc_tb = request.exc_info
679 685 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
680 686
681 687 log.error(
682 688 'error occurred handling this request for path: %s, \n tb: %s',
683 689 request.path, traceback_info)
690
691 statsd = request.registry.statsd
692 if statsd:
693 statsd.incr('vcsserver_exception')
684 694 raise exception
685 695
686 696
687 697 class ResponseFilter(object):
688 698
689 699 def __init__(self, start_response):
690 700 self._start_response = start_response
691 701
692 702 def __call__(self, status, response_headers, exc_info=None):
693 703 headers = tuple(
694 704 (h, v) for h, v in response_headers
695 705 if not wsgiref.util.is_hop_by_hop(h))
696 706 return self._start_response(status, headers, exc_info)
697 707
698 708
699 709 def main(global_config, **settings):
700 710 if MercurialFactory:
701 711 hgpatches.patch_largefiles_capabilities()
702 712 hgpatches.patch_subrepo_type_mapping()
703 713
714 # init and bootstrap StatsdClient
715 StatsdClient.setup(settings)
716
704 717 app = HTTPApplication(settings=settings, global_config=global_config)
705 718 return app.wsgi_app()
@@ -1,107 +1,128 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 import re
3 4 import random
4 5 from collections import deque
5 6 from datetime import timedelta
7 from repoze.lru import lru_cache
6 8
7 9 from .timer import Timer
8 10
11 TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
12 TAG_INVALID_CHARS_SUBS = "_"
13
14
15 @lru_cache(maxsize=500)
16 def _normalize_tags_with_cache(tag_list):
17 return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
18
19
20 def normalize_tags(tag_list):
21 # We have to turn our input tag list into a non-mutable tuple for it to
22 # be hashable (and thus usable) by the @lru_cache decorator.
23 return _normalize_tags_with_cache(tuple(tag_list))
24
9 25
10 26 class StatsClientBase(object):
11 27 """A Base class for various statsd clients."""
12 28
13 29 def close(self):
14 30 """Used to close and clean up any underlying resources."""
15 31 raise NotImplementedError()
16 32
17 33 def _send(self):
18 34 raise NotImplementedError()
19 35
20 36 def pipeline(self):
21 37 raise NotImplementedError()
22 38
23 def timer(self, stat, rate=1):
24 return Timer(self, stat, rate)
39 def timer(self, stat, rate=1, tags=None):
40 return Timer(self, stat, rate, tags)
25 41
26 def timing(self, stat, delta, rate=1):
42 def timing(self, stat, delta, rate=1, tags=None):
27 43 """
28 44 Send new timing information.
29 45
30 46 `delta` can be either a number of milliseconds or a timedelta.
31 47 """
32 48 if isinstance(delta, timedelta):
33 49 # Convert timedelta to number of milliseconds.
34 50 delta = delta.total_seconds() * 1000.
35 self._send_stat(stat, '%0.6f|ms' % delta, rate)
51 self._send_stat(stat, '%0.6f|ms' % delta, rate, tags)
36 52
37 def incr(self, stat, count=1, rate=1):
53 def incr(self, stat, count=1, rate=1, tags=None):
38 54 """Increment a stat by `count`."""
39 self._send_stat(stat, '%s|c' % count, rate)
55 self._send_stat(stat, '%s|c' % count, rate, tags)
40 56
41 def decr(self, stat, count=1, rate=1):
57 def decr(self, stat, count=1, rate=1, tags=None):
42 58 """Decrement a stat by `count`."""
43 self.incr(stat, -count, rate)
59 self.incr(stat, -count, rate, tags)
44 60
45 def gauge(self, stat, value, rate=1, delta=False):
61 def gauge(self, stat, value, rate=1, delta=False, tags=None):
46 62 """Set a gauge value."""
47 63 if value < 0 and not delta:
48 64 if rate < 1:
49 65 if random.random() > rate:
50 66 return
51 67 with self.pipeline() as pipe:
52 68 pipe._send_stat(stat, '0|g', 1)
53 69 pipe._send_stat(stat, '%s|g' % value, 1)
54 70 else:
55 71 prefix = '+' if delta and value >= 0 else ''
56 self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
72 self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags)
57 73
58 74 def set(self, stat, value, rate=1):
59 75 """Set a set value."""
60 76 self._send_stat(stat, '%s|s' % value, rate)
61 77
62 def _send_stat(self, stat, value, rate):
63 self._after(self._prepare(stat, value, rate))
78 def _send_stat(self, stat, value, rate, tags=None):
79 self._after(self._prepare(stat, value, rate, tags))
64 80
65 def _prepare(self, stat, value, rate):
81 def _prepare(self, stat, value, rate, tags=None):
66 82 if rate < 1:
67 83 if random.random() > rate:
68 84 return
69 85 value = '%s|@%s' % (value, rate)
70 86
71 87 if self._prefix:
72 88 stat = '%s.%s' % (self._prefix, stat)
73 89
74 return '%s:%s' % (stat, value)
90 res = '%s:%s%s' % (
91 stat,
92 value,
93 ("|#" + ",".join(normalize_tags(tags))) if tags else "",
94 )
95 return res
75 96
76 97 def _after(self, data):
77 98 if data:
78 99 self._send(data)
79 100
80 101
81 102 class PipelineBase(StatsClientBase):
82 103
83 104 def __init__(self, client):
84 105 self._client = client
85 106 self._prefix = client._prefix
86 107 self._stats = deque()
87 108
88 109 def _send(self):
89 110 raise NotImplementedError()
90 111
91 112 def _after(self, data):
92 113 if data is not None:
93 114 self._stats.append(data)
94 115
95 116 def __enter__(self):
96 117 return self
97 118
98 119 def __exit__(self, typ, value, tb):
99 120 self.send()
100 121
101 122 def send(self):
102 123 if not self._stats:
103 124 return
104 125 self._send()
105 126
106 127 def pipeline(self):
107 128 return self.__class__(self)
@@ -1,71 +1,72 b''
1 1 from __future__ import absolute_import, division, unicode_literals
2 2
3 3 import functools
4 4
5 5 # Use timer that's not susceptible to time of day adjustments.
6 6 try:
7 7 # perf_counter is only present on Py3.3+
8 8 from time import perf_counter as time_now
9 9 except ImportError:
10 10 # fall back to using time
11 11 from time import time as time_now
12 12
13 13
14 14 def safe_wraps(wrapper, *args, **kwargs):
15 15 """Safely wraps partial functions."""
16 16 while isinstance(wrapper, functools.partial):
17 17 wrapper = wrapper.func
18 18 return functools.wraps(wrapper, *args, **kwargs)
19 19
20 20
21 21 class Timer(object):
22 22 """A context manager/decorator for statsd.timing()."""
23 23
24 def __init__(self, client, stat, rate=1):
24 def __init__(self, client, stat, rate=1, tags=None):
25 25 self.client = client
26 26 self.stat = stat
27 27 self.rate = rate
28 self.tags = tags
28 29 self.ms = None
29 30 self._sent = False
30 31 self._start_time = None
31 32
32 33 def __call__(self, f):
33 34 """Thread-safe timing function decorator."""
34 35 @safe_wraps(f)
35 36 def _wrapped(*args, **kwargs):
36 37 start_time = time_now()
37 38 try:
38 39 return f(*args, **kwargs)
39 40 finally:
40 41 elapsed_time_ms = 1000.0 * (time_now() - start_time)
41 self.client.timing(self.stat, elapsed_time_ms, self.rate)
42 self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags)
42 43 return _wrapped
43 44
44 45 def __enter__(self):
45 46 return self.start()
46 47
47 48 def __exit__(self, typ, value, tb):
48 49 self.stop()
49 50
50 51 def start(self):
51 52 self.ms = None
52 53 self._sent = False
53 54 self._start_time = time_now()
54 55 return self
55 56
56 57 def stop(self, send=True):
57 58 if self._start_time is None:
58 59 raise RuntimeError('Timer has not started.')
59 60 dt = time_now() - self._start_time
60 61 self.ms = 1000.0 * dt # Convert to milliseconds.
61 62 if send:
62 63 self.send()
63 64 return self
64 65
65 66 def send(self):
66 67 if self.ms is None:
67 68 raise RuntimeError('No data recorded.')
68 69 if self._sent:
69 70 raise RuntimeError('Already sent data.')
70 71 self._sent = True
71 72 self.client.timing(self.stat, self.ms, self.rate)
@@ -1,70 +1,80 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import logging
20 20
21 21 import vcsserver
22 22 from vcsserver.utils import safe_str
23 23
24 24
25 25 log = logging.getLogger(__name__)
26 26
27 27
28 def get_access_path(request):
29 environ = request.environ
30 return environ.get('PATH_INFO')
28 def get_access_path(environ):
29 path = environ.get('PATH_INFO')
30 return path
31 31
32 32
33 33 def get_user_agent(environ):
34 34 return environ.get('HTTP_USER_AGENT')
35 35
36 36
37 37 class RequestWrapperTween(object):
38 38 def __init__(self, handler, registry):
39 39 self.handler = handler
40 40 self.registry = registry
41 41
42 42 # one-time configuration code goes here
43 43
44 44 def __call__(self, request):
45 45 start = time.time()
46 46 log.debug('Starting request time measurement')
47 47 try:
48 48 response = self.handler(request)
49 49 finally:
50 50 count = request.request_count()
51 51 _ver_ = vcsserver.__version__
52 statsd = request.statsd
52 _path = safe_str(get_access_path(request.environ))
53
53 54 total = time.time() - start
54 if statsd:
55 statsd.timing('vcsserver.req.timing', total)
56 statsd.incr('vcsserver.req.count')
57 55 log.info(
58 56 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
59 57 count, '127.0.0.1', request.environ.get('REQUEST_METHOD'),
60 safe_str(get_access_path(request)), total,
61 get_user_agent(request.environ), _ver_
58 _path, total, get_user_agent(request.environ), _ver_
62 59 )
63 60
61 statsd = request.registry.statsd
62 if statsd:
63 elapsed_time_ms = 1000.0 * total
64 statsd.timing(
65 'vcsserver_req_timing', elapsed_time_ms,
66 tags=[
67 "path:{}".format(_path),
68 ]
69 )
70 statsd.incr(
71 'vcsserver_req_count', tags=[
72 "path:{}".format(_path),
73 ])
64 74 return response
65 75
66 76
67 77 def includeme(config):
68 78 config.add_tween(
69 79 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
70 80 )
General Comments 0
You need to be logged in to leave comments. Login now