scm_app_http.py
192 lines
| 6.0 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2014-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Implementation of the scm_app interface using raw HTTP communication. | ||||
""" | ||||
import base64 | ||||
import logging | ||||
r4919 | import urllib.parse | |||
r1 | import wsgiref.util | |||
import msgpack | ||||
import requests | ||||
import webob.request | ||||
import rhodecode | ||||
r5032 | from rhodecode.lib.middleware.utils import get_path_info | |||
r1 | ||||
log = logging.getLogger(__name__) | ||||
Martin Bornhold
|
def create_git_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI proxy application that serves Git requests.

    :param repo_path: Repository path, handed to the proxy unchanged.
    :param repo_name: Repository name, handed to the proxy unchanged.
    :param config: Repository configuration, handed to the proxy unchanged.
    :return: A ``VcsHttpProxy`` pointing at the ``git/`` streaming endpoint.
    """
    git_endpoint = f"{_vcs_streaming_url()}git/"
    return VcsHttpProxy(git_endpoint, repo_path, repo_name, config)
r1 | ||||
Martin Bornhold
|
def create_hg_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI proxy application that serves Mercurial requests.

    :param repo_path: Repository path, handed to the proxy unchanged.
    :param repo_name: Repository name, handed to the proxy unchanged.
    :param config: Repository configuration, handed to the proxy unchanged.
    :return: A ``VcsHttpProxy`` pointing at the ``hg/`` streaming endpoint.
    """
    hg_endpoint = f"{_vcs_streaming_url()}hg/"
    return VcsHttpProxy(hg_endpoint, repo_path, repo_name, config)
r1 | ||||
def _vcs_streaming_url():
    """
    Return the base URL of the VCSServer streaming endpoint.

    The host part is taken from the ``vcs.server`` entry of
    ``rhodecode.CONFIG``; the scheme is always ``http``.
    """
    vcs_server = rhodecode.CONFIG['vcs.server']
    return f'http://{vcs_server}/stream/'
# TODO: johbo: Avoid the global.
# Single HTTP session shared by every proxied call made from this module.
session = requests.Session()
# Requests speedup, avoid reading .netrc and similar
session.trust_env = False

# Prevent urllib3 from spamming our logs. Current requests releases use the
# stand-alone urllib3 package, whose logger is "urllib3.connectionpool"; the
# "requests.packages..." name only matches the copy that old requests releases
# vendored, and is kept so those installations stay quiet as well.
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
    logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
r1 | ||||
class VcsHttpProxy(object):
    """
    A WSGI application which proxies vcs requests.

    The goal is to shuffle the data around without touching it. The only
    exception is the extra data from the config object which we send to the
    server as well.
    """

    def __init__(self, url, repo_path, repo_name, config):
        """
        :param str url: The URL of the VCSServer to call.
        :param repo_path: Repository path, forwarded in the call context.
        :param repo_name: Repository name; joined onto ``url`` and forwarded
            in the call context.
        :param config: Repository configuration, forwarded in the call
            context as ``repo_config``.
        """
        self._url = url
        self._repo_name = repo_name
        self._repo_path = repo_path
        self._config = config
        # Extra per-request data (user, ip, repo store, ...) read on every
        # call. Starts empty; presumably filled in by the code that owns this
        # proxy before the first request is served.
        self.rc_extras = {}
        log.debug(
            "Creating VcsHttpProxy for repo %s, url %s",
            repo_name, url)

    def _build_call_context(self, environ):
        """Collect the extra data sent to the VCSServer with the request."""
        extras = self.rc_extras
        return {
            # TODO: johbo: Remove this, rely on URL path only
            'repo_name': self._repo_name,
            'repo_path': self._repo_path,
            'path_info': get_path_info(environ),
            'repo_store': extras.get('repo_store'),
            'server_config_file': extras.get('config'),

            'auth_user': extras.get('username'),
            # NOTE: a missing user_id is sent as the string 'None'.
            'auth_user_id': str(extras.get('user_id')),
            'auth_user_ip': extras.get('ip'),

            'repo_config': self._config,
            'locked_status_code': rhodecode.CONFIG.get('lock_ret_code'),
        }

    def _build_target_url(self, environ):
        """Return the VCSServer URL for this request, query string kept."""
        url = urllib.parse.urljoin(self._url, self._repo_name)
        query_string = environ.get('QUERY_STRING')
        if query_string:
            url += '?' + query_string
        return url

    def __call__(self, environ, start_response):
        """
        Forward one WSGI request to the VCSServer and relay its response.

        The incoming headers are passed on together with an additional
        ``X_RC_VCS_STREAM_CALL_CONTEXT`` header carrying the msgpack-packed,
        base64-encoded call context. Response headers are relayed except for
        hop-by-hop ones.

        :param environ: The WSGI environment of the incoming request.
        :param start_response: The WSGI ``start_response`` callable.
        :return: An iterable yielding the response body.
        """
        request_headers = webob.request.Request(environ).headers
        request_headers.update({
            # TODO: johbo: Avoid encoding and put this into payload?
            'X_RC_VCS_STREAM_CALL_CONTEXT': base64.b64encode(
                msgpack.packb(self._build_call_context(environ)))
        })

        method = environ['REQUEST_METHOD']
        url = self._build_target_url(environ)

        log.debug('http-app: preparing request to: %s', url)
        response = session.request(
            method,
            url,
            data=_maybe_stream_request(environ),
            headers=request_headers,
            stream=True)
        log.debug('http-app: got vcsserver response: %s', response)

        error_body = None
        if response.status_code >= 500:
            # Reading the body for the log buffers it and drains the
            # underlying stream. Keep the buffer so it can be replayed below;
            # streaming a chunked response a second time would yield nothing
            # and the client would receive an empty error body.
            error_body = response.content
            log.error('Exception returned by vcsserver at: %s %s, %s',
                      url, response.status_code, error_body)

        # Preserve the headers of the response, except hop_by_hop ones
        response_headers = [
            (name, value) for name, value in response.headers.items()
            if not wsgiref.util.is_hop_by_hop(name)
        ]

        # Build status argument for start_response callable.
        status = '{status_code} {reason_phrase}'.format(
            status_code=response.status_code,
            reason_phrase=response.reason)

        start_response(status, response_headers)
        if error_body is not None:
            return [error_body]
        return _maybe_stream_response(response)
r1 | ||||
def read_in_chunks(stream_obj, block_size=1024, chunks=-1):
    """
    Yield successive blocks read from ``stream_obj``.

    :param stream_obj: Object with a ``read(size)`` method.
    :param block_size: Size passed to every ``read`` call, 1k by default.
    :param chunks: Maximum number of blocks to yield; the default of ``-1``
        keeps reading until the stream returns no more data.
    """
    remaining = chunks
    while remaining:
        block = stream_obj.read(block_size)
        if block:
            yield block
            remaining -= 1
        else:
            # Stream is exhausted.
            return
r1643 | def _is_request_chunked(environ): | |||
stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked' | ||||
return stream | ||||
def _maybe_stream_request(environ):
    """
    Return the request body, as a chunk generator when the request is chunked.

    :param environ: The WSGI environment of the request.
    :return: A generator over 256k blocks of ``wsgi.input`` for chunked
        requests, otherwise the complete body read in one go.
    """
    request_path = get_path_info(environ)
    use_stream = _is_request_chunked(environ)
    http_method = environ['REQUEST_METHOD']
    log.debug('handling scm request: %s `%s` with stream support: %s', http_method, request_path, use_stream)

    if not use_stream:
        return environ['wsgi.input'].read()

    # stream the body in blocks of 256k
    return read_in_chunks(environ['wsgi.input'], block_size=1024 * 256)
def _maybe_stream_response(response):
    """
    Return the response body as an iterable, streamed when it is chunked.

    :param response: The response received from the VCSServer.
    :return: A generator over chunks of up to 256k for chunked responses,
        otherwise a one-element list holding the whole body.
    """
    use_stream = _is_chunked(response)
    log.debug('returning response with stream: %s', use_stream)
    if not use_stream:
        return [response.content]

    # hand the body over in chunks of 256k
    return response.raw.read_chunked(amt=1024 * 256)
def _is_chunked(response): | ||||
return response.headers.get('Transfer-Encoding', '') == 'chunked' | ||||