##// END OF EJS Templates
vcs: streaming will now use 100kb chunk readers for faster throughput
marcink -
r3327:ddd720d0 default
parent child Browse files
Show More
@@ -1,165 +1,179 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2018 RhodeCode GmbH
3 # Copyright (C) 2014-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Implementation of the scm_app interface using raw HTTP communication.
22 Implementation of the scm_app interface using raw HTTP communication.
23 """
23 """
24
24
25 import base64
25 import base64
26 import logging
26 import logging
27 import urlparse
27 import urlparse
28 import wsgiref.util
28 import wsgiref.util
29
29
30 import msgpack
30 import msgpack
31 import requests
31 import requests
32 import webob.request
32 import webob.request
33
33
34 import rhodecode
34 import rhodecode
35
35
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
def create_git_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI application that proxies Git requests to the VCSServer.

    :param repo_path: repository path, handed unchanged to `VcsHttpProxy`.
    :param repo_name: repository name, handed unchanged to `VcsHttpProxy`.
    :param config: extra configuration, handed unchanged to `VcsHttpProxy`.
    :return: a `VcsHttpProxy` pointed at the `git/` streaming endpoint.
    """
    url = _vcs_streaming_url() + 'git/'
    return VcsHttpProxy(url, repo_path, repo_name, config)
43
43
44
44
def create_hg_wsgi_app(repo_path, repo_name, config):
    """
    Build the WSGI application that proxies Mercurial requests to the VCSServer.

    :param repo_path: repository path, handed unchanged to `VcsHttpProxy`.
    :param repo_name: repository name, handed unchanged to `VcsHttpProxy`.
    :param config: extra configuration, handed unchanged to `VcsHttpProxy`.
    :return: a `VcsHttpProxy` pointed at the `hg/` streaming endpoint.
    """
    url = _vcs_streaming_url() + 'hg/'
    return VcsHttpProxy(url, repo_path, repo_name, config)
48
48
49
49
def _vcs_streaming_url():
    """
    Return the base streaming URL of the configured VCSServer.

    The host part is read from `rhodecode.CONFIG['vcs.server']`; the result
    always ends with `/stream/` so callers can append a backend suffix.
    """
    template = 'http://{}/stream/'
    return template.format(rhodecode.CONFIG['vcs.server'])
53
53
54
54
# TODO: johbo: Avoid the global.
# Module-wide HTTP session used for every proxied request.
session = requests.Session()
# Requests speedup, avoid reading .netrc and similar
session.trust_env = False

# Prevent urllib3 from spamming our logs.
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
    logging.WARNING)
63
63
64
64
class VcsHttpProxy(object):
    """
    A WSGI application which proxies vcs requests.

    The goal is to shuffle the data around without touching it. The only
    exception is the extra data from the config object which we send to the
    server as well.
    """

    def __init__(self, url, repo_path, repo_name, config):
        """
        :param str url: The URL of the VCSServer to call.
        :param repo_path: sent to the server in the `X-RC-Repo-Path` header.
        :param repo_name: joined onto `url` and sent in `X-RC-Repo-Name`.
        :param config: msgpack-packed and base64-encoded into the
            `X-RC-Repo-Config` header on every call.
        """
        self._url = url
        self._repo_name = repo_name
        self._repo_path = repo_path
        self._config = config
        log.debug(
            "Creating VcsHttpProxy for repo %s, url %s",
            repo_name, url)

    def __call__(self, environ, start_response):
        """
        Forward the incoming WSGI request to the VCSServer and relay the answer.

        :param environ: WSGI environment of the incoming request.
        :param start_response: WSGI `start_response` callable.
        :return: an iterable of body chunks for the WSGI server.
        """
        config = msgpack.packb(self._config)
        request = webob.request.Request(environ)
        request_headers = request.headers
        request_headers.update({
            # TODO: johbo: Remove this, rely on URL path only
            'X-RC-Repo-Name': self._repo_name,
            'X-RC-Repo-Path': self._repo_path,
            'X-RC-Path-Info': environ['PATH_INFO'],
            # TODO: johbo: Avoid encoding and put this into payload?
            'X-RC-Repo-Config': base64.b64encode(config),
            'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
        })

        method = environ['REQUEST_METHOD']

        # Preserve the query string
        url = self._url
        url = urlparse.urljoin(url, self._repo_name)
        if environ.get('QUERY_STRING'):
            url += '?' + environ['QUERY_STRING']

        log.debug('http-app: preparing request to: %s', url)
        response = session.request(
            method,
            url,
            data=_maybe_stream_request(environ),
            headers=request_headers,
            stream=True)

        log.debug('http-app: got vcsserver response: %s', response)
        error_body = None
        if response.status_code >= 500:
            # Accessing `.content` drains the streamed body. Keep the bytes
            # so they can still be returned to the client below instead of
            # trying to stream from an already exhausted raw response.
            error_body = response.content
            log.error('Exception returned by vcsserver at: %s %s, %s',
                      url, response.status_code, error_body)

        # Preserve the headers of the response, except hop_by_hop ones
        response_headers = [
            (h, v) for h, v in response.headers.items()
            if not wsgiref.util.is_hop_by_hop(h)
        ]

        # Build status argument for start_response callable.
        status = '{status_code} {reason_phrase}'.format(
            status_code=response.status_code,
            reason_phrase=response.reason)

        start_response(status, response_headers)
        if error_body is not None:
            return [error_body]
        return _maybe_stream_response(response)
134
134
135
135
def read_in_chunks(stream_obj, block_size=1024, chunks=-1):
    """
    Read Stream in chunks, default chunk size: 1k.

    :param stream_obj: object exposing `read(size)`.
    :param block_size: number of bytes requested per `read` call.
    :param chunks: maximum number of chunks to yield; `0` yields nothing and
        the default `-1` keeps reading until `read` returns an empty value.
    :return: generator yielding each non-empty chunk in order.
    """
    while chunks:
        data = stream_obj.read(block_size)
        if not data:
            # An empty read marks the end of the stream.
            break
        yield data
        chunks -= 1
146
147
def _is_request_chunked(environ):
    """
    Tell whether the incoming request uses chunked transfer encoding.

    :param environ: WSGI environment of the request.
    :return: `True` only when `HTTP_TRANSFER_ENCODING` equals `'chunked'`.
    """
    return environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
139
151
140
152
def _maybe_stream_request(environ):
    """
    Produce the request body to forward, streamed when the request is chunked.

    :param environ: WSGI environment of the incoming request.
    :return: a generator over `wsgi.input` in 256k blocks for chunked
        requests, otherwise the complete body as returned by `read()`.
    """
    path = environ['PATH_INFO']
    stream = _is_request_chunked(environ)
    log.debug('handling request `%s` with stream support: %s', path, stream)

    if stream:
        # set stream by 256k
        return read_in_chunks(environ['wsgi.input'], block_size=1024 * 256)
    else:
        return environ['wsgi.input'].read()
150
163
151
164
def _maybe_stream_response(response):
    """
    Try to generate chunks from the response if it is chunked.

    :param response: response object returned by the `requests` session.
    :return: a generator from `response.raw.read_chunked` for chunked
        responses, otherwise a one-element list holding `response.content`.
    """
    stream = _is_chunked(response)
    log.debug('returning response with stream: %s', stream)
    if stream:
        # read in 256k Chunks
        return response.raw.read_chunked(amt=1024 * 256)
    else:
        return [response.content]
162
176
163
177
def _is_chunked(response):
    """
    Tell whether the response declares chunked transfer encoding.

    :param response: object exposing a `headers` mapping.
    :return: `True` only when `Transfer-Encoding` equals `'chunked'`.
    """
    return response.headers.get('Transfer-Encoding', '') == 'chunked'
General Comments 0
You need to be logged in to leave comments. Login now