##// END OF EJS Templates
scm_app: add more debug info when unhandled errors happen on vcsserver.
marcink -
r3093:e2f600bd default
parent child Browse files
Show More
@@ -1,330 +1,332 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27
27
28 from BaseHTTPServer import BaseHTTPRequestHandler
28 from BaseHTTPServer import BaseHTTPRequestHandler
29 from SocketServer import TCPServer
29 from SocketServer import TCPServer
30
30
31 import rhodecode
31 import rhodecode
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.model import meta
33 from rhodecode.model import meta
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib import hooks_base
35 from rhodecode.lib import hooks_base
36 from rhodecode.lib.utils2 import AttributeDict
36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
38 from rhodecode.lib import rc_cache
38 from rhodecode.lib import rc_cache
39
39
40 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
41
41
42
42
43 class HooksHttpHandler(BaseHTTPRequestHandler):
43 class HooksHttpHandler(BaseHTTPRequestHandler):
44
44
45 def do_POST(self):
45 def do_POST(self):
46 method, extras = self._read_request()
46 method, extras = self._read_request()
47 txn_id = getattr(self.server, 'txn_id', None)
47 txn_id = getattr(self.server, 'txn_id', None)
48 if txn_id:
48 if txn_id:
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 extras['repository'], extras['txn_id'])
50 extras['repository'], extras['txn_id'])
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 extras['repository'], extras['txn_id'])
52 extras['repository'], extras['txn_id'])
53 if txn_id != computed_txn_id:
53 if txn_id != computed_txn_id:
54 raise Exception(
54 raise Exception(
55 'TXN ID fail: expected {} got {} instead'.format(
55 'TXN ID fail: expected {} got {} instead'.format(
56 txn_id, computed_txn_id))
56 txn_id, computed_txn_id))
57
57
58 try:
58 try:
59 result = self._call_hook(method, extras)
59 result = self._call_hook(method, extras)
60 except Exception as e:
60 except Exception as e:
61 exc_tb = traceback.format_exc()
61 exc_tb = traceback.format_exc()
62 result = {
62 result = {
63 'exception': e.__class__.__name__,
63 'exception': e.__class__.__name__,
64 'exception_traceback': exc_tb,
64 'exception_traceback': exc_tb,
65 'exception_args': e.args
65 'exception_args': e.args
66 }
66 }
67 self._write_response(result)
67 self._write_response(result)
68
68
69 def _read_request(self):
69 def _read_request(self):
70 length = int(self.headers['Content-Length'])
70 length = int(self.headers['Content-Length'])
71 body = self.rfile.read(length).decode('utf-8')
71 body = self.rfile.read(length).decode('utf-8')
72 data = json.loads(body)
72 data = json.loads(body)
73 return data['method'], data['extras']
73 return data['method'], data['extras']
74
74
75 def _write_response(self, result):
75 def _write_response(self, result):
76 self.send_response(200)
76 self.send_response(200)
77 self.send_header("Content-type", "text/json")
77 self.send_header("Content-type", "text/json")
78 self.end_headers()
78 self.end_headers()
79 self.wfile.write(json.dumps(result))
79 self.wfile.write(json.dumps(result))
80
80
81 def _call_hook(self, method, extras):
81 def _call_hook(self, method, extras):
82 hooks = Hooks()
82 hooks = Hooks()
83 try:
83 try:
84 result = getattr(hooks, method)(extras)
84 result = getattr(hooks, method)(extras)
85 finally:
85 finally:
86 meta.Session.remove()
86 meta.Session.remove()
87 return result
87 return result
88
88
89 def log_message(self, format, *args):
89 def log_message(self, format, *args):
90 """
90 """
91 This is an overridden method of BaseHTTPRequestHandler which logs using
91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 logging library instead of writing directly to stderr.
92 logging library instead of writing directly to stderr.
93 """
93 """
94
94
95 message = format % args
95 message = format % args
96
96
97 log.debug(
97 log.debug(
98 "%s - - [%s] %s", self.client_address[0],
98 "%s - - [%s] %s", self.client_address[0],
99 self.log_date_time_string(), message)
99 self.log_date_time_string(), message)
100
100
101
101
102 class DummyHooksCallbackDaemon(object):
102 class DummyHooksCallbackDaemon(object):
103 hooks_uri = ''
103 hooks_uri = ''
104
104
105 def __init__(self):
105 def __init__(self):
106 self.hooks_module = Hooks.__module__
106 self.hooks_module = Hooks.__module__
107
107
108 def __enter__(self):
108 def __enter__(self):
109 log.debug('Running dummy hooks callback daemon')
109 log.debug('Running dummy hooks callback daemon')
110 return self
110 return self
111
111
112 def __exit__(self, exc_type, exc_val, exc_tb):
112 def __exit__(self, exc_type, exc_val, exc_tb):
113 log.debug('Exiting dummy hooks callback daemon')
113 log.debug('Exiting dummy hooks callback daemon')
114
114
115
115
116 class ThreadedHookCallbackDaemon(object):
116 class ThreadedHookCallbackDaemon(object):
117
117
118 _callback_thread = None
118 _callback_thread = None
119 _daemon = None
119 _daemon = None
120 _done = False
120 _done = False
121
121
122 def __init__(self, txn_id=None, host=None, port=None):
122 def __init__(self, txn_id=None, host=None, port=None):
123 self._prepare(txn_id=txn_id, host=None, port=port)
123 self._prepare(txn_id=txn_id, host=None, port=port)
124
124
125 def __enter__(self):
125 def __enter__(self):
126 self._run()
126 self._run()
127 return self
127 return self
128
128
129 def __exit__(self, exc_type, exc_val, exc_tb):
129 def __exit__(self, exc_type, exc_val, exc_tb):
130 log.debug('Callback daemon exiting now...')
130 log.debug('Callback daemon exiting now...')
131 self._stop()
131 self._stop()
132
132
133 def _prepare(self, txn_id=None, host=None, port=None):
133 def _prepare(self, txn_id=None, host=None, port=None):
134 raise NotImplementedError()
134 raise NotImplementedError()
135
135
136 def _run(self):
136 def _run(self):
137 raise NotImplementedError()
137 raise NotImplementedError()
138
138
139 def _stop(self):
139 def _stop(self):
140 raise NotImplementedError()
140 raise NotImplementedError()
141
141
142
142
143 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
143 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
144 """
144 """
145 Context manager which will run a callback daemon in a background thread.
145 Context manager which will run a callback daemon in a background thread.
146 """
146 """
147
147
148 hooks_uri = None
148 hooks_uri = None
149
149
150 # From Python docs: Polling reduces our responsiveness to a shutdown
150 # From Python docs: Polling reduces our responsiveness to a shutdown
151 # request and wastes cpu at all other times.
151 # request and wastes cpu at all other times.
152 POLL_INTERVAL = 0.01
152 POLL_INTERVAL = 0.01
153
153
154 def _prepare(self, txn_id=None, host=None, port=None):
154 def _prepare(self, txn_id=None, host=None, port=None):
155 host = host or '127.0.0.1'
155 host = host or '127.0.0.1'
156 self._done = False
156 self._done = False
157 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
157 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
158 _, port = self._daemon.server_address
158 _, port = self._daemon.server_address
159 self.hooks_uri = '{}:{}'.format(host, port)
159 self.hooks_uri = '{}:{}'.format(host, port)
160 self.txn_id = txn_id
160 self.txn_id = txn_id
161 # inject transaction_id for later verification
161 # inject transaction_id for later verification
162 self._daemon.txn_id = self.txn_id
162 self._daemon.txn_id = self.txn_id
163
163
164 log.debug(
164 log.debug(
165 "Preparing HTTP callback daemon at `%s` and registering hook object",
165 "Preparing HTTP callback daemon at `%s` and registering hook object",
166 self.hooks_uri)
166 self.hooks_uri)
167
167
168 def _run(self):
168 def _run(self):
169 log.debug("Running event loop of callback daemon in background thread")
169 log.debug("Running event loop of callback daemon in background thread")
170 callback_thread = threading.Thread(
170 callback_thread = threading.Thread(
171 target=self._daemon.serve_forever,
171 target=self._daemon.serve_forever,
172 kwargs={'poll_interval': self.POLL_INTERVAL})
172 kwargs={'poll_interval': self.POLL_INTERVAL})
173 callback_thread.daemon = True
173 callback_thread.daemon = True
174 callback_thread.start()
174 callback_thread.start()
175 self._callback_thread = callback_thread
175 self._callback_thread = callback_thread
176
176
177 def _stop(self):
177 def _stop(self):
178 log.debug("Waiting for background thread to finish.")
178 log.debug("Waiting for background thread to finish.")
179 self._daemon.shutdown()
179 self._daemon.shutdown()
180 self._callback_thread.join()
180 self._callback_thread.join()
181 self._daemon = None
181 self._daemon = None
182 self._callback_thread = None
182 self._callback_thread = None
183 if self.txn_id:
183 if self.txn_id:
184 txn_id_file = get_txn_id_data_path(self.txn_id)
184 txn_id_file = get_txn_id_data_path(self.txn_id)
185 log.debug('Cleaning up TXN ID %s', txn_id_file)
185 log.debug('Cleaning up TXN ID %s', txn_id_file)
186 if os.path.isfile(txn_id_file):
186 if os.path.isfile(txn_id_file):
187 os.remove(txn_id_file)
187 os.remove(txn_id_file)
188
188
189 log.debug("Background thread done.")
189 log.debug("Background thread done.")
190
190
191
191
192 def get_txn_id_data_path(txn_id):
192 def get_txn_id_data_path(txn_id):
193 import rhodecode
193 import rhodecode
194
194
195 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
195 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
196 final_dir = os.path.join(root, 'svn_txn_id')
196 final_dir = os.path.join(root, 'svn_txn_id')
197
197
198 if not os.path.isdir(final_dir):
198 if not os.path.isdir(final_dir):
199 os.makedirs(final_dir)
199 os.makedirs(final_dir)
200 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
200 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
201
201
202
202
203 def store_txn_id_data(txn_id, data_dict):
203 def store_txn_id_data(txn_id, data_dict):
204 if not txn_id:
204 if not txn_id:
205 log.warning('Cannot store txn_id because it is empty')
205 log.warning('Cannot store txn_id because it is empty')
206 return
206 return
207
207
208 path = get_txn_id_data_path(txn_id)
208 path = get_txn_id_data_path(txn_id)
209 try:
209 try:
210 with open(path, 'wb') as f:
210 with open(path, 'wb') as f:
211 f.write(json.dumps(data_dict))
211 f.write(json.dumps(data_dict))
212 except Exception:
212 except Exception:
213 log.exception('Failed to write txn_id metadata')
213 log.exception('Failed to write txn_id metadata')
214
214
215
215
216 def get_txn_id_from_store(txn_id):
216 def get_txn_id_from_store(txn_id):
217 """
217 """
218 Reads txn_id from store and if present returns the data for callback manager
218 Reads txn_id from store and if present returns the data for callback manager
219 """
219 """
220 path = get_txn_id_data_path(txn_id)
220 path = get_txn_id_data_path(txn_id)
221 try:
221 try:
222 with open(path, 'rb') as f:
222 with open(path, 'rb') as f:
223 return json.loads(f.read())
223 return json.loads(f.read())
224 except Exception:
224 except Exception:
225 return {}
225 return {}
226
226
227
227
228 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
228 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
229 txn_details = get_txn_id_from_store(txn_id)
229 txn_details = get_txn_id_from_store(txn_id)
230 port = txn_details.get('port', 0)
230 port = txn_details.get('port', 0)
231 if use_direct_calls:
231 if use_direct_calls:
232 callback_daemon = DummyHooksCallbackDaemon()
232 callback_daemon = DummyHooksCallbackDaemon()
233 extras['hooks_module'] = callback_daemon.hooks_module
233 extras['hooks_module'] = callback_daemon.hooks_module
234 else:
234 else:
235 if protocol == 'http':
235 if protocol == 'http':
236 callback_daemon = HttpHooksCallbackDaemon(
236 callback_daemon = HttpHooksCallbackDaemon(
237 txn_id=txn_id, host=host, port=port)
237 txn_id=txn_id, host=host, port=port)
238 else:
238 else:
239 log.error('Unsupported callback daemon protocol "%s"', protocol)
239 log.error('Unsupported callback daemon protocol "%s"', protocol)
240 raise Exception('Unsupported callback daemon protocol.')
240 raise Exception('Unsupported callback daemon protocol.')
241
241
242 extras['hooks_uri'] = callback_daemon.hooks_uri
242 extras['hooks_uri'] = callback_daemon.hooks_uri
243 extras['hooks_protocol'] = protocol
243 extras['hooks_protocol'] = protocol
244 extras['time'] = time.time()
244 extras['time'] = time.time()
245
245
246 # register txn_id
246 # register txn_id
247 extras['txn_id'] = txn_id
247 extras['txn_id'] = txn_id
248
249 log.debug('Prepared a callback daemon: %s at url `%s`',
248 log.debug('Prepared a callback daemon: %s at url `%s`',
250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
249 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 return callback_daemon, extras
250 return callback_daemon, extras
252
251
253
252
254 class Hooks(object):
253 class Hooks(object):
255 """
254 """
256 Exposes the hooks for remote call backs
255 Exposes the hooks for remote call backs
257 """
256 """
258
257
259 def repo_size(self, extras):
258 def repo_size(self, extras):
260 log.debug("Called repo_size of %s object", self)
259 log.debug("Called repo_size of %s object", self)
261 return self._call_hook(hooks_base.repo_size, extras)
260 return self._call_hook(hooks_base.repo_size, extras)
262
261
263 def pre_pull(self, extras):
262 def pre_pull(self, extras):
264 log.debug("Called pre_pull of %s object", self)
263 log.debug("Called pre_pull of %s object", self)
265 return self._call_hook(hooks_base.pre_pull, extras)
264 return self._call_hook(hooks_base.pre_pull, extras)
266
265
267 def post_pull(self, extras):
266 def post_pull(self, extras):
268 log.debug("Called post_pull of %s object", self)
267 log.debug("Called post_pull of %s object", self)
269 return self._call_hook(hooks_base.post_pull, extras)
268 return self._call_hook(hooks_base.post_pull, extras)
270
269
271 def pre_push(self, extras):
270 def pre_push(self, extras):
272 log.debug("Called pre_push of %s object", self)
271 log.debug("Called pre_push of %s object", self)
273 return self._call_hook(hooks_base.pre_push, extras)
272 return self._call_hook(hooks_base.pre_push, extras)
274
273
275 def post_push(self, extras):
274 def post_push(self, extras):
276 log.debug("Called post_push of %s object", self)
275 log.debug("Called post_push of %s object", self)
277 return self._call_hook(hooks_base.post_push, extras)
276 return self._call_hook(hooks_base.post_push, extras)
278
277
279 def _call_hook(self, hook, extras):
278 def _call_hook(self, hook, extras):
280 extras = AttributeDict(extras)
279 extras = AttributeDict(extras)
281 server_url = extras['server_url']
280 server_url = extras['server_url']
282 request = bootstrap_request(application_url=server_url)
281 request = bootstrap_request(application_url=server_url)
283
282
284 bootstrap_config(request) # inject routes and other interfaces
283 bootstrap_config(request) # inject routes and other interfaces
285
284
286 # inject the user for usage in hooks
285 # inject the user for usage in hooks
287 request.user = AttributeDict({'username': extras.username,
286 request.user = AttributeDict({'username': extras.username,
288 'ip_addr': extras.ip,
287 'ip_addr': extras.ip,
289 'user_id': extras.user_id})
288 'user_id': extras.user_id})
290
289
291 extras.request = request
290 extras.request = request
292
291
293 try:
292 try:
294 result = hook(extras)
293 result = hook(extras)
294 if result is None:
295 raise Exception(
296 'Failed to obtain hook result from func: {}'.format(hook))
295 except HTTPBranchProtected as handled_error:
297 except HTTPBranchProtected as handled_error:
296 # Those special cases doesn't need error reporting. It's a case of
298 # Those special cases doesn't need error reporting. It's a case of
297 # locked repo or protected branch
299 # locked repo or protected branch
298 result = AttributeDict({
300 result = AttributeDict({
299 'status': handled_error.code,
301 'status': handled_error.code,
300 'output': handled_error.explanation
302 'output': handled_error.explanation
301 })
303 })
302 except (HTTPLockedRC, Exception) as error:
304 except (HTTPLockedRC, Exception) as error:
303 # locked needs different handling since we need to also
305 # locked needs different handling since we need to also
304 # handle PULL operations
306 # handle PULL operations
305 exc_tb = ''
307 exc_tb = ''
306 if not isinstance(error, HTTPLockedRC):
308 if not isinstance(error, HTTPLockedRC):
307 exc_tb = traceback.format_exc()
309 exc_tb = traceback.format_exc()
308 log.exception('Exception when handling hook %s', hook)
310 log.exception('Exception when handling hook %s', hook)
309 error_args = error.args
311 error_args = error.args
310 return {
312 return {
311 'status': 128,
313 'status': 128,
312 'output': '',
314 'output': '',
313 'exception': type(error).__name__,
315 'exception': type(error).__name__,
314 'exception_traceback': exc_tb,
316 'exception_traceback': exc_tb,
315 'exception_args': error_args,
317 'exception_args': error_args,
316 }
318 }
317 finally:
319 finally:
318 meta.Session.remove()
320 meta.Session.remove()
319
321
320 log.debug('Got hook call response %s', result)
322 log.debug('Got hook call response %s', result)
321 return {
323 return {
322 'status': result.status,
324 'status': result.status,
323 'output': result.output,
325 'output': result.output,
324 }
326 }
325
327
326 def __enter__(self):
328 def __enter__(self):
327 return self
329 return self
328
330
329 def __exit__(self, exc_type, exc_val, exc_tb):
331 def __exit__(self, exc_type, exc_val, exc_tb):
330 pass
332 pass
@@ -1,161 +1,165 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2018 RhodeCode GmbH
3 # Copyright (C) 2014-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Implementation of the scm_app interface using raw HTTP communication.
22 Implementation of the scm_app interface using raw HTTP communication.
23 """
23 """
24
24
25 import base64
25 import base64
26 import logging
26 import logging
27 import urlparse
27 import urlparse
28 import wsgiref.util
28 import wsgiref.util
29
29
30 import msgpack
30 import msgpack
31 import requests
31 import requests
32 import webob.request
32 import webob.request
33
33
34 import rhodecode
34 import rhodecode
35
35
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
40 def create_git_wsgi_app(repo_path, repo_name, config):
40 def create_git_wsgi_app(repo_path, repo_name, config):
41 url = _vcs_streaming_url() + 'git/'
41 url = _vcs_streaming_url() + 'git/'
42 return VcsHttpProxy(url, repo_path, repo_name, config)
42 return VcsHttpProxy(url, repo_path, repo_name, config)
43
43
44
44
45 def create_hg_wsgi_app(repo_path, repo_name, config):
45 def create_hg_wsgi_app(repo_path, repo_name, config):
46 url = _vcs_streaming_url() + 'hg/'
46 url = _vcs_streaming_url() + 'hg/'
47 return VcsHttpProxy(url, repo_path, repo_name, config)
47 return VcsHttpProxy(url, repo_path, repo_name, config)
48
48
49
49
50 def _vcs_streaming_url():
50 def _vcs_streaming_url():
51 template = 'http://{}/stream/'
51 template = 'http://{}/stream/'
52 return template.format(rhodecode.CONFIG['vcs.server'])
52 return template.format(rhodecode.CONFIG['vcs.server'])
53
53
54
54
55 # TODO: johbo: Avoid the global.
55 # TODO: johbo: Avoid the global.
56 session = requests.Session()
56 session = requests.Session()
57 # Requests speedup, avoid reading .netrc and similar
57 # Requests speedup, avoid reading .netrc and similar
58 session.trust_env = False
58 session.trust_env = False
59
59
60 # prevent urllib3 spawning our logs.
60 # prevent urllib3 spawning our logs.
61 logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
61 logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
62 logging.WARNING)
62 logging.WARNING)
63
63
64
64
65 class VcsHttpProxy(object):
65 class VcsHttpProxy(object):
66 """
66 """
67 A WSGI application which proxies vcs requests.
67 A WSGI application which proxies vcs requests.
68
68
69 The goal is to shuffle the data around without touching it. The only
69 The goal is to shuffle the data around without touching it. The only
70 exception is the extra data from the config object which we send to the
70 exception is the extra data from the config object which we send to the
71 server as well.
71 server as well.
72 """
72 """
73
73
74 def __init__(self, url, repo_path, repo_name, config):
74 def __init__(self, url, repo_path, repo_name, config):
75 """
75 """
76 :param str url: The URL of the VCSServer to call.
76 :param str url: The URL of the VCSServer to call.
77 """
77 """
78 self._url = url
78 self._url = url
79 self._repo_name = repo_name
79 self._repo_name = repo_name
80 self._repo_path = repo_path
80 self._repo_path = repo_path
81 self._config = config
81 self._config = config
82 log.debug(
82 log.debug(
83 "Creating VcsHttpProxy for repo %s, url %s",
83 "Creating VcsHttpProxy for repo %s, url %s",
84 repo_name, url)
84 repo_name, url)
85
85
86 def __call__(self, environ, start_response):
86 def __call__(self, environ, start_response):
87 config = msgpack.packb(self._config)
87 config = msgpack.packb(self._config)
88 request = webob.request.Request(environ)
88 request = webob.request.Request(environ)
89 request_headers = request.headers
89 request_headers = request.headers
90 request_headers.update({
90 request_headers.update({
91 # TODO: johbo: Remove this, rely on URL path only
91 # TODO: johbo: Remove this, rely on URL path only
92 'X-RC-Repo-Name': self._repo_name,
92 'X-RC-Repo-Name': self._repo_name,
93 'X-RC-Repo-Path': self._repo_path,
93 'X-RC-Repo-Path': self._repo_path,
94 'X-RC-Path-Info': environ['PATH_INFO'],
94 'X-RC-Path-Info': environ['PATH_INFO'],
95 # TODO: johbo: Avoid encoding and put this into payload?
95 # TODO: johbo: Avoid encoding and put this into payload?
96 'X-RC-Repo-Config': base64.b64encode(config),
96 'X-RC-Repo-Config': base64.b64encode(config),
97 'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
97 'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
98 })
98 })
99
99
100 method = environ['REQUEST_METHOD']
100 method = environ['REQUEST_METHOD']
101
101
102 # Preserve the query string
102 # Preserve the query string
103 url = self._url
103 url = self._url
104 url = urlparse.urljoin(url, self._repo_name)
104 url = urlparse.urljoin(url, self._repo_name)
105 if environ.get('QUERY_STRING'):
105 if environ.get('QUERY_STRING'):
106 url += '?' + environ['QUERY_STRING']
106 url += '?' + environ['QUERY_STRING']
107
107
108 log.debug('http-app: preparing request to: %s', url)
108 log.debug('http-app: preparing request to: %s', url)
109 response = session.request(
109 response = session.request(
110 method,
110 method,
111 url,
111 url,
112 data=_maybe_stream_request(environ),
112 data=_maybe_stream_request(environ),
113 headers=request_headers,
113 headers=request_headers,
114 stream=True)
114 stream=True)
115
115
116 log.debug('http-app: got vcsserver response: %s', response)
116 log.debug('http-app: got vcsserver response: %s', response)
117 if response.status_code >= 500:
118 log.error('Exception returned by vcsserver at: %s %s, %s',
119 url, response.status_code, response.content)
120
117 # Preserve the headers of the response, except hop_by_hop ones
121 # Preserve the headers of the response, except hop_by_hop ones
118 response_headers = [
122 response_headers = [
119 (h, v) for h, v in response.headers.items()
123 (h, v) for h, v in response.headers.items()
120 if not wsgiref.util.is_hop_by_hop(h)
124 if not wsgiref.util.is_hop_by_hop(h)
121 ]
125 ]
122
126
123 # Build status argument for start_reponse callable.
127 # Build status argument for start_response callable.
124 status = '{status_code} {reason_phrase}'.format(
128 status = '{status_code} {reason_phrase}'.format(
125 status_code=response.status_code,
129 status_code=response.status_code,
126 reason_phrase=response.reason)
130 reason_phrase=response.reason)
127
131
128 start_response(status, response_headers)
132 start_response(status, response_headers)
129 return _maybe_stream_response(response)
133 return _maybe_stream_response(response)
130
134
131
135
132 def _is_request_chunked(environ):
136 def _is_request_chunked(environ):
133 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
137 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
134 return stream
138 return stream
135
139
136
140
137 def _maybe_stream_request(environ):
141 def _maybe_stream_request(environ):
138 path = environ['PATH_INFO']
142 path = environ['PATH_INFO']
139 stream = _is_request_chunked(environ)
143 stream = _is_request_chunked(environ)
140 log.debug('handling request `%s` with stream support: %s', path, stream)
144 log.debug('handling request `%s` with stream support: %s', path, stream)
141
145
142 if stream:
146 if stream:
143 return environ['wsgi.input']
147 return environ['wsgi.input']
144 else:
148 else:
145 return environ['wsgi.input'].read()
149 return environ['wsgi.input'].read()
146
150
147
151
148 def _maybe_stream_response(response):
152 def _maybe_stream_response(response):
149 """
153 """
150 Try to generate chunks from the response if it is chunked.
154 Try to generate chunks from the response if it is chunked.
151 """
155 """
152 stream = _is_chunked(response)
156 stream = _is_chunked(response)
153 log.debug('returning response with stream: %s', stream)
157 log.debug('returning response with stream: %s', stream)
154 if stream:
158 if stream:
155 return response.raw.read_chunked()
159 return response.raw.read_chunked()
156 else:
160 else:
157 return [response.content]
161 return [response.content]
158
162
159
163
160 def _is_chunked(response):
164 def _is_chunked(response):
161 return response.headers.get('Transfer-Encoding', '') == 'chunked'
165 return response.headers.get('Transfer-Encoding', '') == 'chunked'
General Comments 0
You need to be logged in to leave comments. Login now