diff --git a/.bumpversion.cfg b/.bumpversion.cfg --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -3,4 +3,3 @@ current_version = 5.0.0 message = release: Bump version {current_version} to {new_version} [bumpversion:file:vcsserver/VERSION] - diff --git a/.hgtags b/.hgtags --- a/.hgtags +++ b/.hgtags @@ -61,3 +61,22 @@ ba6a6dc9ecd7fd8b1dcd6eb0c4ee0210e897c426 17bc818b41bcf6883b9ff0da31f01d8c2a5d0781 v4.18.1 1e9f12aa01f82c335abc9017efe94ce1c30b52ba v4.18.2 f4cc6b3c5680bdf4541d7d442fbb7086640fb547 v4.18.3 +5dc0277e4f77bd4cc3042d99625bb5d3ba480c8c v4.19.0 +3a815eeb1b1efa340dda9b81a8da3cf24a7d605b v4.19.1 +8841da3680fba841e5a54ebccd8ca56c078f7553 v4.19.2 +4b0dec7fd80b1ca38e5073e5e562a5a450f73669 v4.19.3 +1485aa75ffe1b1ec48352dce7b7492d92f85e95f v4.20.0 +5b740274011766ef2f73803cc196d081e1e7f1d4 v4.20.1 +5a7835234e2c45e8fb8184c60f548a64b5842af8 v4.21.0 +26af88343015f8b89d5a66f92bc7547c51fcf0df v4.22.0 +cf54e5f700fe5dc50af1a1bdf5197c18cf52105f v4.23.0 +179d989bcfe02c6227f9f6aa9236cbbe1c14c400 v4.23.1 +383aee8b1652affaa26aefe336a89ee366b2b26d v4.23.2 +bc1a8141cc51fc23c455ebc50c6609c810b46f8d v4.24.0 +530a1c03caabc806ea1ef34605f8f67f18c70e55 v4.24.1 +5908ae65cee1043982e1b26d7b618af5fcfebbb3 v4.25.0 +cce8bcdf75090d5943a1e9706fe5212d7b5d1fa1 v4.25.1 +8610c4bf846c63bbc95d3ddfb53fadaaa9c7aa42 v4.25.2 +d46b7d1be72c76c9f9aaeab6a342951d54459f49 v4.26.0 +6fba0daab1e20a9e18fb70fa59bd21753e0a5b90 v4.27.0 +6195da4fc454087173918ae59cae946289458676 v4.27.1 diff --git a/.release.cfg b/.release.cfg new file mode 100644 --- /dev/null +++ b/.release.cfg @@ -0,0 +1,16 @@ +[DEFAULT] +done = false + +[task:bump_version] +done = true + +[task:fixes_on_stable] +done = true + +[task:pip2nix_generated] +done = true + +[release] +state = prepared +version = 4.27.1 + diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -31,7 +31,7 @@ asyncore_use_poll = true ; GUNICORN APPLICATION SERVER ; ########################### -; run with gunicorn --log-config rhodecode.ini --paste rhodecode.ini +; run with gunicorn --paste rhodecode.ini ; Module to use, this setting shouldn't be changed #use = egg:gunicorn#main @@ -86,7 +86,7 @@ asyncore_use_poll = true ; serving requests. Workers still alive after the timeout (starting from the ; receipt of the restart signal) are force killed. ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h) -#graceful_timeout = 3600 +#graceful_timeout = 21600 # The number of seconds to wait for requests on a Keep-Alive connection. # Generally set in the 1-5 seconds range. @@ -110,6 +110,17 @@ asyncore_use_poll = true [app:main] ; The %(here)s variable will be replaced with the absolute path of parent directory ; of this file +; Each option in the app:main can be override by an environmental variable +; +;To override an option: +; +;RC_ +;Everything should be uppercase, . and - should be replaced by _. +;For example, if you have these configuration settings: +;rc_cache.repo_object.backend = foo +;can be overridden by +;export RC_CACHE_REPO_OBJECT_BACKEND=foo + use = egg:rhodecode-vcsserver @@ -133,13 +144,13 @@ debugtoolbar.exclude_prefixes = ; ################# ; Pyramid default locales, we need this to be set -pyramid.default_locale_name = en +#pyramid.default_locale_name = en ; default locale used by VCS systems -locale = en_US.UTF-8 +#locale = en_US.UTF-8 ; path to binaries for vcsserver, it should be set by the installer -; at installation time, e.g /home/user/vcsserver-1/profile/bin +; at installation time, e.g /home/user/.rccontrol/vcsserver-1/profile/bin ; it can also be a path to nix-build output in case of development core.binary_dir = "" @@ -153,21 +164,21 @@ core.binary_dir = "" ; Default cache dir for caches. Putting this into a ramdisk can boost performance. ; eg. /tmpfs/data_ramdisk, however this directory might require large amount of space -cache_dir = %(here)s/data +#cache_dir = %(here)s/data ; *************************************** ; `repo_object` cache, default file based ; *************************************** ; `repo_object` cache settings for vcs methods for repositories -rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace +#rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace ; cache auto-expires after N seconds ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days) -rc_cache.repo_object.expiration_time = 2592000 +#rc_cache.repo_object.expiration_time = 2592000 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set -#rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache.db +#rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache_repo_object.db ; *********************************************************** ; `repo_object` cache with redis backend @@ -191,10 +202,32 @@ rc_cache.repo_object.expiration_time = 2 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends #rc_cache.repo_object.arguments.distributed_lock = true +; auto-renew lock to prevent stale locks, slower but safer. Use only if problems happen +#rc_cache.repo_object.arguments.lock_auto_renewal = true + +; Statsd client config, this is used to send metrics to statsd +; We recommend setting statsd_exported and scrape them using Promethues +#statsd.enabled = false +#statsd.statsd_host = 0.0.0.0 +#statsd.statsd_port = 8125 +#statsd.statsd_prefix = +#statsd.statsd_ipv6 = false + +; configure logging automatically at server startup set to false +; to use the below custom logging config. +; RC_LOGGING_FORMATTER +; RC_LOGGING_LEVEL +; env variables can control the settings for logging in case of autoconfigure + +#logging.autoconfigure = true + +; specify your own custom logging config file to configure logging +#logging.logging_conf_file = /path/to/custom_logging.ini ; ##################### ; LOGGING CONFIGURATION ; ##################### + [loggers] keys = root, vcsserver @@ -202,7 +235,7 @@ keys = root, vcsserver keys = console [formatters] -keys = generic +keys = generic, json ; ####### ; LOGGERS @@ -217,7 +250,6 @@ handlers = qualname = vcsserver propagate = 1 - ; ######## ; HANDLERS ; ######## @@ -226,6 +258,8 @@ propagate = 1 class = StreamHandler args = (sys.stderr, ) level = DEBUG +; To enable JSON formatted logs replace 'generic' with 'json' +; This allows sending properly formatted logs to grafana loki or elasticsearch formatter = generic ; ########## @@ -235,3 +269,7 @@ formatter = generic [formatter_generic] format = %(asctime)s.%(msecs)03d [%(process)d] %(levelname)-5.5s [%(name)s] %(message)s datefmt = %Y-%m-%d %H:%M:%S + +[formatter_json] +format = %(timestamp)s %(levelname)s %(name)s %(message)s %(req_id)s +class = vcsserver.lib._vendor.jsonlogger.JsonFormatter diff --git a/configs/gunicorn_config.py b/configs/gunicorn_config.py --- a/configs/gunicorn_config.py +++ b/configs/gunicorn_config.py @@ -11,6 +11,7 @@ import time import threading import traceback import random +import socket from gunicorn.glogging import Logger @@ -29,9 +30,15 @@ accesslog = '-' worker_tmp_dir = None tmp_upload_dir = None +#reuse_port = True + # Custom log format +#access_log_format = ( +# '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"') + +# loki format for easier parsing in grafana access_log_format = ( - '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"') + 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"') # self adjust workers based on CPU count # workers = get_workers() @@ -50,7 +57,7 @@ def _get_process_rss(pid=None): def _get_config(ini_path): - import configparser + import configparser try: config = configparser.RawConfigParser() @@ -90,9 +97,12 @@ def post_fork(server, worker): if conf.has_option(section, 'memory_usage_recovery_threshold'): _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold') - worker._memory_max_usage = _memory_max_usage - worker._memory_usage_check_interval = _memory_usage_check_interval - worker._memory_usage_recovery_threshold = _memory_usage_recovery_threshold + worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '') + or _memory_max_usage) + worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '') + or _memory_usage_check_interval) + worker._memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '') + or _memory_usage_recovery_threshold) # register memory last check time, with some random offset so we don't recycle all # at once @@ -242,6 +252,74 @@ def post_request(worker, req, environ, r _check_memory_usage(worker) +def _filter_proxy(ip): + """ + Passed in IP addresses in HEADERS can be in a special format of multiple + ips. Those comma separated IPs are passed from various proxies in the + chain of request processing. The left-most being the original client. + We only care about the first IP which came from the org. client. + + :param ip: ip string from headers + """ + if ',' in ip: + _ips = ip.split(',') + _first_ip = _ips[0].strip() + return _first_ip + return ip + + +def _filter_port(ip): + """ + Removes a port from ip, there are 4 main cases to handle here. + - ipv4 eg. 127.0.0.1 + - ipv6 eg. ::1 + - ipv4+port eg. 127.0.0.1:8080 + - ipv6+port eg. [::1]:8080 + + :param ip: + """ + def is_ipv6(ip_addr): + if hasattr(socket, 'inet_pton'): + try: + socket.inet_pton(socket.AF_INET6, ip_addr) + except socket.error: + return False + else: + return False + return True + + if ':' not in ip: # must be ipv4 pure ip + return ip + + if '[' in ip and ']' in ip: # ipv6 with port + return ip.split(']')[0][1:].lower() + + # must be ipv6 or ipv4 with port + if is_ipv6(ip): + return ip + else: + ip, _port = ip.split(':')[:2] # means ipv4+port + return ip + + +def get_ip_addr(environ): + proxy_key = 'HTTP_X_REAL_IP' + proxy_key2 = 'HTTP_X_FORWARDED_FOR' + def_key = 'REMOTE_ADDR' + _filters = lambda x: _filter_port(_filter_proxy(x)) + + ip = environ.get(proxy_key) + if ip: + return _filters(ip) + + ip = environ.get(proxy_key2) + if ip: + return _filters(ip) + + ip = environ.get(def_key, '0.0.0.0') + return _filters(ip) + + class RhodeCodeLogger(Logger): """ Custom Logger that allows some customization that gunicorn doesn't allow @@ -258,5 +336,58 @@ class RhodeCodeLogger(Logger): msecs = int((now - long(now)) * 1000) return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs) + def atoms(self, resp, req, environ, request_time): + """ Gets atoms for log formatting. + """ + status = resp.status + if isinstance(status, str): + status = status.split(None, 1)[0] + atoms = { + 'h': get_ip_addr(environ), + 'l': '-', + 'u': self._get_user(environ) or '-', + 't': self.now(), + 'r': "%s %s %s" % (environ['REQUEST_METHOD'], + environ['RAW_URI'], + environ["SERVER_PROTOCOL"]), + 's': status, + 'm': environ.get('REQUEST_METHOD'), + 'U': environ.get('PATH_INFO'), + 'q': environ.get('QUERY_STRING'), + 'H': environ.get('SERVER_PROTOCOL'), + 'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-', + 'B': getattr(resp, 'sent', None), + 'f': environ.get('HTTP_REFERER', '-'), + 'a': environ.get('HTTP_USER_AGENT', '-'), + 'T': request_time.seconds, + 'D': (request_time.seconds * 1000000) + request_time.microseconds, + 'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000), + 'L': "%d.%06d" % (request_time.seconds, request_time.microseconds), + 'p': "<%s>" % os.getpid() + } + + # add request headers + if hasattr(req, 'headers'): + req_headers = req.headers + else: + req_headers = req + + if hasattr(req_headers, "items"): + req_headers = req_headers.items() + + atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers}) + + resp_headers = resp.headers + if hasattr(resp_headers, "items"): + resp_headers = resp_headers.items() + + # add response headers + atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers}) + + # add environ variables + environ_variables = environ.items() + atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables}) + + return atoms logger_class = RhodeCodeLogger diff --git a/configs/logging.ini b/configs/logging.ini new file mode 100644 --- /dev/null +++ b/configs/logging.ini @@ -0,0 +1,53 @@ +; ##################### +; LOGGING CONFIGURATION +; ##################### +; Logging template, used for configure the logging +; some variables here are replaced by RhodeCode to default values + +[loggers] +keys = root, vcsserver + +[handlers] +keys = console + +[formatters] +keys = generic, json + +; ####### +; LOGGERS +; ####### +[logger_root] +level = NOTSET +handlers = console + +[logger_vcsserver] +level = $RC_LOGGING_LEVEL +handlers = +qualname = vcsserver +propagate = 1 + +; ######## +; HANDLERS +; ######## + +[handler_console] +class = StreamHandler +args = (sys.stderr, ) +level = $RC_LOGGING_LEVEL +; To enable JSON formatted logs replace generic with json +; This allows sending properly formatted logs to grafana loki or elasticsearch +#formatter = json +#formatter = generic +formatter = $RC_LOGGING_FORMATTER + +; ########## +; FORMATTERS +; ########## + +[formatter_generic] +format = %(asctime)s.%(msecs)03d [%(process)d] %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %Y-%m-%d %H:%M:%S + +[formatter_json] +format = %(timestamp)s %(levelname)s %(name)s %(message)s %(req_id)s +class = vcsserver.lib._vendor.jsonlogger.JsonFormatter diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -14,7 +14,7 @@ port = 9900 ; GUNICORN APPLICATION SERVER ; ########################### -; run with gunicorn --log-config rhodecode.ini --paste rhodecode.ini +; run with gunicorn --paste rhodecode.ini ; Module to use, this setting shouldn't be changed use = egg:gunicorn#main @@ -69,7 +69,7 @@ limit_request_field_size = 0 ; serving requests. Workers still alive after the timeout (starting from the ; receipt of the restart signal) are force killed. ; Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h) -graceful_timeout = 3600 +graceful_timeout = 21600 # The number of seconds to wait for requests on a Keep-Alive connection. # Generally set in the 1-5 seconds range. @@ -93,16 +93,27 @@ memory_usage_recovery_threshold = 0.8 [app:main] ; The %(here)s variable will be replaced with the absolute path of parent directory ; of this file +; Each option in the app:main can be override by an environmental variable +; +;To override an option: +; +;RC_ +;Everything should be uppercase, . and - should be replaced by _. +;For example, if you have these configuration settings: +;rc_cache.repo_object.backend = foo +;can be overridden by +;export RC_CACHE_REPO_OBJECT_BACKEND=foo + use = egg:rhodecode-vcsserver ; Pyramid default locales, we need this to be set -pyramid.default_locale_name = en +#pyramid.default_locale_name = en ; default locale used by VCS systems -locale = en_US.UTF-8 +#locale = en_US.UTF-8 ; path to binaries for vcsserver, it should be set by the installer -; at installation time, e.g /home/user/vcsserver-1/profile/bin +; at installation time, e.g /home/user/.rccontrol/vcsserver-1/profile/bin ; it can also be a path to nix-build output in case of development core.binary_dir = "" @@ -116,21 +127,21 @@ core.binary_dir = "" ; Default cache dir for caches. Putting this into a ramdisk can boost performance. ; eg. /tmpfs/data_ramdisk, however this directory might require large amount of space -cache_dir = %(here)s/data +#cache_dir = %(here)s/data ; *************************************** ; `repo_object` cache, default file based ; *************************************** ; `repo_object` cache settings for vcs methods for repositories -rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace +#rc_cache.repo_object.backend = dogpile.cache.rc.file_namespace ; cache auto-expires after N seconds ; Examples: 86400 (1Day), 604800 (7Days), 1209600 (14Days), 2592000 (30days), 7776000 (90Days) -rc_cache.repo_object.expiration_time = 2592000 +#rc_cache.repo_object.expiration_time = 2592000 ; file cache store path. Defaults to `cache_dir =` value or tempdir if both values are not set -#rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache.db +#rc_cache.repo_object.arguments.filename = /tmp/vcsserver_cache_repo_object.db ; *********************************************************** ; `repo_object` cache with redis backend @@ -154,10 +165,32 @@ rc_cache.repo_object.expiration_time = 2 ; more Redis options: https://dogpilecache.sqlalchemy.org/en/latest/api.html#redis-backends #rc_cache.repo_object.arguments.distributed_lock = true +; auto-renew lock to prevent stale locks, slower but safer. Use only if problems happen +#rc_cache.repo_object.arguments.lock_auto_renewal = true + +; Statsd client config, this is used to send metrics to statsd +; We recommend setting statsd_exported and scrape them using Promethues +#statsd.enabled = false +#statsd.statsd_host = 0.0.0.0 +#statsd.statsd_port = 8125 +#statsd.statsd_prefix = +#statsd.statsd_ipv6 = false + +; configure logging automatically at server startup set to false +; to use the below custom logging config. +; RC_LOGGING_FORMATTER +; RC_LOGGING_LEVEL +; env variables can control the settings for logging in case of autoconfigure + +#logging.autoconfigure = true + +; specify your own custom logging config file to configure logging +#logging.logging_conf_file = /path/to/custom_logging.ini ; ##################### ; LOGGING CONFIGURATION ; ##################### + [loggers] keys = root, vcsserver @@ -165,7 +198,7 @@ keys = root, vcsserver keys = console [formatters] -keys = generic +keys = generic, json ; ####### ; LOGGERS @@ -175,12 +208,11 @@ level = NOTSET handlers = console [logger_vcsserver] -level = DEBUG +level = INFO handlers = qualname = vcsserver propagate = 1 - ; ######## ; HANDLERS ; ######## @@ -189,6 +221,8 @@ propagate = 1 class = StreamHandler args = (sys.stderr, ) level = INFO +; To enable JSON formatted logs replace 'generic' with 'json' +; This allows sending properly formatted logs to grafana loki or elasticsearch formatter = generic ; ########## @@ -198,3 +232,7 @@ formatter = generic [formatter_generic] format = %(asctime)s.%(msecs)03d [%(process)d] %(levelname)-5.5s [%(name)s] %(message)s datefmt = %Y-%m-%d %H:%M:%S + +[formatter_json] +format = %(timestamp)s %(levelname)s %(name)s %(message)s %(req_id)s +class = vcsserver.lib._vendor.jsonlogger.JsonFormatter diff --git a/pkgs/patches/configparser/pyproject.patch b/pkgs/patches/configparser/pyproject.patch new file mode 100644 --- /dev/null +++ b/pkgs/patches/configparser/pyproject.patch @@ -0,0 +1,10 @@ +diff -rup configparser-4.0.2-orig/pyproject.toml configparser-4.0.2/pyproject.toml +--- configparser-4.0.2-orig/pyproject.toml 2021-03-22 21:28:11.000000000 +0100 ++++ configparser-4.0.2/pyproject.toml 2021-03-22 21:28:11.000000000 +0100 +@@ -1,5 +1,5 @@ + [build-system] +-requires = ["setuptools>=40.7", "wheel", "setuptools_scm>=1.15"] ++requires = ["setuptools<=42.0", "wheel", "setuptools_scm<6.0.0"] + build-backend = "setuptools.build_meta" + + [tool.black] diff --git a/pkgs/patches/importlib_metadata/pyproject.patch b/pkgs/patches/importlib_metadata/pyproject.patch new file mode 100644 --- /dev/null +++ b/pkgs/patches/importlib_metadata/pyproject.patch @@ -0,0 +1,7 @@ +diff -rup importlib-metadata-1.6.0-orig/yproject.toml importlib-metadata-1.6.0/pyproject.toml +--- importlib-metadata-1.6.0-orig/yproject.toml 2021-03-22 22:10:33.000000000 +0100 ++++ importlib-metadata-1.6.0/pyproject.toml 2021-03-22 22:11:09.000000000 +0100 +@@ -1,3 +1,3 @@ + [build-system] +-requires = ["setuptools>=30.3", "wheel", "setuptools_scm"] ++requires = ["setuptools<42.0", "wheel", "setuptools_scm<6.0.0"] diff --git a/pkgs/patches/pytest/setuptools.patch b/pkgs/patches/pytest/setuptools.patch new file mode 100644 --- /dev/null +++ b/pkgs/patches/pytest/setuptools.patch @@ -0,0 +1,12 @@ +diff -rup pytest-4.6.9-orig/setup.py pytest-4.6.9/setup.py +--- pytest-4.6.9-orig/setup.py 2018-04-10 10:23:04.000000000 +0200 ++++ pytest-4.6.9/setup.py 2018-04-10 10:23:34.000000000 +0200 +@@ -24,7 +24,7 @@ def main(): + def main(): + setup( + use_scm_version={"write_to": "src/_pytest/_version.py"}, +- setup_requires=["setuptools-scm", "setuptools>=40.0"], ++ setup_requires=["setuptools-scm<6.0.0", "setuptools<=42.0"], + package_dir={"": "src"}, + # fmt: off + extras_require={ \ No newline at end of file diff --git a/pkgs/patches/zipp/pyproject.patch b/pkgs/patches/zipp/pyproject.patch new file mode 100644 --- /dev/null +++ b/pkgs/patches/zipp/pyproject.patch @@ -0,0 +1,10 @@ +diff -rup zip-1.2.0-orig/pyproject.toml zip-1.2.0/pyproject.toml +--- zip-1.2.0-orig/pyproject.toml 2021-03-23 10:55:37.000000000 +0100 ++++ zip-1.2.0/pyproject.toml 2021-03-23 10:56:05.000000000 +0100 +@@ -1,5 +1,5 @@ + [build-system] +-requires = ["setuptools>=34.4", "wheel", "setuptools_scm>=1.15"] ++requires = ["setuptools<42.0", "wheel", "setuptools_scm<6.0.0"] + build-backend = "setuptools.build_meta" + + [tool.black] diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- + # RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2019 RodeCode GmbH # @@ -20,20 +21,47 @@ from setuptools import setup, find_packages import os +import re import sys import pkgutil import platform import codecs -try: # for pip >= 10 +import pip + +pip_major_version = int(pip.__version__.split(".")[0]) +if pip_major_version >= 20: from pip._internal.req import parse_requirements -except ImportError: # for pip <= 9.0.3 + from pip._internal.network.session import PipSession +elif pip_major_version >= 10: + from pip._internal.req import parse_requirements + from pip._internal.download import PipSession +else: from pip.req import parse_requirements + from pip.download import PipSession + -try: # for pip >= 10 - from pip._internal.download import PipSession -except ImportError: # for pip <= 9.0.3 - from pip.download import PipSession +def get_package_name(req_object): + package_name = None + try: + from pip._internal.req.constructors import install_req_from_parsed_requirement + except ImportError: + install_req_from_parsed_requirement = None + + # In 20.1 of pip, the requirements object changed + if hasattr(req_object, 'req'): + package_name = req_object.req.name + + if package_name is None: + if install_req_from_parsed_requirement: + package = install_req_from_parsed_requirement(req_object) + package_name = package.req.name + + if package_name is None: + # fallback for older pip + package_name = re.split('===|<=|!=|==|>=|~=|<|>', req_object.requirement)[0] + + return package_name if sys.version_info < (2, 7): @@ -61,14 +89,15 @@ def _get_requirements(req_filename, excl parsed = parse_requirements(os.path.join(here, req_filename)) requirements = [] - for ir in parsed: - if ir.req and ir.name not in exclude: - requirements.append(str(ir.req)) + for int_req in parsed: + req_name = get_package_name(int_req) + if req_name not in exclude: + requirements.append(req_name) return requirements + extras # requirements extract -setup_requirements = ['pytest-runner'] +setup_requirements = [] install_requirements = _get_requirements( 'requirements.txt', exclude=['setuptools']) test_requirements = _get_requirements( diff --git a/vcsserver/base.py b/vcsserver/base.py --- a/vcsserver/base.py +++ b/vcsserver/base.py @@ -14,13 +14,18 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - +import os import sys import traceback import logging import urllib.parse from vcsserver.lib.rc_cache import region_meta + +from vcsserver import exceptions +from vcsserver.exceptions import NoContentException +from vcsserver.hgcompat import (archival) + log = logging.getLogger(__name__) @@ -74,3 +79,54 @@ def raise_from_original(new_type): raise new_exc.with_traceback(exc_traceback) finally: del exc_traceback + + +class ArchiveNode(object): + def __init__(self, path, mode, is_link, raw_bytes): + self.path = path + self.mode = mode + self.is_link = is_link + self.raw_bytes = raw_bytes + + +def archive_repo(walker, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id, write_metadata=True, extra_metadata=None): + """ + walker should be a file walker, for example: + def walker(): + for file_info in files: + yield ArchiveNode(fn, mode, is_link, ctx[fn].data) + """ + extra_metadata = extra_metadata or {} + + if kind == "tgz": + archiver = archival.tarit(archive_dest_path, mtime, "gz") + elif kind == "tbz2": + archiver = archival.tarit(archive_dest_path, mtime, "bz2") + elif kind == 'zip': + archiver = archival.zipit(archive_dest_path, mtime) + else: + raise exceptions.ArchiveException()( + 'Remote does not support: "%s" archive type.' % kind) + + for f in walker(commit_id, archive_at_path): + f_path = os.path.join(archive_dir_name, f.path.lstrip('/')) + try: + archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes()) + except NoContentException: + # NOTE(marcink): this is a special case for SVN so we can create "empty" + # directories which arent supported by archiver + archiver.addfile(os.path.join(f_path, '.dir'), f.mode, f.is_link, '') + + if write_metadata: + metadata = dict([ + ('commit_id', commit_id), + ('mtime', mtime), + ]) + metadata.update(extra_metadata) + + meta = ["%s:%s" % (f_name, value) for f_name, value in metadata.items()] + f_path = os.path.join(archive_dir_name, '.archival.txt') + archiver.addfile(f_path, 0o644, False, '\n'.join(meta)) + + return archiver.done() diff --git a/vcsserver/config/__init__.py b/vcsserver/config/__init__.py new file mode 100644 diff --git a/vcsserver/config/settings_maker.py b/vcsserver/config/settings_maker.py new file mode 100644 --- /dev/null +++ b/vcsserver/config/settings_maker.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2020 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import os +import textwrap +import string +import functools +import logging +import tempfile +import logging.config +log = logging.getLogger(__name__) + +# skip keys, that are set here, so we don't double process those +set_keys = { + '__file__': '' +} + + +def str2bool(_str): + """ + returns True/False value from given string, it tries to translate the + string into boolean + + :param _str: string value to translate into boolean + :rtype: boolean + :returns: boolean from given string + """ + if _str is None: + return False + if _str in (True, False): + return _str + _str = str(_str).strip().lower() + return _str in ('t', 'true', 'y', 'yes', 'on', '1') + + +def aslist(obj, sep=None, strip=True): + """ + Returns given string separated by sep as list + + :param obj: + :param sep: + :param strip: + """ + if isinstance(obj, (basestring,)): + if obj in ['', ""]: + return [] + + lst = obj.split(sep) + if strip: + lst = [v.strip() for v in lst] + return lst + elif isinstance(obj, (list, tuple)): + return obj + elif obj is None: + return [] + else: + return [obj] + + +class SettingsMaker(object): + + def __init__(self, app_settings): + self.settings = app_settings + + @classmethod + def _bool_func(cls, input_val): + if isinstance(input_val, unicode): + input_val = input_val.encode('utf8') + return str2bool(input_val) + + @classmethod + def _int_func(cls, input_val): + return int(input_val) + + @classmethod + def _list_func(cls, input_val, sep=','): + return aslist(input_val, sep=sep) + + @classmethod + def _string_func(cls, input_val, lower=True): + if lower: + input_val = input_val.lower() + return input_val + + @classmethod + def _float_func(cls, input_val): + return float(input_val) + + @classmethod + def _dir_func(cls, input_val, ensure_dir=False, mode=0o755): + + # ensure we have our dir created + if not os.path.isdir(input_val) and ensure_dir: + os.makedirs(input_val, mode=mode) + + if not os.path.isdir(input_val): + raise Exception('Dir at {} does not exist'.format(input_val)) + return input_val + + @classmethod + def _file_path_func(cls, input_val, ensure_dir=False, mode=0o755): + dirname = os.path.dirname(input_val) + cls._dir_func(dirname, ensure_dir=ensure_dir) + return input_val + + @classmethod + def _key_transformator(cls, key): + return "{}_{}".format('RC'.upper(), key.upper().replace('.', '_').replace('-', '_')) + + def maybe_env_key(self, key): + # now maybe we have this KEY in env, search and use the value with higher priority. + transformed_key = self._key_transformator(key) + envvar_value = os.environ.get(transformed_key) + if envvar_value: + log.debug('using `%s` key instead of `%s` key for config', transformed_key, key) + + return envvar_value + + def env_expand(self): + replaced = {} + for k, v in self.settings.items(): + if k not in set_keys: + envvar_value = self.maybe_env_key(k) + if envvar_value: + replaced[k] = envvar_value + set_keys[k] = envvar_value + + # replace ALL keys updated + self.settings.update(replaced) + + def enable_logging(self, logging_conf=None, level='INFO', formatter='generic'): + """ + Helper to enable debug on running instance + :return: + """ + + if not str2bool(self.settings.get('logging.autoconfigure')): + log.info('logging configuration based on main .ini file') + return + + if logging_conf is None: + logging_conf = self.settings.get('logging.logging_conf_file') or '' + + if not os.path.isfile(logging_conf): + log.error('Unable to setup logging based on %s, ' + 'file does not exist.... specify path using logging.logging_conf_file= config setting. ', logging_conf) + return + + with open(logging_conf, 'rb') as f: + ini_template = textwrap.dedent(f.read()) + ini_template = string.Template(ini_template).safe_substitute( + RC_LOGGING_LEVEL=os.environ.get('RC_LOGGING_LEVEL', '') or level, + RC_LOGGING_FORMATTER=os.environ.get('RC_LOGGING_FORMATTER', '') or formatter + ) + + with tempfile.NamedTemporaryFile(prefix='rc_logging_', suffix='.ini', delete=False) as f: + log.info('Saved Temporary LOGGING config at %s', f.name) + f.write(ini_template) + + logging.config.fileConfig(f.name) + os.remove(f.name) + + def make_setting(self, key, default, lower=False, default_when_empty=False, parser=None): + input_val = self.settings.get(key, default) + + if default_when_empty and not input_val: + # use default value when value is set in the config but it is empty + input_val = default + + parser_func = { + 'bool': self._bool_func, + 'int': self._int_func, + 'list': self._list_func, + 'list:newline': functools.partial(self._list_func, sep='/n'), + 'list:spacesep': functools.partial(self._list_func, sep=' '), + 'string': functools.partial(self._string_func, lower=lower), + 'dir': self._dir_func, + 'dir:ensured': functools.partial(self._dir_func, ensure_dir=True), + 'file': self._file_path_func, + 'file:ensured': functools.partial(self._file_path_func, ensure_dir=True), + None: lambda i: i + }[parser] + + envvar_value = self.maybe_env_key(key) + if envvar_value: + input_val = envvar_value + set_keys[key] = input_val + + self.settings[key] = parser_func(input_val) + return self.settings[key] diff --git a/vcsserver/exceptions.py b/vcsserver/exceptions.py --- a/vcsserver/exceptions.py +++ b/vcsserver/exceptions.py @@ -119,3 +119,7 @@ class HTTPRepoBranchProtected(HTTPForbid class RefNotFoundException(KeyError): pass + + +class NoContentException(ValueError): + pass diff --git a/vcsserver/git.py b/vcsserver/git.py --- a/vcsserver/git.py +++ b/vcsserver/git.py @@ -1,4 +1,4 @@ -# RhodeCode VCSServer provides access to different vcs backends via network. + RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2020 RhodeCode GmbH # # This program is free software; you can redistribute it and/or modify @@ -29,6 +29,7 @@ from functools import wraps import more_itertools import pygit2 from pygit2 import Repository as LibGit2Repo +from pygit2 import index as LibGit2Index from dulwich import index, objects from dulwich.client import HttpGitClient, LocalGitClient from dulwich.errors import ( @@ -40,7 +41,7 @@ from dulwich.server import update_server from vcsserver import exceptions, settings, subprocessio from vcsserver.utils import safe_str, safe_int, safe_unicode -from vcsserver.base import RepoFactory, obfuscate_qs +from vcsserver.base import RepoFactory, obfuscate_qs, ArchiveNode, archive_repo from vcsserver.hgcompat import ( hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler) from vcsserver.git_lfs.lib import LFSOidStore @@ -184,7 +185,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def assert_correct_path(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _assert_correct_path(_context_uid, _repo_id): try: repo_init = self._factory.repo_libgit2(wire) @@ -216,7 +218,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def blob_raw_length(self, wire, sha): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _blob_raw_length(_repo_id, _sha): repo_init = self._factory.repo_libgit2(wire) @@ -247,7 +250,8 @@ class GitRemote(RemoteBase): def is_large_file(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _is_large_file(_repo_id, _sha): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -263,7 +267,8 @@ class GitRemote(RemoteBase): def is_binary(self, wire, tree_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _is_binary(_repo_id, _tree_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -305,7 +310,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def bulk_request(self, wire, rev, pre_load): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _bulk_request(_repo_id, _rev, _pre_load): result = {} for attr in pre_load: @@ -408,7 +414,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def branch(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _branch(_context_uid, _repo_id, _commit_id): regex = re.compile('^refs/heads') @@ -423,7 +430,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def commit_branches(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _commit_branches(_context_uid, _repo_id, _commit_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -444,11 +452,22 @@ class GitRemote(RemoteBase): # TODO: this is quite complex, check if that can be simplified @reraise_safe_exceptions def commit(self, wire, commit_data, branch, commit_tree, updated, removed): + # Defines the root tree + class _Root(object): + def __repr__(self): + return 'ROOT TREE' + ROOT = _Root() + repo = self._factory.repo(wire) object_store = repo.object_store # Create tree and populates it with blobs - commit_tree = commit_tree and repo[commit_tree] or objects.Tree() + + if commit_tree and repo[commit_tree]: + git_commit = repo[commit_data['parents'][0]] + commit_tree = repo[git_commit.tree] # root tree + else: + commit_tree = objects.Tree() for node in updated: # Compute subdirs if needed @@ -507,21 +526,34 @@ class GitRemote(RemoteBase): for node_path in removed: paths = node_path.split('/') - tree = commit_tree - trees = [tree] + tree = commit_tree # start with top-level + trees = [{'tree': tree, 'path': ROOT}] # Traverse deep into the forest... + # resolve final tree by iterating the path. + # e.g a/b/c.txt will get + # - root as tree then + # - 'a' as tree, + # - 'b' as tree, + # - stop at c as blob. for path in paths: try: obj = repo[tree[path][1]] if isinstance(obj, objects.Tree): - trees.append(obj) + trees.append({'tree': obj, 'path': path}) tree = obj except KeyError: break + #PROBLEM: + """ + We're not editing same reference tree object + """ # Cut down the blob and all rotten trees on the way back... - for path, tree in reversed(list(zip(paths, trees))): - del tree[path] - if tree: + for path, tree_data in reversed(list(zip(paths, trees))): + tree = tree_data['tree'] + tree.__delitem__(path) + # This operation edits the tree, we need to mark new commit back + + if len(tree) > 0: # This tree still has elements - don't remove it or any # of it's parents break @@ -587,7 +619,7 @@ class GitRemote(RemoteBase): if refs and not update_after: # mikhail: explicitly set the head to the last ref. - repo['HEAD'] = remote_refs[refs[-1]] + repo["HEAD"] = remote_refs[refs[-1]] if update_after: # we want to checkout HEAD @@ -689,7 +721,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def get_object(self, wire, sha, maybe_unreachable=False): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_object(_context_uid, _repo_id, _sha): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -747,7 +780,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def get_refs(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_refs(_context_uid, _repo_id): repo_init = self._factory.repo_libgit2(wire) @@ -761,7 +795,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def get_branch_pointers(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_branch_pointers(_context_uid, _repo_id): repo_init = self._factory.repo_libgit2(wire) @@ -775,7 +810,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def head(self, wire, show_exc=True): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _head(_context_uid, _repo_id, _show_exc): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -800,7 +836,8 @@ class GitRemote(RemoteBase): def revision(self, wire, rev): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _revision(_context_uid, _repo_id, _rev): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -818,7 +855,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def date(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _date(_repo_id, _commit_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -837,7 +875,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def author(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _author(_repo_id, _commit_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -861,7 +900,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def message(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _message(_repo_id, _commit_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -872,7 +912,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def parents(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _parents(_repo_id, _commit_id): repo_init = self._factory.repo_libgit2(wire) with repo_init as repo: @@ -888,7 +929,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def children(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _children(_repo_id, _commit_id): output, __ = self.run_git_command( wire, ['rev-list', '--all', '--children']) @@ -947,7 +989,8 @@ class GitRemote(RemoteBase): def tree_and_type_for_path(self, wire, commit_id, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _tree_and_type_for_path(_context_uid, _repo_id, _commit_id, _path): repo_init = self._factory.repo_libgit2(wire) @@ -964,7 +1007,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def tree_items(self, wire, tree_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _tree_items(_repo_id, _tree_id): repo_init = self._factory.repo_libgit2(wire) @@ -1065,7 +1109,8 @@ class GitRemote(RemoteBase): @reraise_safe_exceptions def node_history(self, wire, commit_id, path, limit): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _node_history(_context_uid, _repo_id, _commit_id, _path, _limit): # optimize for n==1, rev-list is much faster for that use-case if limit == 1: @@ -1107,7 +1152,8 @@ class GitRemote(RemoteBase): def get_all_commit_ids(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_all_commit_ids(_context_uid, _repo_id): cmd = ['rev-list', '--reverse', '--date-order', '--branches', '--tags'] @@ -1190,3 +1236,46 @@ class GitRemote(RemoteBase): 'pre_version': get_git_pre_hook_version(path, bare), 'post_version': get_git_post_hook_version(path, bare), } + + @reraise_safe_exceptions + def set_head_ref(self, wire, head_name): + log.debug('Setting refs/head to `%s`', head_name) + cmd = ['symbolic-ref', '"HEAD"', '"refs/heads/%s"' % head_name] + output, __ = self.run_git_command(wire, cmd) + return [head_name] + output.splitlines() + + @reraise_safe_exceptions + def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id): + + def file_walker(_commit_id, path): + repo_init = self._factory.repo_libgit2(wire) + + with repo_init as repo: + commit = repo[commit_id] + + if path in ['', '/']: + tree = commit.tree + else: + tree = commit.tree[path.rstrip('/')] + tree_id = tree.id.hex + try: + tree = repo[tree_id] + except KeyError: + raise ObjectMissing('No tree with id: {}'.format(tree_id)) + + index = LibGit2Index.Index() + index.read_tree(tree) + file_iter = index + + for fn in file_iter: + file_path = fn.path + mode = fn.mode + is_link = stat.S_ISLNK(mode) + if mode == pygit2.GIT_FILEMODE_COMMIT: + log.debug('Skipping path %s as a commit node', file_path) + continue + yield ArchiveNode(file_path, mode, is_link, repo[fn.hex].read_raw) + + return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id) diff --git a/vcsserver/hg.py b/vcsserver/hg.py --- a/vcsserver/hg.py +++ b/vcsserver/hg.py @@ -14,9 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - +import functools import io import logging +import os import stat import urllib.request, urllib.parse, urllib.error import urllib.request, urllib.error, urllib.parse @@ -31,13 +32,14 @@ from mercurial import repair import vcsserver from vcsserver import exceptions -from vcsserver.base import RepoFactory, obfuscate_qs, raise_from_original +from vcsserver.base import RepoFactory, obfuscate_qs, raise_from_original, archive_repo, ArchiveNode from vcsserver.hgcompat import ( archival, bin, clone, config as hgconfig, diffopts, hex, get_ctx, hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler, makepeer, instance, match, memctx, exchange, memfilectx, nullrev, hg_merge, patch, peer, revrange, ui, hg_tag, Abort, LookupError, RepoError, - RepoLookupError, InterventionRequired, RequirementError) + RepoLookupError, InterventionRequired, RequirementError, + alwaysmatcher, patternmatcher, hgutil) from vcsserver.vcs_base import RemoteBase log = logging.getLogger(__name__) @@ -205,25 +207,10 @@ class HgRemote(RemoteBase): return False @reraise_safe_exceptions - def archive_repo(self, archive_path, mtime, file_info, kind): - if kind == "tgz": - archiver = archival.tarit(archive_path, mtime, "gz") - elif kind == "tbz2": - archiver = archival.tarit(archive_path, mtime, "bz2") - elif kind == 'zip': - archiver = archival.zipit(archive_path, mtime) - else: - raise exceptions.ArchiveException()( - 'Remote does not support: "%s".' % kind) - - for f_path, f_mode, f_is_link, f_content in file_info: - archiver.addfile(f_path, f_mode, f_is_link, f_content) - archiver.done() - - @reraise_safe_exceptions def bookmarks(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _bookmarks(_context_uid, _repo_id): repo = self._factory.repo(wire) return dict(repo._bookmarks) @@ -233,7 +220,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def branches(self, wire, normal, closed): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _branches(_context_uid, _repo_id, _normal, _closed): repo = self._factory.repo(wire) iter_branches = repo.branchmap().iterbranches() @@ -251,7 +239,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def bulk_request(self, wire, commit_id, pre_load): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _bulk_request(_repo_id, _commit_id, _pre_load): result = {} for attr in pre_load: @@ -268,7 +257,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_branch(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_branch(_repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -278,7 +268,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_date(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_date(_repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -294,7 +285,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_files(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_files(_repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -311,7 +303,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_parents(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_parents(_repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -323,7 +316,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_children(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_children(_repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -335,7 +329,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_phase(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_phase(_context_uid, _repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -346,7 +341,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_obsolete(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_obsolete(_context_uid, _repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -356,7 +352,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def ctx_hidden(self, wire, commit_id): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _ctx_hidden(_context_uid, _repo_id, _commit_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -469,7 +466,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def node_history(self, wire, revision, path, limit): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _node_history(_context_uid, _repo_id, _revision, _path, _limit): repo = self._factory.repo(wire) @@ -499,7 +497,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def node_history_untill(self, wire, revision, path, limit): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _node_history_until(_context_uid, _repo_id): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, revision) @@ -537,7 +536,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def fctx_flags(self, wire, commit_id, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _fctx_flags(_repo_id, _commit_id, _path): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -549,7 +549,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def fctx_size(self, wire, commit_id, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _fctx_size(_repo_id, _revision, _path): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, commit_id) @@ -560,7 +561,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def get_all_commit_ids(self, wire, name): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_all_commit_ids(_context_uid, _repo_id, _name): repo = self._factory.repo(wire) repo = repo.filtered(name) @@ -576,7 +578,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def is_large_file(self, wire, commit_id, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _is_large_file(_context_uid, _repo_id, _commit_id, _path): return largefiles.lfutil.isstandin(path) @@ -586,7 +589,8 @@ class HgRemote(RemoteBase): def is_binary(self, wire, revision, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _is_binary(_repo_id, _sha, _path): repo = self._factory.repo(wire) ctx = self._get_ctx(repo, revision) @@ -623,7 +627,9 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def lookup(self, wire, revision, both): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _lookup(_context_uid, _repo_id, _revision, _both): repo = self._factory.repo(wire) @@ -681,7 +687,8 @@ class HgRemote(RemoteBase): def rev_range(self, wire, commit_filter): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _rev_range(_context_uid, _repo_id, _filter): repo = self._factory.repo(wire) revisions = [rev for rev in revrange(repo, commit_filter)] @@ -756,7 +763,8 @@ class HgRemote(RemoteBase): @reraise_safe_exceptions def tags(self, wire): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _tags(_context_uid, _repo_id): repo = self._factory.repo(wire) return repo.tags() @@ -1007,3 +1015,33 @@ class HgRemote(RemoteBase): 'pre_version': vcsserver.__version__, 'post_version': vcsserver.__version__, } + + @reraise_safe_exceptions + def set_head_ref(self, wire, head_name): + pass + + @reraise_safe_exceptions + def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id): + + def file_walker(_commit_id, path): + repo = self._factory.repo(wire) + ctx = repo[_commit_id] + is_root = path in ['', '/'] + if is_root: + matcher = alwaysmatcher(badfn=None) + else: + matcher = patternmatcher('', [(b'glob', path+'/**', b'')], badfn=None) + file_iter = ctx.manifest().walk(matcher) + + for fn in file_iter: + file_path = fn + flags = ctx.flags(fn) + mode = b'x' in flags and 0o755 or 0o644 + is_link = b'l' in flags + + yield ArchiveNode(file_path, mode, is_link, ctx[fn].data) + + return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id) + diff --git a/vcsserver/hgcompat.py b/vcsserver/hgcompat.py --- a/vcsserver/hgcompat.py +++ b/vcsserver/hgcompat.py @@ -38,7 +38,7 @@ from mercurial import merge as hg_merge from mercurial import subrepo from mercurial import subrepoutil from mercurial import tags as hg_tag - +from mercurial import util as hgutil from mercurial.commands import clone, nullid, pull from mercurial.context import memctx, memfilectx from mercurial.error import ( @@ -46,7 +46,7 @@ from mercurial.error import ( RequirementError, ProgrammingError) from mercurial.hgweb import hgweb_mod from mercurial.localrepo import instance -from mercurial.match import match +from mercurial.match import match, alwaysmatcher, patternmatcher from mercurial.mdiff import diffopts from mercurial.node import bin, hex from mercurial.encoding import tolocal @@ -67,7 +67,7 @@ from mercurial.url import httpbasicauthh def get_ctx(repo, ref): try: ctx = repo[ref] - except ProgrammingError: + except (ProgrammingError, TypeError): # we're unable to find the rev using a regular lookup, we fallback # to slower, but backward compat revsymbol usage ctx = revsymbol(repo, ref) diff --git a/vcsserver/hooks.py b/vcsserver/hooks.py --- a/vcsserver/hooks.py +++ b/vcsserver/hooks.py @@ -49,7 +49,7 @@ class HooksHttpClient(object): try: connection.request('POST', '/', body) except Exception: - log.error('Connection failed on %s', connection) + log.error('Hooks calling Connection failed on %s', connection.__dict__) raise response = connection.getresponse() @@ -577,8 +577,8 @@ def git_post_receive(unused_repo_path, r try: subprocessio.run_command(cmd, env=os.environ.copy()) except Exception: - cmd = [settings.GIT_EXECUTABLE, 'symbolic-ref', 'HEAD', - 'refs/heads/%s' % push_ref['name']] + cmd = [settings.GIT_EXECUTABLE, 'symbolic-ref', '"HEAD"', + '"refs/heads/%s"' % push_ref['name']] print("Setting default branch to %s" % push_ref['name']) subprocessio.run_command(cmd, env=os.environ.copy()) diff --git a/vcsserver/http_main.py b/vcsserver/http_main.py --- a/vcsserver/http_main.py +++ b/vcsserver/http_main.py @@ -21,21 +21,27 @@ import base64 import locale import logging import uuid +import time import wsgiref.util import traceback import tempfile +import psutil + from itertools import chain from io import StringIO import simplejson as json import msgpack import configparser + from pyramid.config import Configurator from pyramid.settings import asbool, aslist from pyramid.wsgi import wsgiapp from pyramid.response import Response +from vcsserver.config.settings_maker import SettingsMaker from vcsserver.utils import safe_int +from vcsserver.lib.statsd_client import StatsdClient log = logging.getLogger(__name__) @@ -49,6 +55,7 @@ except locale.Error as e: 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e) os.environ['LC_ALL'] = 'C' + import vcsserver from vcsserver import remote_wsgi, scm_app, settings, hgpatches from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT @@ -98,38 +105,12 @@ def _is_request_chunked(environ): return stream -def _int_setting(settings, name, default): - settings[name] = int(settings.get(name, default)) - return settings[name] - - -def _bool_setting(settings, name, default): - input_val = settings.get(name, default) - if isinstance(input_val, str): - input_val = input_val.encode('utf8') - settings[name] = asbool(input_val) - return settings[name] - - -def _list_setting(settings, name, default): - raw_value = settings.get(name, default) - - # Otherwise we assume it uses pyramids space/newline separation. - settings[name] = aslist(raw_value) - return settings[name] - - -def _string_setting(settings, name, default, lower=True, default_when_empty=False): - value = settings.get(name, default) - - if default_when_empty and not value: - # use default value when value is empty - value = default - - if lower: - value = value.lower() - settings[name] = value - return settings[name] +def log_max_fd(): + try: + maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)[1] + log.info('Max file descriptors value: %s', maxfd) + except Exception: + pass class VCS(object): @@ -138,6 +119,8 @@ class VCS(object): self.cache_config = cache_config self._configure_locale() + log_max_fd() + if GitFactory and GitRemote: git_factory = GitFactory() self._git_remote = GitRemote(git_factory) @@ -243,9 +226,11 @@ class HTTPApplication(object): _use_echo_app = False def __init__(self, settings=None, global_config=None): - self._sanitize_settings_and_apply_defaults(settings) self.config = Configurator(settings=settings) + # Init our statsd at very start + self.config.registry.statsd = StatsdClient.statsd + self.global_config = global_config self.config.include('vcsserver.lib.rc_cache') @@ -263,6 +248,7 @@ class HTTPApplication(object): self.remote_wsgi = remote_wsgi_stub self._configure_settings(global_config, settings) + self._configure() def _configure_settings(self, global_config, app_settings): @@ -283,40 +269,6 @@ class HTTPApplication(object): vcsserver.PYRAMID_SETTINGS = settings_merged vcsserver.CONFIG = settings_merged - def _sanitize_settings_and_apply_defaults(self, settings): - temp_store = tempfile.gettempdir() - default_cache_dir = os.path.join(temp_store, 'rc_cache') - - # save default, cache dir, and use it for all backends later. - default_cache_dir = _string_setting( - settings, - 'cache_dir', - default_cache_dir, lower=False, default_when_empty=True) - - # ensure we have our dir created - if not os.path.isdir(default_cache_dir): - os.makedirs(default_cache_dir, mode=0o755) - - # exception store cache - _string_setting( - settings, - 'exception_tracker.store_path', - temp_store, lower=False, default_when_empty=True) - - # repo_object cache - _string_setting( - settings, - 'rc_cache.repo_object.backend', - 'dogpile.cache.rc.file_namespace', lower=False) - _int_setting( - settings, - 'rc_cache.repo_object.expiration_time', - 30 * 24 * 60 * 60) - _string_setting( - settings, - 'rc_cache.repo_object.arguments.filename', - os.path.join(default_cache_dir, 'vcsserver_cache_1'), lower=False) - def _configure(self): self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory) @@ -385,16 +337,22 @@ class HTTPApplication(object): # NOTE(marcink): trading complexity for slight performance if log.isEnabledFor(logging.DEBUG): no_args_methods = [ - 'archive_repo' + ] if method in no_args_methods: call_args = '' else: call_args = args[1:] - log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s', + log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s', method, call_args, kwargs, context_uid, repo_state_uid) + statsd = request.registry.statsd + if statsd: + statsd.incr( + 'vcsserver_method_total', tags=[ + "method:{}".format(method), + ]) return payload, remote, method, args, kwargs def vcs_view(self, request): @@ -431,7 +389,7 @@ class HTTPApplication(object): should_store_exc = False if should_store_exc: - store_exception(id(exc_info), exc_info) + store_exception(id(exc_info), exc_info, request_path=request.path) tb_info = ''.join( traceback.format_exception(exc_type, exc_value, exc_traceback)) @@ -450,6 +408,7 @@ class HTTPApplication(object): 'type': type_ } } + try: resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None) except AttributeError: @@ -678,6 +637,12 @@ class HTTPApplication(object): log.error( 'error occurred handling this request for path: %s, \n tb: %s', request.path, traceback_info) + + statsd = request.registry.statsd + if statsd: + exc_type = "{}.{}".format(exception.__class__.__module__, exception.__class__.__name__) + statsd.incr('vcsserver_exception_total', + tags=["type:{}".format(exc_type)]) raise exception @@ -693,10 +658,83 @@ class ResponseFilter(object): return self._start_response(status, headers, exc_info) +def sanitize_settings_and_apply_defaults(global_config, settings): + global_settings_maker = SettingsMaker(global_config) + settings_maker = SettingsMaker(settings) + + settings_maker.make_setting('logging.autoconfigure', False, parser='bool') + + logging_conf = os.path.join(os.path.dirname(global_config.get('__file__')), 'logging.ini') + settings_maker.enable_logging(logging_conf) + + # Default includes, possible to change as a user + pyramid_includes = settings_maker.make_setting('pyramid.includes', [], parser='list:newline') + log.debug("Using the following pyramid.includes: %s", pyramid_includes) + + settings_maker.make_setting('__file__', global_config.get('__file__')) + + settings_maker.make_setting('pyramid.default_locale_name', 'en') + settings_maker.make_setting('locale', 'en_US.UTF-8') + + settings_maker.make_setting('core.binary_dir', '') + + temp_store = tempfile.gettempdir() + default_cache_dir = os.path.join(temp_store, 'rc_cache') + # save default, cache dir, and use it for all backends later. + default_cache_dir = settings_maker.make_setting( + 'cache_dir', + default=default_cache_dir, default_when_empty=True, + parser='dir:ensured') + + # exception store cache + settings_maker.make_setting( + 'exception_tracker.store_path', + default=os.path.join(default_cache_dir, 'exc_store'), default_when_empty=True, + parser='dir:ensured' + ) + + # repo_object cache defaults + settings_maker.make_setting( + 'rc_cache.repo_object.backend', + default='dogpile.cache.rc.file_namespace', + parser='string') + settings_maker.make_setting( + 'rc_cache.repo_object.expiration_time', + default=30 * 24 * 60 * 60, # 30days + parser='int') + settings_maker.make_setting( + 'rc_cache.repo_object.arguments.filename', + default=os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'), + parser='string') + + # statsd + settings_maker.make_setting('statsd.enabled', False, parser='bool') + settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string') + settings_maker.make_setting('statsd.statsd_port', 9125, parser='int') + settings_maker.make_setting('statsd.statsd_prefix', '') + settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool') + + settings_maker.env_expand() + + def main(global_config, **settings): + start_time = time.time() + log.info('Pyramid app config starting') + if MercurialFactory: hgpatches.patch_largefiles_capabilities() hgpatches.patch_subrepo_type_mapping() - app = HTTPApplication(settings=settings, global_config=global_config) - return app.wsgi_app() + # Fill in and sanitize the defaults & do ENV expansion + sanitize_settings_and_apply_defaults(global_config, settings) + + # init and bootstrap StatsdClient + StatsdClient.setup(settings) + + pyramid_app = HTTPApplication(settings=settings, global_config=global_config).wsgi_app() + total_time = time.time() - start_time + log.info('Pyramid app `%s` created and configured in %.2fs', + getattr(pyramid_app, 'func_name', 'pyramid_app'), total_time) + return pyramid_app + + diff --git a/vcsserver/lib/_vendor/__init__.py b/vcsserver/lib/_vendor/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/__init__.py @@ -0,0 +1,26 @@ +# RhodeCode VCSServer provides access to different vcs backends via network. +# Copyright (C) 2014-2020 RhodeCode GmbH +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +# This package contains non rhodecode licensed packages that are +# vendored for various reasons + +import os +import sys + +vendor_dir = os.path.abspath(os.path.dirname(__file__)) + +sys.path.append(vendor_dir) diff --git a/vcsserver/lib/_vendor/jsonlogger/__init__.py b/vcsserver/lib/_vendor/jsonlogger/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/jsonlogger/__init__.py @@ -0,0 +1,243 @@ +''' +This library is provided to allow standard python logging +to output log data as JSON formatted strings +''' +import logging +import json +import re +from datetime import date, datetime, time, tzinfo, timedelta +import traceback +import importlib + +from inspect import istraceback + +from collections import OrderedDict + + +def _inject_req_id(record, *args, **kwargs): + return record + + +ExceptionAwareFormatter = logging.Formatter + + +ZERO = timedelta(0) +HOUR = timedelta(hours=1) + + +class UTC(tzinfo): + """UTC""" + + def utcoffset(self, dt): + return ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return ZERO + +utc = UTC() + + +# skip natural LogRecord attributes +# http://docs.python.org/library/logging.html#logrecord-attributes +RESERVED_ATTRS = ( + 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', + 'funcName', 'levelname', 'levelno', 'lineno', 'module', + 'msecs', 'message', 'msg', 'name', 'pathname', 'process', + 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName') + + +def merge_record_extra(record, target, reserved): + """ + Merges extra attributes from LogRecord object into target dictionary + + :param record: logging.LogRecord + :param target: dict to update + :param reserved: dict or list with reserved keys to skip + """ + for key, value in record.__dict__.items(): + # this allows to have numeric keys + if (key not in reserved + and not (hasattr(key, "startswith") + and key.startswith('_'))): + target[key] = value + return target + + +class JsonEncoder(json.JSONEncoder): + """ + A custom encoder extending the default JSONEncoder + """ + + def default(self, obj): + if isinstance(obj, (date, datetime, time)): + return self.format_datetime_obj(obj) + + elif istraceback(obj): + return ''.join(traceback.format_tb(obj)).strip() + + elif type(obj) == Exception \ + or isinstance(obj, Exception) \ + or type(obj) == type: + return str(obj) + + try: + return super(JsonEncoder, self).default(obj) + + except TypeError: + try: + return str(obj) + + except Exception: + return None + + def format_datetime_obj(self, obj): + return obj.isoformat() + + +class JsonFormatter(ExceptionAwareFormatter): + """ + A custom formatter to format logging records as json strings. + Extra values will be formatted as str() if not supported by + json default encoder + """ + + def __init__(self, *args, **kwargs): + """ + :param json_default: a function for encoding non-standard objects + as outlined in http://docs.python.org/2/library/json.html + :param json_encoder: optional custom encoder + :param json_serializer: a :meth:`json.dumps`-compatible callable + that will be used to serialize the log record. + :param json_indent: an optional :meth:`json.dumps`-compatible numeric value + that will be used to customize the indent of the output json. + :param prefix: an optional string prefix added at the beginning of + the formatted string + :param json_indent: indent parameter for json.dumps + :param json_ensure_ascii: ensure_ascii parameter for json.dumps + :param reserved_attrs: an optional list of fields that will be skipped when + outputting json log record. Defaults to all log record attributes: + http://docs.python.org/library/logging.html#logrecord-attributes + :param timestamp: an optional string/boolean field to add a timestamp when + outputting the json log record. If string is passed, timestamp will be added + to log record using string as key. If True boolean is passed, timestamp key + will be "timestamp". Defaults to False/off. + """ + self.json_default = self._str_to_fn(kwargs.pop("json_default", None)) + self.json_encoder = self._str_to_fn(kwargs.pop("json_encoder", None)) + self.json_serializer = self._str_to_fn(kwargs.pop("json_serializer", json.dumps)) + self.json_indent = kwargs.pop("json_indent", None) + self.json_ensure_ascii = kwargs.pop("json_ensure_ascii", True) + self.prefix = kwargs.pop("prefix", "") + reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS) + self.reserved_attrs = dict(zip(reserved_attrs, reserved_attrs)) + self.timestamp = kwargs.pop("timestamp", True) + + # super(JsonFormatter, self).__init__(*args, **kwargs) + logging.Formatter.__init__(self, *args, **kwargs) + if not self.json_encoder and not self.json_default: + self.json_encoder = JsonEncoder + + self._required_fields = self.parse() + self._skip_fields = dict(zip(self._required_fields, + self._required_fields)) + self._skip_fields.update(self.reserved_attrs) + + def _str_to_fn(self, fn_as_str): + """ + If the argument is not a string, return whatever was passed in. + Parses a string such as package.module.function, imports the module + and returns the function. + + :param fn_as_str: The string to parse. If not a string, return it. + """ + if not isinstance(fn_as_str, str): + return fn_as_str + + path, _, function = fn_as_str.rpartition('.') + module = importlib.import_module(path) + return getattr(module, function) + + def parse(self): + """ + Parses format string looking for substitutions + + This method is responsible for returning a list of fields (as strings) + to include in all log messages. + """ + standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE) + return standard_formatters.findall(self._fmt) + + def add_fields(self, log_record, record, message_dict): + """ + Override this method to implement custom logic for adding fields. + """ + for field in self._required_fields: + log_record[field] = record.__dict__.get(field) + log_record.update(message_dict) + merge_record_extra(record, log_record, reserved=self._skip_fields) + + if self.timestamp: + key = self.timestamp if type(self.timestamp) == str else 'timestamp' + log_record[key] = datetime.fromtimestamp(record.created, tz=utc) + + def process_log_record(self, log_record): + """ + Override this method to implement custom logic + on the possibly ordered dictionary. + """ + return log_record + + def jsonify_log_record(self, log_record): + """Returns a json string of the log record.""" + return self.json_serializer(log_record, + default=self.json_default, + cls=self.json_encoder, + indent=self.json_indent, + ensure_ascii=self.json_ensure_ascii) + + def serialize_log_record(self, log_record): + """Returns the final representation of the log record.""" + return "%s%s" % (self.prefix, self.jsonify_log_record(log_record)) + + def format(self, record): + """Formats a log record and serializes to json""" + message_dict = {} + # FIXME: logging.LogRecord.msg and logging.LogRecord.message in typeshed + # are always type of str. We shouldn't need to override that. + if isinstance(record.msg, dict): + message_dict = record.msg + record.message = None + else: + record.message = record.getMessage() + # only format time if needed + if "asctime" in self._required_fields: + record.asctime = self.formatTime(record, self.datefmt) + + # Display formatted exception, but allow overriding it in the + # user-supplied dict. + if record.exc_info and not message_dict.get('exc_info'): + message_dict['exc_info'] = self.formatException(record.exc_info) + if not message_dict.get('exc_info') and record.exc_text: + message_dict['exc_info'] = record.exc_text + # Display formatted record of stack frames + # default format is a string returned from :func:`traceback.print_stack` + try: + if record.stack_info and not message_dict.get('stack_info'): + message_dict['stack_info'] = self.formatStack(record.stack_info) + except AttributeError: + # Python2.7 doesn't have stack_info. + pass + + try: + log_record = OrderedDict() + except NameError: + log_record = {} + + _inject_req_id(record, with_prefix=False) + self.add_fields(log_record, record, message_dict) + log_record = self.process_log_record(log_record) + + return self.serialize_log_record(log_record) diff --git a/vcsserver/lib/_vendor/redis_lock/__init__.py b/vcsserver/lib/_vendor/redis_lock/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/redis_lock/__init__.py @@ -0,0 +1,390 @@ +import sys +import threading +import weakref +from base64 import b64encode +from logging import getLogger +from os import urandom + +from redis import StrictRedis + +__version__ = '3.7.0' + +loggers = { + k: getLogger("vcsserver." + ".".join((__name__, k))) + for k in [ + "acquire", + "refresh.thread.start", + "refresh.thread.stop", + "refresh.thread.exit", + "refresh.start", + "refresh.shutdown", + "refresh.exit", + "release", + ] +} + +PY3 = sys.version_info[0] == 3 + +if PY3: + text_type = str + binary_type = bytes +else: + text_type = unicode # noqa + binary_type = str + + +# Check if the id match. If not, return an error code. +UNLOCK_SCRIPT = b""" + if redis.call("get", KEYS[1]) ~= ARGV[1] then + return 1 + else + redis.call("del", KEYS[2]) + redis.call("lpush", KEYS[2], 1) + redis.call("pexpire", KEYS[2], ARGV[2]) + redis.call("del", KEYS[1]) + return 0 + end +""" + +# Covers both cases when key doesn't exist and doesn't equal to lock's id +EXTEND_SCRIPT = b""" + if redis.call("get", KEYS[1]) ~= ARGV[1] then + return 1 + elseif redis.call("ttl", KEYS[1]) < 0 then + return 2 + else + redis.call("expire", KEYS[1], ARGV[2]) + return 0 + end +""" + +RESET_SCRIPT = b""" + redis.call('del', KEYS[2]) + redis.call('lpush', KEYS[2], 1) + redis.call('pexpire', KEYS[2], ARGV[2]) + return redis.call('del', KEYS[1]) +""" + +RESET_ALL_SCRIPT = b""" + local locks = redis.call('keys', 'lock:*') + local signal + for _, lock in pairs(locks) do + signal = 'lock-signal:' .. string.sub(lock, 6) + redis.call('del', signal) + redis.call('lpush', signal, 1) + redis.call('expire', signal, 1) + redis.call('del', lock) + end + return #locks +""" + + +class AlreadyAcquired(RuntimeError): + pass + + +class NotAcquired(RuntimeError): + pass + + +class AlreadyStarted(RuntimeError): + pass + + +class TimeoutNotUsable(RuntimeError): + pass + + +class InvalidTimeout(RuntimeError): + pass + + +class TimeoutTooLarge(RuntimeError): + pass + + +class NotExpirable(RuntimeError): + pass + + +class Lock(object): + """ + A Lock context manager implemented via redis SETNX/BLPOP. + """ + unlock_script = None + extend_script = None + reset_script = None + reset_all_script = None + + def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000): + """ + :param redis_client: + An instance of :class:`~StrictRedis`. + :param name: + The name (redis key) the lock should have. + :param expire: + The lock expiry time in seconds. If left at the default (None) + the lock will not expire. + :param id: + The ID (redis value) the lock should have. A random value is + generated when left at the default. + + Note that if you specify this then the lock is marked as "held". Acquires + won't be possible. + :param auto_renewal: + If set to ``True``, Lock will automatically renew the lock so that it + doesn't expire for as long as the lock is held (acquire() called + or running in a context manager). + + Implementation note: Renewal will happen using a daemon thread with + an interval of ``expire*2/3``. If wishing to use a different renewal + time, subclass Lock, call ``super().__init__()`` then set + ``self._lock_renewal_interval`` to your desired interval. + :param strict: + If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``. + :param signal_expire: + Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``. + """ + if strict and not isinstance(redis_client, StrictRedis): + raise ValueError("redis_client must be instance of StrictRedis. " + "Use strict=False if you know what you're doing.") + if auto_renewal and expire is None: + raise ValueError("Expire may not be None when auto_renewal is set") + + self._client = redis_client + + if expire: + expire = int(expire) + if expire < 0: + raise ValueError("A negative expire is not acceptable.") + else: + expire = None + self._expire = expire + + self._signal_expire = signal_expire + if id is None: + self._id = b64encode(urandom(18)).decode('ascii') + elif isinstance(id, binary_type): + try: + self._id = id.decode('ascii') + except UnicodeDecodeError: + self._id = b64encode(id).decode('ascii') + elif isinstance(id, text_type): + self._id = id + else: + raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id)) + self._name = 'lock:' + name + self._signal = 'lock-signal:' + name + self._lock_renewal_interval = (float(expire) * 2 / 3 + if auto_renewal + else None) + self._lock_renewal_thread = None + + self.register_scripts(redis_client) + + @classmethod + def register_scripts(cls, redis_client): + global reset_all_script + if reset_all_script is None: + reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) + cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT) + cls.extend_script = redis_client.register_script(EXTEND_SCRIPT) + cls.reset_script = redis_client.register_script(RESET_SCRIPT) + cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT) + + @property + def _held(self): + return self.id == self.get_owner_id() + + def reset(self): + """ + Forcibly deletes the lock. Use this with care. + """ + self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire)) + + @property + def id(self): + return self._id + + def get_owner_id(self): + owner_id = self._client.get(self._name) + if isinstance(owner_id, binary_type): + owner_id = owner_id.decode('ascii', 'replace') + return owner_id + + def acquire(self, blocking=True, timeout=None): + """ + :param blocking: + Boolean value specifying whether lock should be blocking or not. + :param timeout: + An integer value specifying the maximum number of seconds to block. + """ + logger = loggers["acquire"] + + logger.debug("Getting blocking: %s acquire on %r ...", blocking, self._name) + + if self._held: + owner_id = self.get_owner_id() + raise AlreadyAcquired("Already acquired from this Lock instance. Lock id: {}".format(owner_id)) + + if not blocking and timeout is not None: + raise TimeoutNotUsable("Timeout cannot be used if blocking=False") + + if timeout: + timeout = int(timeout) + if timeout < 0: + raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout) + + if self._expire and not self._lock_renewal_interval and timeout > self._expire: + raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire)) + + busy = True + blpop_timeout = timeout or self._expire or 0 + timed_out = False + while busy: + busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire) + if busy: + if timed_out: + return False + elif blocking: + timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout + else: + logger.warning("Failed to get %r.", self._name) + return False + + logger.debug("Got lock for %r.", self._name) + if self._lock_renewal_interval is not None: + self._start_lock_renewer() + return True + + def extend(self, expire=None): + """Extends expiration time of the lock. + + :param expire: + New expiration time. If ``None`` - `expire` provided during + lock initialization will be taken. + """ + if expire: + expire = int(expire) + if expire < 0: + raise ValueError("A negative expire is not acceptable.") + elif self._expire is not None: + expire = self._expire + else: + raise TypeError( + "To extend a lock 'expire' must be provided as an " + "argument to extend() method or at initialization time." + ) + + error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire)) + if error == 1: + raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + elif error == 2: + raise NotExpirable("Lock %s has no assigned expiration time" % self._name) + elif error: + raise RuntimeError("Unsupported error code %s from EXTEND script" % error) + + @staticmethod + def _lock_renewer(lockref, interval, stop): + """ + Renew the lock key in redis every `interval` seconds for as long + as `self._lock_renewal_thread.should_exit` is False. + """ + while not stop.wait(timeout=interval): + loggers["refresh.thread.start"].debug("Refreshing lock") + lock = lockref() + if lock is None: + loggers["refresh.thread.stop"].debug( + "The lock no longer exists, stopping lock refreshing" + ) + break + lock.extend(expire=lock._expire) + del lock + loggers["refresh.thread.exit"].debug("Exit requested, stopping lock refreshing") + + def _start_lock_renewer(self): + """ + Starts the lock refresher thread. + """ + if self._lock_renewal_thread is not None: + raise AlreadyStarted("Lock refresh thread already started") + + loggers["refresh.start"].debug( + "Starting thread to refresh lock every %s seconds", + self._lock_renewal_interval + ) + self._lock_renewal_stop = threading.Event() + self._lock_renewal_thread = threading.Thread( + group=None, + target=self._lock_renewer, + kwargs={'lockref': weakref.ref(self), + 'interval': self._lock_renewal_interval, + 'stop': self._lock_renewal_stop} + ) + self._lock_renewal_thread.setDaemon(True) + self._lock_renewal_thread.start() + + def _stop_lock_renewer(self): + """ + Stop the lock renewer. + + This signals the renewal thread and waits for its exit. + """ + if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive(): + return + loggers["refresh.shutdown"].debug("Signalling the lock refresher to stop") + self._lock_renewal_stop.set() + self._lock_renewal_thread.join() + self._lock_renewal_thread = None + loggers["refresh.exit"].debug("Lock refresher has stopped") + + def __enter__(self): + acquired = self.acquire(blocking=True) + assert acquired, "Lock wasn't acquired, but blocking=True" + return self + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + self.release() + + def release(self): + """Releases the lock, that was acquired with the same object. + + .. note:: + + If you want to release a lock that you acquired in a different place you have two choices: + + * Use ``Lock("name", id=id_from_other_place).release()`` + * Use ``Lock("name").reset()`` + """ + if self._lock_renewal_thread is not None: + self._stop_lock_renewer() + loggers["release"].debug("Releasing %r.", self._name) + error = self.unlock_script(client=self._client, keys=(self._name, self._signal), args=(self._id, self._signal_expire)) + if error == 1: + raise NotAcquired("Lock %s is not acquired or it already expired." % self._name) + elif error: + raise RuntimeError("Unsupported error code %s from EXTEND script." % error) + + def locked(self): + """ + Return true if the lock is acquired. + + Checks that lock with same name already exists. This method returns true, even if + lock have another id. + """ + return self._client.exists(self._name) == 1 + + +reset_all_script = None + + +def reset_all(redis_client): + """ + Forcibly deletes all locks if its remains (like a crash reason). Use this with care. + + :param redis_client: + An instance of :class:`~StrictRedis`. + """ + Lock.register_scripts(redis_client) + + reset_all_script(client=redis_client) # noqa diff --git a/vcsserver/lib/_vendor/statsd/__init__.py b/vcsserver/lib/_vendor/statsd/__init__.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/__init__.py @@ -0,0 +1,52 @@ +from __future__ import absolute_import, division, unicode_literals + +import logging + +from .stream import TCPStatsClient, UnixSocketStatsClient # noqa +from .udp import StatsClient # noqa + +HOST = 'localhost' +PORT = 8125 +IPV6 = False +PREFIX = None +MAXUDPSIZE = 512 + +log = logging.getLogger('rhodecode.statsd') + + +def statsd_config(config, prefix='statsd.'): + _config = {} + for key in config.keys(): + if key.startswith(prefix): + _config[key[len(prefix):]] = config[key] + return _config + + +def client_from_config(configuration, prefix='statsd.', **kwargs): + from pyramid.settings import asbool + + _config = statsd_config(configuration, prefix) + statsd_enabled = asbool(_config.pop('enabled', False)) + if not statsd_enabled: + log.debug('statsd client not enabled by statsd.enabled = flag, skipping...') + return + + host = _config.pop('statsd_host', HOST) + port = _config.pop('statsd_port', PORT) + prefix = _config.pop('statsd_prefix', PREFIX) + maxudpsize = _config.pop('statsd_maxudpsize', MAXUDPSIZE) + ipv6 = asbool(_config.pop('statsd_ipv6', IPV6)) + log.debug('configured statsd client %s:%s', host, port) + + try: + client = StatsClient( + host=host, port=port, prefix=prefix, maxudpsize=maxudpsize, ipv6=ipv6) + except Exception: + log.exception('StatsD is enabled, but failed to connect to statsd server, fallback: disable statsd') + client = None + + return client + + +def get_statsd_client(request): + return client_from_config(request.registry.settings) diff --git a/vcsserver/lib/_vendor/statsd/base.py b/vcsserver/lib/_vendor/statsd/base.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/base.py @@ -0,0 +1,156 @@ +from __future__ import absolute_import, division, unicode_literals + +import re +import random +from collections import deque +from datetime import timedelta +from repoze.lru import lru_cache + +from .timer import Timer + +TAG_INVALID_CHARS_RE = re.compile( + r"[^\w\d_\-:/\.]", + #re.UNICODE +) +TAG_INVALID_CHARS_SUBS = "_" + +# we save and expose methods called by statsd for discovery +buckets_dict = { + +} + + +@lru_cache(maxsize=500) +def _normalize_tags_with_cache(tag_list): + return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list] + + +def normalize_tags(tag_list): + # We have to turn our input tag list into a non-mutable tuple for it to + # be hashable (and thus usable) by the @lru_cache decorator. + return _normalize_tags_with_cache(tuple(tag_list)) + + +class StatsClientBase(object): + """A Base class for various statsd clients.""" + + def close(self): + """Used to close and clean up any underlying resources.""" + raise NotImplementedError() + + def _send(self): + raise NotImplementedError() + + def pipeline(self): + raise NotImplementedError() + + def timer(self, stat, rate=1, tags=None, auto_send=True): + """ + statsd = StatsdClient.statsd + with statsd.timer('bucket_name', auto_send=True) as tmr: + # This block will be timed. + for i in xrange(0, 100000): + i ** 2 + # you can access time here... + elapsed_ms = tmr.ms + """ + return Timer(self, stat, rate, tags, auto_send=auto_send) + + def timing(self, stat, delta, rate=1, tags=None, use_decimals=True): + """ + Send new timing information. + + `delta` can be either a number of milliseconds or a timedelta. + """ + if isinstance(delta, timedelta): + # Convert timedelta to number of milliseconds. + delta = delta.total_seconds() * 1000. + if use_decimals: + fmt = '%0.6f|ms' + else: + fmt = '%s|ms' + self._send_stat(stat, fmt % delta, rate, tags) + + def incr(self, stat, count=1, rate=1, tags=None): + """Increment a stat by `count`.""" + self._send_stat(stat, '%s|c' % count, rate, tags) + + def decr(self, stat, count=1, rate=1, tags=None): + """Decrement a stat by `count`.""" + self.incr(stat, -count, rate, tags) + + def gauge(self, stat, value, rate=1, delta=False, tags=None): + """Set a gauge value.""" + if value < 0 and not delta: + if rate < 1: + if random.random() > rate: + return + with self.pipeline() as pipe: + pipe._send_stat(stat, '0|g', 1) + pipe._send_stat(stat, '%s|g' % value, 1) + else: + prefix = '+' if delta and value >= 0 else '' + self._send_stat(stat, '%s%s|g' % (prefix, value), rate, tags) + + def set(self, stat, value, rate=1): + """Set a set value.""" + self._send_stat(stat, '%s|s' % value, rate) + + def histogram(self, stat, value, rate=1, tags=None): + """Set a histogram""" + self._send_stat(stat, '%s|h' % value, rate, tags) + + def _send_stat(self, stat, value, rate, tags=None): + self._after(self._prepare(stat, value, rate, tags)) + + def _prepare(self, stat, value, rate, tags=None): + global buckets_dict + buckets_dict[stat] = 1 + + if rate < 1: + if random.random() > rate: + return + value = '%s|@%s' % (value, rate) + + if self._prefix: + stat = '%s.%s' % (self._prefix, stat) + + res = '%s:%s%s' % ( + stat, + value, + ("|#" + ",".join(normalize_tags(tags))) if tags else "", + ) + return res + + def _after(self, data): + if data: + self._send(data) + + +class PipelineBase(StatsClientBase): + + def __init__(self, client): + self._client = client + self._prefix = client._prefix + self._stats = deque() + + def _send(self): + raise NotImplementedError() + + def _after(self, data): + if data is not None: + self._stats.append(data) + + def __enter__(self): + return self + + def __exit__(self, typ, value, tb): + self.send() + + def send(self): + if not self._stats: + return + self._send() + + def pipeline(self): + return self.__class__(self) diff --git a/vcsserver/lib/_vendor/statsd/stream.py b/vcsserver/lib/_vendor/statsd/stream.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/stream.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class StreamPipeline(PipelineBase): + def _send(self): + self._client._after('\n'.join(self._stats)) + self._stats.clear() + + +class StreamClientBase(StatsClientBase): + def connect(self): + raise NotImplementedError() + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def reconnect(self): + self.close() + self.connect() + + def pipeline(self): + return StreamPipeline(self) + + def _send(self, data): + """Send data to statsd.""" + if not self._sock: + self.connect() + self._do_send(data) + + def _do_send(self, data): + self._sock.sendall(data.encode('ascii') + b'\n') + + +class TCPStatsClient(StreamClientBase): + """TCP version of StatsClient.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + timeout=None, ipv6=False): + """Create a new client.""" + self._host = host + self._port = port + self._ipv6 = ipv6 + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + self._host, self._port, fam, socket.SOCK_STREAM)[0] + self._sock = socket.socket(family, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(addr) + + +class UnixSocketStatsClient(StreamClientBase): + """Unix domain socket version of StatsClient.""" + + def __init__(self, socket_path, prefix=None, timeout=None): + """Create a new client.""" + self._socket_path = socket_path + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(self._socket_path) diff --git a/vcsserver/lib/_vendor/statsd/timer.py b/vcsserver/lib/_vendor/statsd/timer.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/timer.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import, division, unicode_literals + +import functools + +# Use timer that's not susceptible to time of day adjustments. +try: + # perf_counter is only present on Py3.3+ + from time import perf_counter as time_now +except ImportError: + # fall back to using time + from time import time as time_now + + +def safe_wraps(wrapper, *args, **kwargs): + """Safely wraps partial functions.""" + while isinstance(wrapper, functools.partial): + wrapper = wrapper.func + return functools.wraps(wrapper, *args, **kwargs) + + +class Timer(object): + """A context manager/decorator for statsd.timing().""" + + def __init__(self, client, stat, rate=1, tags=None, use_decimals=True, auto_send=True): + self.client = client + self.stat = stat + self.rate = rate + self.tags = tags + self.ms = None + self._sent = False + self._start_time = None + self.use_decimals = use_decimals + self.auto_send = auto_send + + def __call__(self, f): + """Thread-safe timing function decorator.""" + @safe_wraps(f) + def _wrapped(*args, **kwargs): + start_time = time_now() + try: + return f(*args, **kwargs) + finally: + elapsed_time_ms = 1000.0 * (time_now() - start_time) + self.client.timing(self.stat, elapsed_time_ms, self.rate, self.tags, self.use_decimals) + self._sent = True + return _wrapped + + def __enter__(self): + return self.start() + + def __exit__(self, typ, value, tb): + self.stop(send=self.auto_send) + + def start(self): + self.ms = None + self._sent = False + self._start_time = time_now() + return self + + def stop(self, send=True): + if self._start_time is None: + raise RuntimeError('Timer has not started.') + dt = time_now() - self._start_time + self.ms = 1000.0 * dt # Convert to milliseconds. + if send: + self.send() + return self + + def send(self): + if self.ms is None: + raise RuntimeError('No data recorded.') + if self._sent: + raise RuntimeError('Already sent data.') + self._sent = True + self.client.timing(self.stat, self.ms, self.rate, self.tags, self.use_decimals) diff --git a/vcsserver/lib/_vendor/statsd/udp.py b/vcsserver/lib/_vendor/statsd/udp.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/_vendor/statsd/udp.py @@ -0,0 +1,55 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class Pipeline(PipelineBase): + + def __init__(self, client): + super(Pipeline, self).__init__(client) + self._maxudpsize = client._maxudpsize + + def _send(self): + data = self._stats.popleft() + while self._stats: + # Use popleft to preserve the order of the stats. + stat = self._stats.popleft() + if len(stat) + len(data) + 1 >= self._maxudpsize: + self._client._after(data) + data = stat + else: + data += '\n' + stat + self._client._after(data) + + +class StatsClient(StatsClientBase): + """A client for statsd.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + maxudpsize=512, ipv6=False): + """Create a new client.""" + fam = socket.AF_INET6 if ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + host, port, fam, socket.SOCK_DGRAM)[0] + self._addr = addr + self._sock = socket.socket(family, socket.SOCK_DGRAM) + self._prefix = prefix + self._maxudpsize = maxudpsize + + def _send(self, data): + """Send data to statsd.""" + try: + self._sock.sendto(data.encode('ascii'), self._addr) + except (socket.error, RuntimeError): + # No time for love, Dr. Jones! + pass + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def pipeline(self): + return Pipeline(self) diff --git a/vcsserver/lib/exc_tracking.py b/vcsserver/lib/exc_tracking.py --- a/vcsserver/lib/exc_tracking.py +++ b/vcsserver/lib/exc_tracking.py @@ -66,7 +66,7 @@ def get_exc_store(): return _exc_store_path -def _store_exception(exc_id, exc_info, prefix): +def _store_exception(exc_id, exc_info, prefix, request_path=''): exc_type, exc_value, exc_traceback = exc_info tb = ''.join(traceback.format_exception( @@ -99,8 +99,13 @@ def _store_exception(exc_id, exc_info, p f.write(exc_data) log.debug('Stored generated exception %s as: %s', exc_id, stored_exc_path) + log.error( + 'error occurred handling this request.\n' + 'Path: `%s`, tb: %s', + request_path, tb) -def store_exception(exc_id, exc_info, prefix=global_prefix): + +def store_exception(exc_id, exc_info, prefix=global_prefix, request_path=''): """ Example usage:: @@ -109,7 +114,8 @@ def store_exception(exc_id, exc_info, pr """ try: - _store_exception(exc_id=exc_id, exc_info=exc_info, prefix=prefix) + _store_exception(exc_id=exc_id, exc_info=exc_info, prefix=prefix, + request_path=request_path) except Exception: log.exception('Failed to store exception `%s` information', exc_id) # there's no way this can fail, it will crash server badly if it does. diff --git a/vcsserver/lib/rc_cache/__init__.py b/vcsserver/lib/rc_cache/__init__.py --- a/vcsserver/lib/rc_cache/__init__.py +++ b/vcsserver/lib/rc_cache/__init__.py @@ -38,7 +38,9 @@ register_backend( log = logging.getLogger(__name__) from . import region_meta -from .utils import (get_default_cache_settings, backend_key_generator, make_region) +from .utils import ( + get_default_cache_settings, backend_key_generator, get_or_create_region, + clear_cache_namespace, make_region) def configure_dogpile_cache(settings): @@ -52,20 +54,25 @@ def configure_dogpile_cache(settings): avail_regions = set() for key in rc_cache_data.keys(): namespace_name = key.split('.', 1)[0] - avail_regions.add(namespace_name) - log.debug('dogpile: found following cache regions: %s', avail_regions) + if namespace_name in avail_regions: + continue - # register them into namespace - for region_name in avail_regions: + avail_regions.add(namespace_name) + log.debug('dogpile: found following cache regions: %s', namespace_name) + new_region = make_region( - name=region_name, + name=namespace_name, function_key_generator=None ) - new_region.configure_from_config(settings, 'rc_cache.{}.'.format(region_name)) + new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name)) new_region.function_key_generator = backend_key_generator(new_region.actual_backend) - log.debug('dogpile: registering a new region %s[%s]', region_name, new_region.__dict__) - region_meta.dogpile_cache_regions[region_name] = new_region + if log.isEnabledFor(logging.DEBUG): + region_args = dict(backend=new_region.actual_backend.__class__, + region_invalidator=new_region.region_invalidator.__class__) + log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args) + + region_meta.dogpile_cache_regions[namespace_name] = new_region def includeme(config): diff --git a/vcsserver/lib/rc_cache/backends.py b/vcsserver/lib/rc_cache/backends.py --- a/vcsserver/lib/rc_cache/backends.py +++ b/vcsserver/lib/rc_cache/backends.py @@ -29,7 +29,10 @@ from dogpile.cache.backends import redis from dogpile.cache.backends.file import NO_VALUE, FileLock from dogpile.cache.util import memoized_property +from pyramid.settings import asbool + from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug +from vcsserver.utils import safe_str, safe_unicode _default_max_size = 1024 @@ -124,7 +127,14 @@ class FileNamespaceBackend(PickleSeriali def __init__(self, arguments): arguments['lock_factory'] = CustomLockFactory - super(FileNamespaceBackend, self).__init__(arguments) + db_file = arguments.get('filename') + + log.debug('initialing %s DB in %s', self.__class__.__name__, db_file) + try: + super(FileNamespaceBackend, self).__init__(arguments) + except Exception: + log.exception('Failed to initialize db at: %s', db_file) + raise def __repr__(self): return '{} `{}`'.format(self.__class__, self.filename) @@ -141,13 +151,16 @@ class FileNamespaceBackend(PickleSeriali return False with self._dbm_file(True) as dbm: - - return filter(cond, dbm.keys()) + try: + return filter(cond, dbm.keys()) + except Exception: + log.error('Failed to fetch DBM keys from DB: %s', self.get_store()) + raise def get_store(self): return self.filename - def get(self, key): + def _dbm_get(self, key): with self._dbm_file(False) as dbm: if hasattr(dbm, 'get'): value = dbm.get(key, NO_VALUE) @@ -161,6 +174,13 @@ class FileNamespaceBackend(PickleSeriali value = self._loads(value) return value + def get(self, key): + try: + return self._dbm_get(key) + except Exception: + log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store()) + raise + def set(self, key, value): with self._dbm_file(True) as dbm: dbm[key] = self._dumps(value) @@ -172,6 +192,16 @@ class FileNamespaceBackend(PickleSeriali class BaseRedisBackend(redis_backend.RedisBackend): + key_prefix = '' + + def __init__(self, arguments): + super(BaseRedisBackend, self).__init__(arguments) + self._lock_timeout = self.lock_timeout + self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True)) + + if self._lock_auto_renewal and not self._lock_timeout: + # set default timeout for auto_renewal + self._lock_timeout = 30 def _create_client(self): args = {} @@ -234,11 +264,10 @@ class BaseRedisBackend(redis_backend.Red pipe.execute() def get_mutex(self, key): - u = redis_backend.u if self.distributed_lock: - lock_key = u('_lock_{0}').format(key) - log.debug('Trying to acquire Redis lock for key %s', lock_key) - return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep) + lock_key = u'_lock_{0}'.format(safe_unicode(key)) + return get_mutex_lock(self.client, lock_key, self._lock_timeout, + auto_renewal=self._lock_auto_renewal) else: return None @@ -251,3 +280,50 @@ class RedisPickleBackend(PickleSerialize class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend): key_prefix = 'redis_msgpack_backend' pass + + +def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False): + import redis_lock + + class _RedisLockWrapper(object): + """LockWrapper for redis_lock""" + + @classmethod + def get_lock(cls): + return redis_lock.Lock( + redis_client=client, + name=lock_key, + expire=lock_timeout, + auto_renewal=auto_renewal, + strict=True, + ) + + def __repr__(self): + return "{}:{}".format(self.__class__.__name__, lock_key) + + def __str__(self): + return "{}:{}".format(self.__class__.__name__, lock_key) + + def __init__(self): + self.lock = self.get_lock() + self.lock_key = lock_key + + def acquire(self, wait=True): + log.debug('Trying to acquire Redis lock for key %s', self.lock_key) + try: + acquired = self.lock.acquire(wait) + log.debug('Got lock for key %s, %s', self.lock_key, acquired) + return acquired + except redis_lock.AlreadyAcquired: + return False + except redis_lock.AlreadyStarted: + # refresh thread exists, but it also means we acquired the lock + return True + + def release(self): + try: + self.lock.release() + except redis_lock.NotAcquired: + pass + + return _RedisLockWrapper() diff --git a/vcsserver/lib/rc_cache/utils.py b/vcsserver/lib/rc_cache/utils.py --- a/vcsserver/lib/rc_cache/utils.py +++ b/vcsserver/lib/rc_cache/utils.py @@ -16,15 +16,16 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import os +import time import logging import functools -from decorator import decorate from dogpile.cache import CacheRegion from dogpile.cache.util import compat from vcsserver.utils import safe_str, sha1 +from vcsserver.lib.rc_cache import region_meta log = logging.getLogger(__name__) @@ -48,18 +49,77 @@ class RhodeCodeCacheRegion(CacheRegion): if function_key_generator is None: function_key_generator = self.function_key_generator + # workaround for py2 and cython problems, this block should be removed + # once we've migrated to py3 + if 'cython' == 'cython': + def decorator(fn): + if to_str is compat.string_type: + # backwards compatible + key_generator = function_key_generator(namespace, fn) + else: + key_generator = function_key_generator(namespace, fn, to_str=to_str) + + @functools.wraps(fn) + def decorate(*arg, **kw): + key = key_generator(*arg, **kw) + + @functools.wraps(fn) + def creator(): + return fn(*arg, **kw) + + if not condition: + return creator() + + timeout = expiration_time() if expiration_time_is_callable \ + else expiration_time + + return self.get_or_create(key, creator, timeout, should_cache_fn) + + def invalidate(*arg, **kw): + key = key_generator(*arg, **kw) + self.delete(key) + + def set_(value, *arg, **kw): + key = key_generator(*arg, **kw) + self.set(key, value) + + def get(*arg, **kw): + key = key_generator(*arg, **kw) + return self.get(key) + + def refresh(*arg, **kw): + key = key_generator(*arg, **kw) + value = fn(*arg, **kw) + self.set(key, value) + return value + + decorate.set = set_ + decorate.invalidate = invalidate + decorate.refresh = refresh + decorate.get = get + decorate.original = fn + decorate.key_generator = key_generator + decorate.__wrapped__ = fn + + return decorate + return decorator + def get_or_create_for_user_func(key_generator, user_func, *arg, **kw): if not condition: - log.debug('Calling un-cached func:%s', user_func.func_name) - return user_func(*arg, **kw) + log.debug('Calling un-cached method:%s', user_func.func_name) + start = time.time() + result = user_func(*arg, **kw) + total = time.time() - start + log.debug('un-cached method:%s took %.4fs', user_func.func_name, total) + return result key = key_generator(*arg, **kw) timeout = expiration_time() if expiration_time_is_callable \ else expiration_time - log.debug('Calling cached fn:%s', user_func.func_name) + log.debug('Calling cached method:`%s`', user_func.func_name) return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw)) def cache_decorator(user_func): @@ -98,8 +158,7 @@ class RhodeCodeCacheRegion(CacheRegion): user_func.original = user_func # Use `decorate` to preserve the signature of :param:`user_func`. - - return decorate(user_func, functools.partial( + return decorator.decorate(user_func, functools.partial( get_or_create_for_user_func, key_generator)) return cache_decorator @@ -151,3 +210,54 @@ def key_generator(backend, namespace, fn return final_key return generate_key + + +def get_or_create_region(region_name, region_namespace=None): + from vcsserver.lib.rc_cache.backends import FileNamespaceBackend + region_obj = region_meta.dogpile_cache_regions.get(region_name) + if not region_obj: + raise EnvironmentError( + 'Region `{}` not in configured: {}.'.format( + region_name, region_meta.dogpile_cache_regions.keys())) + + region_uid_name = '{}:{}'.format(region_name, region_namespace) + if isinstance(region_obj.actual_backend, FileNamespaceBackend): + region_exist = region_meta.dogpile_cache_regions.get(region_namespace) + if region_exist: + log.debug('Using already configured region: %s', region_namespace) + return region_exist + cache_dir = region_meta.dogpile_config_defaults['cache_dir'] + expiration_time = region_obj.expiration_time + + if not os.path.isdir(cache_dir): + os.makedirs(cache_dir) + new_region = make_region( + name=region_uid_name, + function_key_generator=backend_key_generator(region_obj.actual_backend) + ) + namespace_filename = os.path.join( + cache_dir, "{}.cache.dbm".format(region_namespace)) + # special type that allows 1db per namespace + new_region.configure( + backend='dogpile.cache.rc.file_namespace', + expiration_time=expiration_time, + arguments={"filename": namespace_filename} + ) + + # create and save in region caches + log.debug('configuring new region: %s', region_uid_name) + region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region + + return region_obj + + +def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False): + region = get_or_create_region(cache_region, cache_namespace_uid) + cache_keys = region.backend.list_keys(prefix=cache_namespace_uid) + num_delete_keys = len(cache_keys) + if invalidate: + region.invalidate(hard=False) + else: + if num_delete_keys: + region.delete_multi(cache_keys) + return num_delete_keys diff --git a/vcsserver/lib/statsd_client.py b/vcsserver/lib/statsd_client.py new file mode 100644 --- /dev/null +++ b/vcsserver/lib/statsd_client.py @@ -0,0 +1,49 @@ +from vcsserver.lib._vendor.statsd import client_from_config + + +class StatsdClientNotInitialised(Exception): + pass + + +class _Singleton(type): + """A metaclass that creates a Singleton base class when called.""" + + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class Singleton(_Singleton("SingletonMeta", (object,), {})): + pass + + +class StatsdClientClass(Singleton): + setup_run = False + statsd_client = None + statsd = None + + def __getattribute__(self, name): + + if name.startswith("statsd"): + if self.setup_run: + return super(StatsdClientClass, self).__getattribute__(name) + else: + return None + #raise StatsdClientNotInitialised("requested key was %s" % name) + + return super(StatsdClientClass, self).__getattribute__(name) + + def setup(self, settings): + """ + Initialize the client + """ + statsd = client_from_config(settings) + self.statsd = statsd + self.statsd_client = statsd + self.setup_run = True + + +StatsdClient = StatsdClientClass() diff --git a/vcsserver/svn.py b/vcsserver/svn.py --- a/vcsserver/svn.py +++ b/vcsserver/svn.py @@ -19,6 +19,7 @@ import os import subprocess +import time from urllib.error import URLError import urllib.parse import logging @@ -35,7 +36,9 @@ import svn.fs import svn.repos from vcsserver import svn_diff, exceptions, subprocessio, settings -from vcsserver.base import RepoFactory, raise_from_original +from vcsserver.base import RepoFactory, raise_from_original, ArchiveNode, archive_repo +from vcsserver.exceptions import NoContentException +from vcsserver.utils import safe_str from vcsserver.vcs_base import RemoteBase log = logging.getLogger(__name__) @@ -198,7 +201,8 @@ class SvnRemote(RemoteBase): def revision_properties(self, wire, revision): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _revision_properties(_repo_id, _revision): repo = self._factory.repo(wire) fs_ptr = svn.repos.fs(repo) @@ -252,7 +256,8 @@ class SvnRemote(RemoteBase): @reraise_safe_exceptions def node_history(self, wire, path, revision, limit): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _assert_correct_path(_context_uid, _repo_id, _path, _revision, _limit): cross_copies = False repo = self._factory.repo(wire) @@ -273,7 +278,8 @@ class SvnRemote(RemoteBase): def node_properties(self, wire, path, revision): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _node_properties(_repo_id, _path, _revision): repo = self._factory.repo(wire) fsobj = svn.repos.fs(repo) @@ -282,7 +288,7 @@ class SvnRemote(RemoteBase): return _node_properties(repo_id, path, revision) def file_annotate(self, wire, path, revision): - abs_path = 'file://' + urllib.request.pathname2url( + abs_path = 'file://' + urllib.pathname2url( vcspath.join(wire['path'], path)) file_uri = svn.core.svn_path_canonicalize(abs_path) @@ -311,7 +317,8 @@ class SvnRemote(RemoteBase): def get_node_type(self, wire, path, revision=None): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_node_type(_repo_id, _path, _revision): repo = self._factory.repo(wire) fs_ptr = svn.repos.fs(repo) @@ -325,7 +332,8 @@ class SvnRemote(RemoteBase): def get_nodes(self, wire, path, revision=None): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_nodes(_repo_id, _path, _revision): repo = self._factory.repo(wire) fsobj = svn.repos.fs(repo) @@ -352,7 +360,8 @@ class SvnRemote(RemoteBase): def get_file_size(self, wire, path, revision=None): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _get_file_size(_repo_id, _path, _revision): repo = self._factory.repo(wire) fsobj = svn.repos.fs(repo) @@ -467,7 +476,8 @@ class SvnRemote(RemoteBase): def is_binary(self, wire, rev, path): cache_on, context_uid, repo_id = self._cache_on(wire) - @self.region.conditional_cache_on_arguments(condition=cache_on) + region = self._region(wire) + @region.conditional_cache_on_arguments(condition=cache_on) def _is_binary(_repo_id, _rev, _path): raw_bytes = self.get_file_content(wire, path, rev) return raw_bytes and '\0' in raw_bytes @@ -481,9 +491,7 @@ class SvnRemote(RemoteBase): if path and os.path.isdir(path): opts['cwd'] = path - safe_call = False - if '_safe' in opts: - safe_call = True + safe_call = opts.pop('_safe', False) svnenv = os.environ.copy() svnenv.update(opts.pop('extra_env', {})) @@ -496,15 +504,15 @@ class SvnRemote(RemoteBase): return ''.join(p), ''.join(p.error) except (EnvironmentError, OSError) as err: + if safe_call: + return '', safe_str(err).strip() + else: cmd = ' '.join(cmd) # human friendly CMD tb_err = ("Couldn't run svn command (%s).\n" "Original error was:%s\n" "Call options:%s\n" % (cmd, err, _opts)) log.exception(tb_err) - if safe_call: - return '', err - else: raise exceptions.VcsException()(tb_err) @reraise_safe_exceptions @@ -528,6 +536,74 @@ class SvnRemote(RemoteBase): 'post_version': get_svn_post_hook_version(repo_path), } + @reraise_safe_exceptions + def set_head_ref(self, wire, head_name): + pass + + @reraise_safe_exceptions + def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id): + + def walk_tree(root, root_dir, _commit_id): + """ + Special recursive svn repo walker + """ + + filemode_default = 0o100644 + filemode_executable = 0o100755 + + file_iter = svn.fs.dir_entries(root, root_dir) + for f_name in file_iter: + f_type = NODE_TYPE_MAPPING.get(file_iter[f_name].kind, None) + + if f_type == 'dir': + # return only DIR, and then all entries in that dir + yield os.path.join(root_dir, f_name), {'mode': filemode_default}, f_type + new_root = os.path.join(root_dir, f_name) + for _f_name, _f_data, _f_type in walk_tree(root, new_root, _commit_id): + yield _f_name, _f_data, _f_type + else: + f_path = os.path.join(root_dir, f_name).rstrip('/') + prop_list = svn.fs.node_proplist(root, f_path) + + f_mode = filemode_default + if prop_list.get('svn:executable'): + f_mode = filemode_executable + + f_is_link = False + if prop_list.get('svn:special'): + f_is_link = True + + data = { + 'is_link': f_is_link, + 'mode': f_mode, + 'content_stream': svn.core.Stream(svn.fs.file_contents(root, f_path)).read + } + + yield f_path, data, f_type + + def file_walker(_commit_id, path): + repo = self._factory.repo(wire) + root = svn.fs.revision_root(svn.repos.fs(repo), int(commit_id)) + + def no_content(): + raise NoContentException() + + for f_name, f_data, f_type in walk_tree(root, path, _commit_id): + file_path = f_name + + if f_type == 'dir': + mode = f_data['mode'] + yield ArchiveNode(file_path, mode, False, no_content) + else: + mode = f_data['mode'] + is_link = f_data['is_link'] + data_stream = f_data['content_stream'] + yield ArchiveNode(file_path, mode, is_link, data_stream) + + return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path, + archive_dir_name, commit_id) + class SvnDiffer(object): """ diff --git a/vcsserver/tests/conftest.py b/vcsserver/tests/conftest.py --- a/vcsserver/tests/conftest.py +++ b/vcsserver/tests/conftest.py @@ -16,13 +16,12 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import socket - import pytest def pytest_addoption(parser): parser.addoption( - '--repeat', type=int, default=100, + '--perf-repeat-vcs', type=int, default=100, help="Number of repetitions in performance tests.") @@ -34,7 +33,7 @@ def repeat(request): Slower calls may divide it by 10 or 100. It is chosen in a way so that the tests are not too slow in our default test suite. """ - return request.config.getoption('--repeat') + return request.config.getoption('--perf-repeat-vcs') @pytest.fixture(scope='session') diff --git a/vcsserver/tests/test_git.py b/vcsserver/tests/test_git.py --- a/vcsserver/tests/test_git.py +++ b/vcsserver/tests/test_git.py @@ -113,7 +113,7 @@ class TestReraiseSafeExceptions(object): methods = inspect.getmembers(git_remote, predicate=inspect.ismethod) for method_name, method in methods: - if not method_name.startswith('_'): + if not method_name.startswith('_') and method_name not in ['vcsserver_invalidate_cache']: assert method.im_func.__code__ == decorator.__code__ @pytest.mark.parametrize('side_effect, expected_type', [ diff --git a/vcsserver/tests/test_hg.py b/vcsserver/tests/test_hg.py --- a/vcsserver/tests/test_hg.py +++ b/vcsserver/tests/test_hg.py @@ -50,7 +50,7 @@ class TestReraiseSafeExceptions(object): methods = inspect.getmembers(hg_remote, predicate=inspect.ismethod) decorator = hg.reraise_safe_exceptions(None) for method_name, method in methods: - if not method_name.startswith('_'): + if not method_name.startswith('_') and method_name not in ['vcsserver_invalidate_cache']: assert method.im_func.__code__ == decorator.__code__ @pytest.mark.parametrize('side_effect, expected_type', [ diff --git a/vcsserver/tests/test_http_performance.py b/vcsserver/tests/test_http_performance.py --- a/vcsserver/tests/test_http_performance.py +++ b/vcsserver/tests/test_http_performance.py @@ -14,7 +14,10 @@ def vcs_app(): 'dev.use_echo_app': 'true', 'locale': 'en_US.UTF-8', } - vcs_app = main({}, **stub_settings) + stub_global_conf = { + '__file__': '' + } + vcs_app = main(stub_global_conf, **stub_settings) app = webtest.TestApp(vcs_app) return app diff --git a/vcsserver/tests/test_main_http.py b/vcsserver/tests/test_main_http.py --- a/vcsserver/tests/test_main_http.py +++ b/vcsserver/tests/test_main_http.py @@ -25,7 +25,7 @@ from vcsserver.base import obfuscate_qs @mock.patch('vcsserver.http_main.VCS', mock.Mock()) @mock.patch('vcsserver.hgpatches.patch_largefiles_capabilities') def test_applies_largefiles_patch(patch_largefiles_capabilities): - http_main.main({}) + http_main.main({'__file__': ''}) patch_largefiles_capabilities.assert_called_once_with() @@ -35,7 +35,7 @@ def test_applies_largefiles_patch(patch_ 'vcsserver.hgpatches.patch_largefiles_capabilities', mock.Mock(side_effect=Exception("Must not be called"))) def test_applies_largefiles_patch_only_if_mercurial_is_available(): - http_main.main({}) + http_main.main({'__file__': ''}) @pytest.mark.parametrize('given, expected', [ diff --git a/vcsserver/tweens/request_wrapper.py b/vcsserver/tweens/request_wrapper.py --- a/vcsserver/tweens/request_wrapper.py +++ b/vcsserver/tweens/request_wrapper.py @@ -25,15 +25,23 @@ from vcsserver.utils import safe_str log = logging.getLogger(__name__) -def get_access_path(request): - environ = request.environ - return environ.get('PATH_INFO') +def get_access_path(environ): + path = environ.get('PATH_INFO') + return path def get_user_agent(environ): return environ.get('HTTP_USER_AGENT') +def get_vcs_method(environ): + return environ.get('HTTP_X_RC_METHOD') + + +def get_vcs_repo(environ): + return environ.get('HTTP_X_RC_REPO_NAME') + + class RequestWrapperTween(object): def __init__(self, handler, registry): self.handler = handler @@ -43,17 +51,52 @@ class RequestWrapperTween(object): def __call__(self, request): start = time.time() + log.debug('Starting request time measurement') + response = None + + ua = get_user_agent(request.environ) + vcs_method = get_vcs_method(request.environ) + repo_name = get_vcs_repo(request.environ) + try: response = self.handler(request) finally: - end = time.time() - total = end - start count = request.request_count() _ver_ = vcsserver.__version__ + _path = safe_str(get_access_path(request.environ)) + ip = '127.0.0.1' + match_route = request.matched_route.name if request.matched_route else "NOT_FOUND" + resp_code = getattr(response, 'status_code', 'UNDEFINED') + + total = time.time() - start + + _view_path = "{}/{}@{}".format(_path, vcs_method, repo_name) log.info( 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s', - count, '127.0.0.1', request.environ.get('REQUEST_METHOD'), - safe_str(get_access_path(request)), total, get_user_agent(request.environ), _ver_) + count, ip, request.environ.get('REQUEST_METHOD'), + _view_path, total, ua, _ver_, + extra={"time": total, "ver": _ver_, "code": resp_code, + "path": _path, "view_name": match_route, "user_agent": ua, + "vcs_method": vcs_method, "repo_name": repo_name} + ) + + statsd = request.registry.statsd + if statsd: + match_route = request.matched_route.name if request.matched_route else _path + elapsed_time_ms = round(1000.0 * total) # use ms only + statsd.timing( + "vcsserver_req_timing.histogram", elapsed_time_ms, + tags=[ + "view_name:{}".format(match_route), + "code:{}".format(resp_code) + ], + use_decimals=False + ) + statsd.incr( + "vcsserver_req_total", tags=[ + "view_name:{}".format(match_route), + "code:{}".format(resp_code) + ]) return response diff --git a/vcsserver/vcs_base.py b/vcsserver/vcs_base.py --- a/vcsserver/vcs_base.py +++ b/vcsserver/vcs_base.py @@ -15,13 +15,15 @@ # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +from vcsserver.lib import rc_cache class RemoteBase(object): EMPTY_COMMIT = '0' * 40 - @property - def region(self): - return self._factory._cache_region + def _region(self, wire): + cache_repo_id = wire.get('cache_repo_id', '') + cache_namespace_uid = 'cache_repo.{}'.format(cache_repo_id) + return rc_cache.get_or_create_region('repo_object', cache_namespace_uid) def _cache_on(self, wire): context = wire.get('context', '') @@ -30,3 +32,15 @@ class RemoteBase(object): cache = wire.get('cache', True) cache_on = context and cache return cache_on, context_uid, repo_id + + def vcsserver_invalidate_cache(self, wire, delete): + from vcsserver.lib import rc_cache + repo_id = wire.get('repo_id', '') + cache_repo_id = wire.get('cache_repo_id', '') + cache_namespace_uid = 'cache_repo.{}'.format(cache_repo_id) + + if delete: + rc_cache.clear_cache_namespace( + 'repo_object', cache_namespace_uid, invalidate=True) + + return {'invalidated': {'repo_id': repo_id, 'delete': delete}}