client_http.py
432 lines
| 15.0 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2016-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Client for the VCSServer implemented based on HTTP. | ||||
""" | ||||
import copy | ||||
import logging | ||||
Martin Bornhold
|
r243 | import threading | ||
r3848 | import time | |||
r5075 | import urllib.request | |||
import urllib.error | ||||
import urllib.parse | ||||
r1 | import uuid | |||
r2940 | import traceback | |||
r1 | ||||
r682 | import pycurl | |||
r1 | import msgpack | |||
import requests | ||||
r5647 | from urllib3.util.retry import Retry | |||
r1 | ||||
r3337 | import rhodecode | |||
r4744 | from rhodecode.lib import rc_cache | |||
from rhodecode.lib.rc_cache.utils import compute_key_from_params | ||||
r3337 | from rhodecode.lib.system_info import get_cert_path | |||
from rhodecode.lib.vcs import exceptions, CurlSession | ||||
r4790 | from rhodecode.lib.utils2 import str2bool | |||
r1 | ||||
log = logging.getLogger(__name__)

# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
# Lookup table from an exception type name to the exception class raised
# locally; every key is simply the name of the class it maps to.
EXCEPTIONS_MAP = {
    exc_class.__name__: exc_class
    for exc_class in (KeyError, urllib.error.URLError)
}
def _remote_call(url, payload, exceptions_map, session, retries=3):
    """
    POST the msgpack-encoded ``payload`` to ``url`` and return the decoded
    ``result`` entry of the response.

    The request is re-sent, up to ``retries`` attempts in total, whenever
    pycurl reports ``E_RECV_ERROR`` (connection reset by peer).

    :param url: endpoint the payload is posted to.
    :param payload: msgpack-serializable request body.
    :param exceptions_map: mapping of the ``type`` name found in a remote
        error to the exception class raised locally; unknown names fall back
        to ``Exception``.
    :param session: object exposing ``post(url, data=...)``.
    :param retries: maximum number of attempts.
    :returns: value stored under ``result`` in the decoded response.
    :raises exceptions.HttpVCSCommunicationError: on pycurl errors, when no
        attempt produced a response, or when the HTTP status is >= 400.
    :raises Exception: the exception rebuilt from the ``error`` entry of the
        response, annotated with the remote details.
    """
    for attempt in range(retries):
        try:
            response = session.post(url, data=msgpack.packb(payload))
            break
        except pycurl.error as e:
            # Only the error code is needed; indexing instead of tuple
            # unpacking avoids a ValueError on unexpectedly shaped args.
            error_code = e.args[0] if e.args else None
            if error_code == pycurl.E_RECV_ERROR:
                log.warning(f'Received a "Connection reset by peer" error. '
                            f'Retrying... ({attempt + 1}/{retries})')
                continue  # Retry if connection reset error.
            msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
            raise exceptions.HttpVCSCommunicationError(msg) from e
        except Exception as e:
            # Python 3 exceptions have no `message` attribute, so fall back to
            # the string form; otherwise this check could never match.
            message = getattr(e, 'message', '') or str(e)
            if 'Failed to connect' in message:
                # gevent doesn't return proper pycurl errors
                raise exceptions.HttpVCSCommunicationError(e) from e
            raise
    else:
        # Loop finished without `break`: every attempt was a connection reset
        # (or retries < 1), so there is no response object to inspect.
        raise exceptions.HttpVCSCommunicationError(
            f'Call to {url} failed, no response received '
            f'after {retries} attempts')

    if response.status_code >= 400:
        content_type = response.content_type
        log.error('Call to %s returned non 200 HTTP code: %s [%s]',
                  url, response.status_code, content_type)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    try:
        unpacked = msgpack.unpackb(response.content)
    except Exception:
        log.exception('Failed to decode response from msgpack')
        raise

    error = unpacked.get('error')
    if error:
        type_ = error.get('type', 'Exception')
        exc = exceptions_map.get(type_, Exception)
        exc = exc(error.get('message'))
        try:
            exc._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass

        # The remote details are optional; attributes are attached in order
        # until the first missing key.
        try:
            exc._vcs_server_traceback = error['traceback']
            exc._vcs_server_org_exc_name = error['org_exc']
            exc._vcs_server_org_exc_tb = error['org_exc_tb']
        except KeyError:
            pass

        exc.add_note(attach_exc_details(error))
        raise exc  # raising the org exception from vcsserver
    return unpacked.get('result')
def attach_exc_details(error):
    """
    Build a multi-line note describing an error mapping.

    :param error: mapping which may hold the keys ``_vcs_kind``, ``org_exc``
        and ``traceback``; a missing key is rendered as ``None``.
    :returns: the note as a string, framed by ``EXC NOTE`` marker lines.
    """
    return '\n'.join((
        '-- EXC NOTE -- :',
        f'vcs_kind: {error.get("_vcs_kind")}',
        # report the original exception name here, not the vcs kind again
        f'org_exc: {error.get("org_exc")}',
        f'tb: {error.get("traceback")}',
        '-- END EXC NOTE --',
    ))
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    POST the msgpack-encoded ``payload`` to ``url`` and return an iterator
    over the response body.

    The method and repository name found in the payload are additionally
    sent as ``X-RC-Method`` / ``X-RC-Repo-Name`` request headers.

    :param url: endpoint the payload is posted to.
    :param payload: msgpack-serializable mapping; ``method`` and
        ``_repo_name`` are read from it for the headers.
    :param exceptions_map: not used by this function, kept so the signature
        mirrors ``_remote_call``.
    :param session: object exposing ``post(url, data=..., headers=...)``.
    :param chunk_size: chunk size handed to ``response.iter_content``.
    :returns: result of ``response.iter_content(chunk_size=chunk_size)``.
    :raises exceptions.HttpVCSCommunicationError: on pycurl errors or when
        the HTTP status is >= 400.
    """
    headers = {
        'X-RC-Method': payload.get('method'),
        'X-RC-Repo-Name': payload.get('_repo_name')
    }
    try:
        response = session.post(url, data=msgpack.packb(payload), headers=headers)
    except pycurl.error as e:
        msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
        raise exceptions.HttpVCSCommunicationError(msg) from e
    except Exception as e:
        # Python 3 exceptions have no `message` attribute, so fall back to
        # the string form; otherwise this check could never match.
        message = getattr(e, 'message', '') or str(e)
        if 'Failed to connect' in message:
            # gevent doesn't return proper pycurl errors
            raise exceptions.HttpVCSCommunicationError(e) from e
        raise

    if response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, response.status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response.iter_content(chunk_size=chunk_size)
class ServiceConnection(object):
    """
    Connection to a single backend endpoint.

    Any attribute which is not found through the normal lookup is returned
    as a callable that performs a remote call named after that attribute.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        server_url = f'http://{server_and_port}'
        self.url = urllib.parse.urljoin(server_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # Only reached for names missing on the instance and the class.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Send ``name`` together with its arguments to the endpoint and return
        what ``_remote_call`` returns.
        """
        request = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'args': args, 'kwargs': kwargs}
        }
        session = self._session_factory()
        return _remote_call(self.url, request, EXCEPTIONS_MAP, session)
class RemoteVCSMaker(object):
    """
    Factory for ``RemoteRepo`` objects of one backend type.

    Calling an instance creates a ``RemoteRepo``; any other unknown
    attribute is returned as a callable performing a remote call of that
    name against the backend endpoint.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        server_url = f'http://{server_and_port}'
        self.url = urllib.parse.urljoin(server_url, backend_endpoint)
        self.stream_url = urllib.parse.urljoin(server_url, backend_endpoint + '/stream')

        self._session_factory = session_factory
        self.backend_type = backend_type

    @classmethod
    def init_cache_region(cls, repo_id):
        """
        Return the ``cache_repo`` region for ``repo_id`` along with the
        namespace uid it was requested with.
        """
        namespace_uid = f'repo.{repo_id}'
        cache_region = rc_cache.get_or_create_region('cache_repo', namespace_uid)
        return cache_region, namespace_uid

    def __call__(self, path, repo_id, config, with_wire=None):
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # Only reached for names missing on the instance and the class.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Send ``func_name`` with its arguments and the backend type to the
        endpoint and return what ``_remote_call`` returns.
        """
        request = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': {'args': args, 'kwargs': kwargs}
        }
        session = self._session_factory()
        return _remote_call(self.url, request, EXCEPTIONS_MAP, session)
class RemoteRepo(object):
    """
    Proxy for a single repository reachable through the remote endpoint.

    Any attribute which is not found through the normal lookup is returned
    as a callable performing a remote call of that name; names starting with
    ``stream:`` are sent to the streaming endpoint instead.
    """
    CHUNK_SIZE = 16384

    # Default used when DEBUG logging is off. Without a real attribute the
    # lookup would fall through to ``__getattr__`` and yield an always-truthy
    # callable, which would enable the logging path unconditionally.
    _call_with_logging = False

    # Names of the remote methods whose results may be cached locally.
    _CACHE_METHODS = frozenset((
        'branches', 'tags', 'bookmarks',
        'is_large_file', 'is_binary',
        'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
        'node_history',
        'revision', 'tree_items',
        'ctx_branch', 'ctx_description',
        'bulk_request',
        'assert_correct_path',
        'is_path_valid_repository',
    ))

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path.
        :param repo_id: repository id, sanitized before it is used for the
            cache namespace.
        :param config: config object; it must provide ``get`` and
            ``serialize`` as used by this class.
        :param remote_maker: maker providing ``url``, ``stream_url``,
            ``_session_factory`` and ``init_cache_region``.
        :param with_wire: optional mapping merged into the wire data;
            defaults to ``{"cache": False}``.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        cache_repo_id = self._repo_id_sanitizer(repo_id)
        _repo_name = self._get_repo_name(config, path)
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(cache_repo_id)

        with_wire = with_wire or {"cache": False}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'

        self._wire = {
            "_repo_name": _repo_name,
            "path": path,  # repo path
            "repo_id": repo_id,
            "cache_repo_id": cache_repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }
        # with_wire is never empty at this point, caller values win
        self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def _get_repo_name(self, config, path):
        """
        Return the part of ``path`` after the configured repository store,
        without leading slashes.
        """
        repo_store = config.get('paths', '/')
        return path.split(repo_store)[-1].lstrip('/')

    def _repo_id_sanitizer(self, repo_id):
        """
        Return ``repo_id`` with ``/`` replaced by ``__``, ``-`` replaced by
        ``_`` and every non-ASCII character replaced by ``_<codepoint>_``.
        """
        pathless = repo_id.replace('/', '__').replace('-', '_')
        return ''.join(
            char if ord(char) < 128 else f'_{ord(char)}_'
            for char in pathless)

    def __getattr__(self, name):
        # Only reached for names missing on the instance and the class.
        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)
        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the request payload for the remote method ``name``.

        :returns: tuple ``(context_uid, payload)``.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            "_repo_name": wire['_repo_name'],
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }
        context_uid = wire.get('context')
        return context_uid, payload

    def get_local_cache(self, name, args):
        """
        Decide whether the call of ``name`` with ``args`` uses the local cache.

        Caching is on only when the ``vcs.methods.cache`` setting is enabled,
        the wire ``cache`` flag is truthy and ``name`` is a cacheable method.

        :returns: tuple ``(cache_on, cache_key)``; the key is an empty string
            when caching is off.
        """
        cache_on = False
        cache_key = ''
        local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')

        # a caller supplied with_wire may omit the flag, treat that as off
        wire_cache = self._wire.get('cache', False)
        if local_cache_on and wire_cache and name in self._CACHE_METHODS:
            cache_on = True
            repo_state_uid = self._wire['repo_state_uid']
            cache_key = compute_key_from_params(repo_state_uid, name, *args)

        return cache_on, cache_key

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Perform the remote call ``name``, going through the cache region
        when ``get_local_cache`` enables it, and return its result.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
                log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
                          url, name, args_repr, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Perform the remote call ``name`` against the streaming endpoint and
        return the iterator produced by ``_streaming_remote_call``.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # Cache is a problem because this is a stream
        def streaming_remote_call(_cache_key):
            if self._call_with_logging:
                args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
                log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
                          url, name, args_repr, context_uid, cache_on)
            return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)

        result = streaming_remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
class VcsHttpProxy(object):
    """
    Forwards a request to the backend endpoint through a ``requests``
    session and exposes the msgpack-streamed answer.
    """
    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)

        retry_policy = Retry(total=5, connect=None, read=None, redirect=None)
        http_adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)
        self.session = requests.Session()
        self.session.mount('http://', http_adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        POST the given data to the endpoint and return the tuple produced by
        ``_get_result`` for the streamed response.
        """
        request_body = msgpack.packb({
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        })
        response = self.session.post(self.base_url, request_body, stream=True)
        return self._get_result(response)

    def _deserialize_and_raise(self, error):
        """
        Raise an ``Exception`` built from ``error['message']``, carrying
        ``_vcs_kind`` when the mapping provides it.
        """
        remote_error = Exception(error['message'])
        if '_vcs_kind' in error:
            remote_error._vcs_kind = error['_vcs_kind']
        raise remote_error

    def _iterate(self, result):
        """
        Yield every object msgpack can unpack from the response chunks.
        """
        stream_unpacker = msgpack.Unpacker()
        for chunk in result.iter_content(chunk_size=self.CHUNK_SIZE):
            stream_unpacker.feed(chunk)
            yield from stream_unpacker

    def _get_result(self, result):
        """
        Consume the leading error, status and headers objects of the stream.

        :returns: tuple ``(iterator, status, headers)`` where the iterator
            yields the remaining objects.
        :raises Exception: when the first object of the stream is truthy.
        """
        messages = self._iterate(result)
        error = next(messages)
        if error:
            self._deserialize_and_raise(error)

        status, headers = next(messages), next(messages)
        return messages, status, headers
Martin Bornhold
|
r243 | |||
class ThreadlocalSessionFactory(object):
    """
    Callable handing out a CurlSession which is created lazily, once per
    thread, and reused on every later call from that thread.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        storage = self._thread_local
        try:
            return storage.curl_session
        except AttributeError:
            # first call on this thread, no session stored yet
            storage.curl_session = CurlSession()
        return storage.curl_session