##// END OF EJS Templates
vcs: streaming will use now 100kb chunks readers for faster throughput
marcink -
r3327:ddd720d0 default
parent child Browse files
Show More
@@ -1,165 +1,179 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Implementation of the scm_app interface using raw HTTP communication.
23 23 """
24 24
25 25 import base64
26 26 import logging
27 27 import urlparse
28 28 import wsgiref.util
29 29
30 30 import msgpack
31 31 import requests
32 32 import webob.request
33 33
34 34 import rhodecode
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def create_git_wsgi_app(repo_path, repo_name, config):
41 41 url = _vcs_streaming_url() + 'git/'
42 42 return VcsHttpProxy(url, repo_path, repo_name, config)
43 43
44 44
45 45 def create_hg_wsgi_app(repo_path, repo_name, config):
46 46 url = _vcs_streaming_url() + 'hg/'
47 47 return VcsHttpProxy(url, repo_path, repo_name, config)
48 48
49 49
50 50 def _vcs_streaming_url():
51 51 template = 'http://{}/stream/'
52 52 return template.format(rhodecode.CONFIG['vcs.server'])
53 53
54 54
55 55 # TODO: johbo: Avoid the global.
56 56 session = requests.Session()
57 57 # Requests speedup, avoid reading .netrc and similar
58 58 session.trust_env = False
59 59
60 60 # prevent urllib3 spawning our logs.
61 61 logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
62 62 logging.WARNING)
63 63
64 64
65 65 class VcsHttpProxy(object):
66 66 """
67 67 A WSGI application which proxies vcs requests.
68 68
69 69 The goal is to shuffle the data around without touching it. The only
70 70 exception is the extra data from the config object which we send to the
71 71 server as well.
72 72 """
73 73
74 74 def __init__(self, url, repo_path, repo_name, config):
75 75 """
76 76 :param str url: The URL of the VCSServer to call.
77 77 """
78 78 self._url = url
79 79 self._repo_name = repo_name
80 80 self._repo_path = repo_path
81 81 self._config = config
82 82 log.debug(
83 83 "Creating VcsHttpProxy for repo %s, url %s",
84 84 repo_name, url)
85 85
86 86 def __call__(self, environ, start_response):
87 87 config = msgpack.packb(self._config)
88 88 request = webob.request.Request(environ)
89 89 request_headers = request.headers
90 90 request_headers.update({
91 91 # TODO: johbo: Remove this, rely on URL path only
92 92 'X-RC-Repo-Name': self._repo_name,
93 93 'X-RC-Repo-Path': self._repo_path,
94 94 'X-RC-Path-Info': environ['PATH_INFO'],
95 95 # TODO: johbo: Avoid encoding and put this into payload?
96 96 'X-RC-Repo-Config': base64.b64encode(config),
97 97 'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
98 98 })
99 99
100 100 method = environ['REQUEST_METHOD']
101 101
102 102 # Preserve the query string
103 103 url = self._url
104 104 url = urlparse.urljoin(url, self._repo_name)
105 105 if environ.get('QUERY_STRING'):
106 106 url += '?' + environ['QUERY_STRING']
107 107
108 108 log.debug('http-app: preparing request to: %s', url)
109 109 response = session.request(
110 110 method,
111 111 url,
112 112 data=_maybe_stream_request(environ),
113 113 headers=request_headers,
114 114 stream=True)
115 115
116 116 log.debug('http-app: got vcsserver response: %s', response)
117 117 if response.status_code >= 500:
118 118 log.error('Exception returned by vcsserver at: %s %s, %s',
119 119 url, response.status_code, response.content)
120 120
121 121 # Preserve the headers of the response, except hop_by_hop ones
122 122 response_headers = [
123 123 (h, v) for h, v in response.headers.items()
124 124 if not wsgiref.util.is_hop_by_hop(h)
125 125 ]
126 126
127 127 # Build status argument for start_response callable.
128 128 status = '{status_code} {reason_phrase}'.format(
129 129 status_code=response.status_code,
130 130 reason_phrase=response.reason)
131 131
132 132 start_response(status, response_headers)
133 133 return _maybe_stream_response(response)
134 134
135 135
136 def read_in_chunks(stream_obj, block_size=1024, chunks=-1):
137 """
138 Read Stream in chunks, default chunk size: 1k.
139 """
140 while chunks:
141 data = stream_obj.read(block_size)
142 if not data:
143 break
144 yield data
145 chunks -= 1
146
147
136 148 def _is_request_chunked(environ):
137 149 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
138 150 return stream
139 151
140 152
141 153 def _maybe_stream_request(environ):
142 154 path = environ['PATH_INFO']
143 155 stream = _is_request_chunked(environ)
144 156 log.debug('handling request `%s` with stream support: %s', path, stream)
145 157
146 158 if stream:
147 return environ['wsgi.input']
159 # set stream by 256k
160 return read_in_chunks(environ['wsgi.input'], block_size=1024 * 256)
148 161 else:
149 162 return environ['wsgi.input'].read()
150 163
151 164
152 165 def _maybe_stream_response(response):
153 166 """
154 167 Try to generate chunks from the response if it is chunked.
155 168 """
156 169 stream = _is_chunked(response)
157 170 log.debug('returning response with stream: %s', stream)
158 171 if stream:
159 return response.raw.read_chunked()
172 # read in 256k Chunks
173 return response.raw.read_chunked(amt=1024 * 256)
160 174 else:
161 175 return [response.content]
162 176
163 177
164 178 def _is_chunked(response):
165 179 return response.headers.get('Transfer-Encoding', '') == 'chunked'
General Comments 0
You need to be logged in to leave comments. Login now