##// END OF EJS Templates
chore(deps): bumped zope.interface==6.2.0
chore(deps): bumped zope.interface==6.2.0

File last commit:

r5260:e241ac2a default
r5307:39250ee5 default
Show More
client_http.py
429 lines | 15.0 KiB | text/x-python | PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Client for the VCSServer implemented based on HTTP.
"""
import copy
import logging
import threading
import time
import urllib.request
import urllib.error
import urllib.parse
import urllib.parse
import uuid
import traceback
import pycurl
import msgpack
import requests
from requests.packages.urllib3.util.retry import Retry
import rhodecode
from rhodecode.lib import rc_cache
from rhodecode.lib.rc_cache.utils import compute_key_from_params
from rhodecode.lib.system_info import get_cert_path
from rhodecode.lib.vcs import exceptions, CurlSession
from rhodecode.lib.utils2 import str2bool
log = logging.getLogger(__name__)
# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
EXCEPTIONS_MAP = {
'KeyError': KeyError,
'URLError': urllib.error.URLError,
}
def _remote_call(url, payload, exceptions_map, session, retries=3):
for attempt in range(retries):
try:
response = session.post(url, data=msgpack.packb(payload))
break
except pycurl.error as e:
error_code, error_message = e.args
if error_code == pycurl.E_RECV_ERROR:
log.warning(f'Received a "Connection reset by peer" error. '
f'Retrying... ({attempt + 1}/{retries})')
continue # Retry if connection reset error.
msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
raise exceptions.HttpVCSCommunicationError(msg)
except Exception as e:
message = getattr(e, 'message', '')
if 'Failed to connect' in message:
# gevent doesn't return proper pycurl errors
raise exceptions.HttpVCSCommunicationError(e)
else:
raise
if response.status_code >= 400:
content_type = response.content_type
log.error('Call to %s returned non 200 HTTP code: %s [%s]',
url, response.status_code, content_type)
raise exceptions.HttpVCSCommunicationError(repr(response.content))
try:
response = msgpack.unpackb(response.content)
except Exception:
log.exception('Failed to decode response from msgpack')
raise
error = response.get('error')
if error:
type_ = error.get('type', 'Exception')
exc = exceptions_map.get(type_, Exception)
exc = exc(error.get('message'))
try:
exc._vcs_kind = error['_vcs_kind']
except KeyError:
pass
try:
exc._vcs_server_traceback = error['traceback']
exc._vcs_server_org_exc_name = error['org_exc']
exc._vcs_server_org_exc_tb = error['org_exc_tb']
except KeyError:
pass
exc.add_note(attach_exc_details(error))
raise exc # raising the org exception from vcsserver
return response.get('result')
def attach_exc_details(error):
note = '-- EXC NOTE -- :\n'
note += f'vcs_kind: {error.get("_vcs_kind")}\n'
note += f'org_exc: {error.get("_vcs_kind")}\n'
note += f'tb: {error.get("traceback")}\n'
note += '-- END EXC NOTE --'
return note
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
try:
headers = {
'X-RC-Method': payload.get('method'),
'X-RC-Repo-Name': payload.get('_repo_name')
}
response = session.post(url, data=msgpack.packb(payload), headers=headers)
except pycurl.error as e:
error_code, error_message = e.args
msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
raise exceptions.HttpVCSCommunicationError(msg)
except Exception as e:
message = getattr(e, 'message', '')
if 'Failed to connect' in message:
# gevent doesn't return proper pycurl errors
raise exceptions.HttpVCSCommunicationError(e)
else:
raise
if response.status_code >= 400:
log.error('Call to %s returned non 200 HTTP code: %s',
url, response.status_code)
raise exceptions.HttpVCSCommunicationError(repr(response.content))
return response.iter_content(chunk_size=chunk_size)
class ServiceConnection(object):
def __init__(self, server_and_port, backend_endpoint, session_factory):
self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
self._session_factory = session_factory
def __getattr__(self, name):
def f(*args, **kwargs):
return self._call(name, *args, **kwargs)
return f
@exceptions.map_vcs_exceptions
def _call(self, name, *args, **kwargs):
payload = {
'id': str(uuid.uuid4()),
'method': name,
'params': {'args': args, 'kwargs': kwargs}
}
return _remote_call(
self.url, payload, EXCEPTIONS_MAP, self._session_factory())
class RemoteVCSMaker(object):
def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
self.stream_url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint+'/stream')
self._session_factory = session_factory
self.backend_type = backend_type
@classmethod
def init_cache_region(cls, repo_id):
cache_namespace_uid = f'repo.{repo_id}'
region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
return region, cache_namespace_uid
def __call__(self, path, repo_id, config, with_wire=None):
log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
def __getattr__(self, name):
def remote_attr(*args, **kwargs):
return self._call(name, *args, **kwargs)
return remote_attr
@exceptions.map_vcs_exceptions
def _call(self, func_name, *args, **kwargs):
payload = {
'id': str(uuid.uuid4()),
'method': func_name,
'backend': self.backend_type,
'params': {'args': args, 'kwargs': kwargs}
}
url = self.url
return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
class RemoteRepo(object):
CHUNK_SIZE = 16384
def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
self.url = remote_maker.url
self.stream_url = remote_maker.stream_url
self._session = remote_maker._session_factory()
cache_repo_id = self._repo_id_sanitizer(repo_id)
_repo_name = self._get_repo_name(config, path)
self._cache_region, self._cache_namespace = \
remote_maker.init_cache_region(cache_repo_id)
with_wire = with_wire or {}
repo_state_uid = with_wire.get('repo_state_uid') or 'state'
self._wire = {
"_repo_name": _repo_name,
"path": path, # repo path
"repo_id": repo_id,
"cache_repo_id": cache_repo_id,
"config": config,
"repo_state_uid": repo_state_uid,
"context": self._create_vcs_cache_context(path, repo_state_uid)
}
if with_wire:
self._wire.update(with_wire)
# NOTE(johbo): Trading complexity for performance. Avoiding the call to
# log.debug brings a few percent gain even if is is not active.
if log.isEnabledFor(logging.DEBUG):
self._call_with_logging = True
self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
def _get_repo_name(self, config, path):
repo_store = config.get('paths', '/')
return path.split(repo_store)[-1].lstrip('/')
def _repo_id_sanitizer(self, repo_id):
pathless = repo_id.replace('/', '__').replace('-', '_')
return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
def __getattr__(self, name):
if name.startswith('stream:'):
def repo_remote_attr(*args, **kwargs):
return self._call_stream(name, *args, **kwargs)
else:
def repo_remote_attr(*args, **kwargs):
return self._call(name, *args, **kwargs)
return repo_remote_attr
def _base_call(self, name, *args, **kwargs):
# TODO: oliver: This is currently necessary pre-call since the
# config object is being changed for hooking scenarios
wire = copy.deepcopy(self._wire)
wire["config"] = wire["config"].serialize()
wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
payload = {
'id': str(uuid.uuid4()),
'method': name,
"_repo_name": wire['_repo_name'],
'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
}
context_uid = wire.get('context')
return context_uid, payload
def get_local_cache(self, name, args):
cache_on = False
cache_key = ''
local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')
cache_methods = [
'branches', 'tags', 'bookmarks',
'is_large_file', 'is_binary',
'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
'node_history',
'revision', 'tree_items',
'ctx_list', 'ctx_branch', 'ctx_description',
'bulk_request',
'assert_correct_path'
]
if local_cache_on and name in cache_methods:
cache_on = True
repo_state_uid = self._wire['repo_state_uid']
call_args = [a for a in args]
cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
return cache_on, cache_key
@exceptions.map_vcs_exceptions
def _call(self, name, *args, **kwargs):
context_uid, payload = self._base_call(name, *args, **kwargs)
url = self.url
start = time.time()
cache_on, cache_key = self.get_local_cache(name, args)
@self._cache_region.conditional_cache_on_arguments(
namespace=self._cache_namespace, condition=cache_on and cache_key)
def remote_call(_cache_key):
if self._call_with_logging:
args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
url, name, args_repr, context_uid, cache_on)
return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
result = remote_call(cache_key)
if self._call_with_logging:
log.debug('Call %s@%s took: %.4fs. wire_context: %s',
url, name, time.time()-start, context_uid)
return result
@exceptions.map_vcs_exceptions
def _call_stream(self, name, *args, **kwargs):
context_uid, payload = self._base_call(name, *args, **kwargs)
payload['chunk_size'] = self.CHUNK_SIZE
url = self.stream_url
start = time.time()
cache_on, cache_key = self.get_local_cache(name, args)
# Cache is a problem because this is a stream
def streaming_remote_call(_cache_key):
if self._call_with_logging:
args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
url, name, args_repr, context_uid, cache_on)
return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
result = streaming_remote_call(cache_key)
if self._call_with_logging:
log.debug('Call %s@%s took: %.4fs. wire_context: %s',
url, name, time.time()-start, context_uid)
return result
def __getitem__(self, key):
return self.revision(key)
def _create_vcs_cache_context(self, *args):
"""
Creates a unique string which is passed to the VCSServer on every
remote call. It is used as cache key in the VCSServer.
"""
hash_key = '-'.join(map(str, args))
return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
def invalidate_vcs_cache(self):
"""
This invalidates the context which is sent to the VCSServer on every
call to a remote method. It forces the VCSServer to create a fresh
repository instance on the next call to a remote method.
"""
self._wire['context'] = str(uuid.uuid4())
class VcsHttpProxy(object):
CHUNK_SIZE = 16384
def __init__(self, server_and_port, backend_endpoint):
retries = Retry(total=5, connect=None, read=None, redirect=None)
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
self.session = requests.Session()
self.session.mount('http://', adapter)
def handle(self, environment, input_data, *args, **kwargs):
data = {
'environment': environment,
'input_data': input_data,
'args': args,
'kwargs': kwargs
}
result = self.session.post(
self.base_url, msgpack.packb(data), stream=True)
return self._get_result(result)
def _deserialize_and_raise(self, error):
exception = Exception(error['message'])
try:
exception._vcs_kind = error['_vcs_kind']
except KeyError:
pass
raise exception
def _iterate(self, result):
unpacker = msgpack.Unpacker()
for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
unpacker.feed(line)
yield from unpacker
def _get_result(self, result):
iterator = self._iterate(result)
error = next(iterator)
if error:
self._deserialize_and_raise(error)
status = next(iterator)
headers = next(iterator)
return iterator, status, headers
class ThreadlocalSessionFactory(object):
"""
Creates one CurlSession per thread on demand.
"""
def __init__(self):
self._thread_local = threading.local()
def __call__(self):
if not hasattr(self._thread_local, 'curl_session'):
self._thread_local.curl_session = CurlSession()
return self._thread_local.curl_session