##// END OF EJS Templates
feat(locks): added no-blocking option for redis blocks
feat(locks): added no-blocking option for redis blocks

File last commit:

r5260:e241ac2a default
r5380:6a5d4eae default
Show More
client_http.py
429 lines | 15.0 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2016-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Client for the VCSServer implemented based on HTTP.
"""
import copy
import logging
Martin Bornhold
threading: Add factory that creates curl session for each thread....
r243 import threading
caches: updated cache backend to new vcsserver caches implementation.
r3848 import time
vcs-support: bulk of changes for python3
r5075 import urllib.request
import urllib.error
import urllib.parse
python3: fixed urlparse import
r4919 import urllib.parse
project: added all source files and assets
r1 import uuid
vcs: add nicer traceback information for curl errors.
r2940 import traceback
project: added all source files and assets
r1
dan
vcs-error: replace disable_vcs middleware with vcs and http exceptions...
r682 import pycurl
project: added all source files and assets
r1 import msgpack
import requests
http: ported old depracated retry format for http client.
r1958 from requests.packages.urllib3.util.retry import Retry
project: added all source files and assets
r1
vcs: expose SSL certificate path over the wire to the vcsserver....
r3337 import rhodecode
vcs: experimental local cache for methods
r4744 from rhodecode.lib import rc_cache
from rhodecode.lib.rc_cache.utils import compute_key_from_params
vcs: expose SSL certificate path over the wire to the vcsserver....
r3337 from rhodecode.lib.system_info import get_cert_path
from rhodecode.lib.vcs import exceptions, CurlSession
http_client: some reformatting
r4790 from rhodecode.lib.utils2 import str2bool
project: added all source files and assets
r1
log = logging.getLogger(__name__)
# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
EXCEPTIONS_MAP = {
'KeyError': KeyError,
python3: fix urllib usage
r4914 'URLError': urllib.error.URLError,
project: added all source files and assets
r1 }
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 def _remote_call(url, payload, exceptions_map, session, retries=3):
for attempt in range(retries):
try:
response = session.post(url, data=msgpack.packb(payload))
vcs-support: bulk of changes for python3
r5075 break
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 except pycurl.error as e:
error_code, error_message = e.args
if error_code == pycurl.E_RECV_ERROR:
log.warning(f'Received a "Connection reset by peer" error. '
f'Retrying... ({attempt + 1}/{retries})')
continue # Retry if connection reset error.
libs: more python3 reformats
r5091 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 raise exceptions.HttpVCSCommunicationError(msg)
except Exception as e:
message = getattr(e, 'message', '')
if 'Failed to connect' in message:
# gevent doesn't return proper pycurl errors
raise exceptions.HttpVCSCommunicationError(e)
else:
raise
dan
vcs-error: replace disable_vcs middleware with vcs and http exceptions...
r682
http-backend: catch errors from HTTP calls that are not raising exceptions, but...
r1410 if response.status_code >= 400:
vcs-support: bulk of changes for python3
r5075 content_type = response.content_type
log.error('Call to %s returned non 200 HTTP code: %s [%s]',
url, response.status_code, content_type)
http-backend: catch errors from HTTP calls that are not raising exceptions, but...
r1410 raise exceptions.HttpVCSCommunicationError(repr(response.content))
http: better reporting of msgpack unpack errors.
r1110 try:
vcs-support: bulk of changes for python3
r5075 response = msgpack.unpackb(response.content)
http: better reporting of msgpack unpack errors.
r1110 except Exception:
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 log.exception('Failed to decode response from msgpack')
http: better reporting of msgpack unpack errors.
r1110 raise
project: added all source files and assets
r1 error = response.get('error')
if error:
type_ = error.get('type', 'Exception')
exc = exceptions_map.get(type_, Exception)
exc = exc(error.get('message'))
try:
exc._vcs_kind = error['_vcs_kind']
except KeyError:
pass
vcs-server: expose remote tracebacks from http backend using the Pyro4AwareFormatter.
r1257
try:
exc._vcs_server_traceback = error['traceback']
exceptions: show original remote traceback, and more info if debug is enabled.
r3369 exc._vcs_server_org_exc_name = error['org_exc']
exc._vcs_server_org_exc_tb = error['org_exc_tb']
vcs-server: expose remote tracebacks from http backend using the Pyro4AwareFormatter.
r1257 except KeyError:
pass
vcs-support: bulk of changes for python3
r5075 exc.add_note(attach_exc_details(error))
raise exc # raising the org exception from vcsserver
project: added all source files and assets
r1 return response.get('result')
vcs-support: bulk of changes for python3
r5075 def attach_exc_details(error):
note = '-- EXC NOTE -- :\n'
note += f'vcs_kind: {error.get("_vcs_kind")}\n'
note += f'org_exc: {error.get("_vcs_kind")}\n'
note += f'tb: {error.get("traceback")}\n'
note += '-- END EXC NOTE --'
return note
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
try:
vcs: added headers into stream call too
r4888 headers = {
'X-RC-Method': payload.get('method'),
'X-RC-Repo-Name': payload.get('_repo_name')
}
response = session.post(url, data=msgpack.packb(payload), headers=headers)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 except pycurl.error as e:
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 error_code, error_message = e.args
libs: more python3 reformats
r5091 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 raise exceptions.HttpVCSCommunicationError(msg)
except Exception as e:
message = getattr(e, 'message', '')
if 'Failed to connect' in message:
# gevent doesn't return proper pycurl errors
raise exceptions.HttpVCSCommunicationError(e)
else:
raise
if response.status_code >= 400:
log.error('Call to %s returned non 200 HTTP code: %s',
url, response.status_code)
raise exceptions.HttpVCSCommunicationError(repr(response.content))
return response.iter_content(chunk_size=chunk_size)
class ServiceConnection(object):
def __init__(self, server_and_port, backend_endpoint, session_factory):
chore(http-client): small refactor and use fstrings
r5260 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 self._session_factory = session_factory
def __getattr__(self, name):
def f(*args, **kwargs):
return self._call(name, *args, **kwargs)
return f
@exceptions.map_vcs_exceptions
def _call(self, name, *args, **kwargs):
payload = {
'id': str(uuid.uuid4()),
'method': name,
'params': {'args': args, 'kwargs': kwargs}
}
return _remote_call(
self.url, payload, EXCEPTIONS_MAP, self._session_factory())
class RemoteVCSMaker(object):
def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
chore(http-client): small refactor and use fstrings
r5260 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
self.stream_url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint+'/stream')
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895
self._session_factory = session_factory
self.backend_type = backend_type
vcs: experimental local cache for methods
r4744 @classmethod
def init_cache_region(cls, repo_id):
libs: more python3 reformats
r5091 cache_namespace_uid = f'repo.{repo_id}'
vcs: experimental local cache for methods
r4744 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
return region, cache_namespace_uid
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 def __call__(self, path, repo_id, config, with_wire=None):
log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
def __getattr__(self, name):
def remote_attr(*args, **kwargs):
return self._call(name, *args, **kwargs)
return remote_attr
@exceptions.map_vcs_exceptions
def _call(self, func_name, *args, **kwargs):
payload = {
'id': str(uuid.uuid4()),
'method': func_name,
'backend': self.backend_type,
'params': {'args': args, 'kwargs': kwargs}
}
url = self.url
return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
class RemoteRepo(object):
CHUNK_SIZE = 16384
def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
self.url = remote_maker.url
self.stream_url = remote_maker.stream_url
self._session = remote_maker._session_factory()
caches: fix id sanitizer.
r4778
caches: fixes the file-store backend to use serialized names for filename cache of vcsserver....
r4777 cache_repo_id = self._repo_id_sanitizer(repo_id)
vcs-client: report more data about the call for better request tracking
r4887 _repo_name = self._get_repo_name(config, path)
vcs: experimental local cache for methods
r4744 self._cache_region, self._cache_namespace = \
caches: fix id sanitizer.
r4778 remote_maker.init_cache_region(cache_repo_id)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895
with_wire = with_wire or {}
repo_state_uid = with_wire.get('repo_state_uid') or 'state'
vcs-client: report more data about the call for better request tracking
r4887
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 self._wire = {
vcs-client: report more data about the call for better request tracking
r4887 "_repo_name": _repo_name,
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 "path": path, # repo path
"repo_id": repo_id,
caches: fixes the file-store backend to use serialized names for filename cache of vcsserver....
r4777 "cache_repo_id": cache_repo_id,
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 "config": config,
"repo_state_uid": repo_state_uid,
"context": self._create_vcs_cache_context(path, repo_state_uid)
}
if with_wire:
self._wire.update(with_wire)
# NOTE(johbo): Trading complexity for performance. Avoiding the call to
# log.debug brings a few percent gain even if is is not active.
if log.isEnabledFor(logging.DEBUG):
self._call_with_logging = True
self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
vcs-client: report more data about the call for better request tracking
r4887 def _get_repo_name(self, config, path):
repo_store = config.get('paths', '/')
return path.split(repo_store)[-1].lstrip('/')
caches: make sure we init caches on repo names withou '/' to not create a new cache subpath
r4766 def _repo_id_sanitizer(self, repo_id):
caches: updated cache sanitizer to remove any non-ascii chars
r4815 pathless = repo_id.replace('/', '__').replace('-', '_')
return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
caches: make sure we init caches on repo names withou '/' to not create a new cache subpath
r4766
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 def __getattr__(self, name):
if name.startswith('stream:'):
def repo_remote_attr(*args, **kwargs):
return self._call_stream(name, *args, **kwargs)
else:
def repo_remote_attr(*args, **kwargs):
return self._call(name, *args, **kwargs)
return repo_remote_attr
def _base_call(self, name, *args, **kwargs):
# TODO: oliver: This is currently necessary pre-call since the
# config object is being changed for hooking scenarios
wire = copy.deepcopy(self._wire)
wire["config"] = wire["config"].serialize()
wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
payload = {
'id': str(uuid.uuid4()),
'method': name,
vcs-client: report more data about the call for better request tracking
r4887 "_repo_name": wire['_repo_name'],
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
}
context_uid = wire.get('context')
return context_uid, payload
http_client: some reformatting
r4790 def get_local_cache(self, name, args):
vcs: experimental local cache for methods
r4744 cache_on = False
cache_key = ''
vcs-support: bulk of changes for python3
r5075 local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')
vcs-cache: enable more methods, and fix arguments calls
r4747
cache_methods = [
'branches', 'tags', 'bookmarks',
http_client: some reformatting
r4790 'is_large_file', 'is_binary',
'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
'node_history',
vcs-cache: enable more methods, and fix arguments calls
r4747 'revision', 'tree_items',
caches: cache ctx_description on local cache on
r4798 'ctx_list', 'ctx_branch', 'ctx_description',
vcs-cache: enable more methods, and fix arguments calls
r4747 'bulk_request',
caches: added path check to cached methods for faster git repo checks
r4828 'assert_correct_path'
vcs-cache: enable more methods, and fix arguments calls
r4747 ]
if local_cache_on and name in cache_methods:
vcs-call-cache: controll via hidden .ini flag
r4745 cache_on = True
vcs-cache: enable more methods, and fix arguments calls
r4747 repo_state_uid = self._wire['repo_state_uid']
call_args = [a for a in args]
cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895
http_client: some reformatting
r4790 return cache_on, cache_key
@exceptions.map_vcs_exceptions
def _call(self, name, *args, **kwargs):
context_uid, payload = self._base_call(name, *args, **kwargs)
url = self.url
start = time.time()
cache_on, cache_key = self.get_local_cache(name, args)
vcs: experimental local cache for methods
r4744 @self._cache_region.conditional_cache_on_arguments(
namespace=self._cache_namespace, condition=cache_on and cache_key)
def remote_call(_cache_key):
if self._call_with_logging:
vcs-support: bulk of changes for python3
r5075 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
url, name, args_repr, context_uid, cache_on)
vcs: experimental local cache for methods
r4744 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
result = remote_call(cache_key)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 if self._call_with_logging:
log.debug('Call %s@%s took: %.4fs. wire_context: %s',
url, name, time.time()-start, context_uid)
return result
@exceptions.map_vcs_exceptions
def _call_stream(self, name, *args, **kwargs):
context_uid, payload = self._base_call(name, *args, **kwargs)
payload['chunk_size'] = self.CHUNK_SIZE
url = self.stream_url
start = time.time()
http_client: some reformatting
r4790 cache_on, cache_key = self.get_local_cache(name, args)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895
http_client: some reformatting
r4790 # Cache is a problem because this is a stream
def streaming_remote_call(_cache_key):
if self._call_with_logging:
vcs-support: bulk of changes for python3
r5075 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
r5033 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
url, name, args_repr, context_uid, cache_on)
http_client: some reformatting
r4790 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895
http_client: some reformatting
r4790 result = streaming_remote_call(cache_key)
dan
file-nodes: added streaming remote attributes for vcsserver....
r3895 if self._call_with_logging:
log.debug('Call %s@%s took: %.4fs. wire_context: %s',
url, name, time.time()-start, context_uid)
return result
def __getitem__(self, key):
return self.revision(key)
def _create_vcs_cache_context(self, *args):
"""
Creates a unique string which is passed to the VCSServer on every
remote call. It is used as cache key in the VCSServer.
"""
hash_key = '-'.join(map(str, args))
return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
def invalidate_vcs_cache(self):
"""
This invalidates the context which is sent to the VCSServer on every
call to a remote method. It forces the VCSServer to create a fresh
repository instance on the next call to a remote method.
"""
self._wire['context'] = str(uuid.uuid4())
project: added all source files and assets
r1 class VcsHttpProxy(object):
CHUNK_SIZE = 16384
def __init__(self, server_and_port, backend_endpoint):
http: ported old depracated retry format for http client.
r1958 retries = Retry(total=5, connect=None, read=None, redirect=None)
adapter = requests.adapters.HTTPAdapter(max_retries=retries)
python3: fixed urllib usage
r4950 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
project: added all source files and assets
r1 self.session = requests.Session()
self.session.mount('http://', adapter)
def handle(self, environment, input_data, *args, **kwargs):
data = {
'environment': environment,
'input_data': input_data,
'args': args,
'kwargs': kwargs
}
result = self.session.post(
self.base_url, msgpack.packb(data), stream=True)
return self._get_result(result)
def _deserialize_and_raise(self, error):
exception = Exception(error['message'])
try:
exception._vcs_kind = error['_vcs_kind']
except KeyError:
pass
raise exception
def _iterate(self, result):
unpacker = msgpack.Unpacker()
for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
unpacker.feed(line)
libs: more python3 reformats
r5091 yield from unpacker
project: added all source files and assets
r1
def _get_result(self, result):
iterator = self._iterate(result)
python3: fixed usage of .next() and .func_name
r4936 error = next(iterator)
project: added all source files and assets
r1 if error:
self._deserialize_and_raise(error)
python3: fixed usage of .next() and .func_name
r4936 status = next(iterator)
headers = next(iterator)
project: added all source files and assets
r1
return iterator, status, headers
Martin Bornhold
threading: Add factory that creates curl session for each thread....
r243
class ThreadlocalSessionFactory(object):
"""
Creates one CurlSession per thread on demand.
"""
def __init__(self):
self._thread_local = threading.local()
def __call__(self):
if not hasattr(self._thread_local, 'curl_session'):
self._thread_local.curl_session = CurlSession()
return self._thread_local.curl_session