##// END OF EJS Templates
scm_app: add more debug info when unhandled errors happen on vcsserver.
marcink -
r3093:e2f600bd default
parent child Browse files
Show More
@@ -1,330 +1,332 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27
28 28 from BaseHTTPServer import BaseHTTPRequestHandler
29 29 from SocketServer import TCPServer
30 30
31 31 import rhodecode
32 32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 33 from rhodecode.model import meta
34 34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 37 from rhodecode.lib.ext_json import json
38 38 from rhodecode.lib import rc_cache
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class HooksHttpHandler(BaseHTTPRequestHandler):
44 44
45 45 def do_POST(self):
46 46 method, extras = self._read_request()
47 47 txn_id = getattr(self.server, 'txn_id', None)
48 48 if txn_id:
49 49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 50 extras['repository'], extras['txn_id'])
51 51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 52 extras['repository'], extras['txn_id'])
53 53 if txn_id != computed_txn_id:
54 54 raise Exception(
55 55 'TXN ID fail: expected {} got {} instead'.format(
56 56 txn_id, computed_txn_id))
57 57
58 58 try:
59 59 result = self._call_hook(method, extras)
60 60 except Exception as e:
61 61 exc_tb = traceback.format_exc()
62 62 result = {
63 63 'exception': e.__class__.__name__,
64 64 'exception_traceback': exc_tb,
65 65 'exception_args': e.args
66 66 }
67 67 self._write_response(result)
68 68
69 69 def _read_request(self):
70 70 length = int(self.headers['Content-Length'])
71 71 body = self.rfile.read(length).decode('utf-8')
72 72 data = json.loads(body)
73 73 return data['method'], data['extras']
74 74
75 75 def _write_response(self, result):
76 76 self.send_response(200)
77 77 self.send_header("Content-type", "text/json")
78 78 self.end_headers()
79 79 self.wfile.write(json.dumps(result))
80 80
81 81 def _call_hook(self, method, extras):
82 82 hooks = Hooks()
83 83 try:
84 84 result = getattr(hooks, method)(extras)
85 85 finally:
86 86 meta.Session.remove()
87 87 return result
88 88
89 89 def log_message(self, format, *args):
90 90 """
91 91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 92 logging library instead of writing directly to stderr.
93 93 """
94 94
95 95 message = format % args
96 96
97 97 log.debug(
98 98 "%s - - [%s] %s", self.client_address[0],
99 99 self.log_date_time_string(), message)
100 100
101 101
102 102 class DummyHooksCallbackDaemon(object):
103 103 hooks_uri = ''
104 104
105 105 def __init__(self):
106 106 self.hooks_module = Hooks.__module__
107 107
108 108 def __enter__(self):
109 109 log.debug('Running dummy hooks callback daemon')
110 110 return self
111 111
112 112 def __exit__(self, exc_type, exc_val, exc_tb):
113 113 log.debug('Exiting dummy hooks callback daemon')
114 114
115 115
116 116 class ThreadedHookCallbackDaemon(object):
117 117
118 118 _callback_thread = None
119 119 _daemon = None
120 120 _done = False
121 121
122 122 def __init__(self, txn_id=None, host=None, port=None):
123 123 self._prepare(txn_id=txn_id, host=None, port=port)
124 124
125 125 def __enter__(self):
126 126 self._run()
127 127 return self
128 128
129 129 def __exit__(self, exc_type, exc_val, exc_tb):
130 130 log.debug('Callback daemon exiting now...')
131 131 self._stop()
132 132
133 133 def _prepare(self, txn_id=None, host=None, port=None):
134 134 raise NotImplementedError()
135 135
136 136 def _run(self):
137 137 raise NotImplementedError()
138 138
139 139 def _stop(self):
140 140 raise NotImplementedError()
141 141
142 142
143 143 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
144 144 """
145 145 Context manager which will run a callback daemon in a background thread.
146 146 """
147 147
148 148 hooks_uri = None
149 149
150 150 # From Python docs: Polling reduces our responsiveness to a shutdown
151 151 # request and wastes cpu at all other times.
152 152 POLL_INTERVAL = 0.01
153 153
154 154 def _prepare(self, txn_id=None, host=None, port=None):
155 155 host = host or '127.0.0.1'
156 156 self._done = False
157 157 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
158 158 _, port = self._daemon.server_address
159 159 self.hooks_uri = '{}:{}'.format(host, port)
160 160 self.txn_id = txn_id
161 161 # inject transaction_id for later verification
162 162 self._daemon.txn_id = self.txn_id
163 163
164 164 log.debug(
165 165 "Preparing HTTP callback daemon at `%s` and registering hook object",
166 166 self.hooks_uri)
167 167
168 168 def _run(self):
169 169 log.debug("Running event loop of callback daemon in background thread")
170 170 callback_thread = threading.Thread(
171 171 target=self._daemon.serve_forever,
172 172 kwargs={'poll_interval': self.POLL_INTERVAL})
173 173 callback_thread.daemon = True
174 174 callback_thread.start()
175 175 self._callback_thread = callback_thread
176 176
177 177 def _stop(self):
178 178 log.debug("Waiting for background thread to finish.")
179 179 self._daemon.shutdown()
180 180 self._callback_thread.join()
181 181 self._daemon = None
182 182 self._callback_thread = None
183 183 if self.txn_id:
184 184 txn_id_file = get_txn_id_data_path(self.txn_id)
185 185 log.debug('Cleaning up TXN ID %s', txn_id_file)
186 186 if os.path.isfile(txn_id_file):
187 187 os.remove(txn_id_file)
188 188
189 189 log.debug("Background thread done.")
190 190
191 191
192 192 def get_txn_id_data_path(txn_id):
193 193 import rhodecode
194 194
195 195 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
196 196 final_dir = os.path.join(root, 'svn_txn_id')
197 197
198 198 if not os.path.isdir(final_dir):
199 199 os.makedirs(final_dir)
200 200 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
201 201
202 202
203 203 def store_txn_id_data(txn_id, data_dict):
204 204 if not txn_id:
205 205 log.warning('Cannot store txn_id because it is empty')
206 206 return
207 207
208 208 path = get_txn_id_data_path(txn_id)
209 209 try:
210 210 with open(path, 'wb') as f:
211 211 f.write(json.dumps(data_dict))
212 212 except Exception:
213 213 log.exception('Failed to write txn_id metadata')
214 214
215 215
216 216 def get_txn_id_from_store(txn_id):
217 217 """
218 218 Reads txn_id from store and if present returns the data for callback manager
219 219 """
220 220 path = get_txn_id_data_path(txn_id)
221 221 try:
222 222 with open(path, 'rb') as f:
223 223 return json.loads(f.read())
224 224 except Exception:
225 225 return {}
226 226
227 227
228 228 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
229 229 txn_details = get_txn_id_from_store(txn_id)
230 230 port = txn_details.get('port', 0)
231 231 if use_direct_calls:
232 232 callback_daemon = DummyHooksCallbackDaemon()
233 233 extras['hooks_module'] = callback_daemon.hooks_module
234 234 else:
235 235 if protocol == 'http':
236 236 callback_daemon = HttpHooksCallbackDaemon(
237 237 txn_id=txn_id, host=host, port=port)
238 238 else:
239 239 log.error('Unsupported callback daemon protocol "%s"', protocol)
240 240 raise Exception('Unsupported callback daemon protocol.')
241 241
242 242 extras['hooks_uri'] = callback_daemon.hooks_uri
243 243 extras['hooks_protocol'] = protocol
244 244 extras['time'] = time.time()
245 245
246 246 # register txn_id
247 247 extras['txn_id'] = txn_id
248
249 248 log.debug('Prepared a callback daemon: %s at url `%s`',
250 249 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 250 return callback_daemon, extras
252 251
253 252
254 253 class Hooks(object):
255 254 """
256 255 Exposes the hooks for remote call backs
257 256 """
258 257
259 258 def repo_size(self, extras):
260 259 log.debug("Called repo_size of %s object", self)
261 260 return self._call_hook(hooks_base.repo_size, extras)
262 261
263 262 def pre_pull(self, extras):
264 263 log.debug("Called pre_pull of %s object", self)
265 264 return self._call_hook(hooks_base.pre_pull, extras)
266 265
267 266 def post_pull(self, extras):
268 267 log.debug("Called post_pull of %s object", self)
269 268 return self._call_hook(hooks_base.post_pull, extras)
270 269
271 270 def pre_push(self, extras):
272 271 log.debug("Called pre_push of %s object", self)
273 272 return self._call_hook(hooks_base.pre_push, extras)
274 273
275 274 def post_push(self, extras):
276 275 log.debug("Called post_push of %s object", self)
277 276 return self._call_hook(hooks_base.post_push, extras)
278 277
279 278 def _call_hook(self, hook, extras):
280 279 extras = AttributeDict(extras)
281 280 server_url = extras['server_url']
282 281 request = bootstrap_request(application_url=server_url)
283 282
284 283 bootstrap_config(request) # inject routes and other interfaces
285 284
286 285 # inject the user for usage in hooks
287 286 request.user = AttributeDict({'username': extras.username,
288 287 'ip_addr': extras.ip,
289 288 'user_id': extras.user_id})
290 289
291 290 extras.request = request
292 291
293 292 try:
294 293 result = hook(extras)
294 if result is None:
295 raise Exception(
296 'Failed to obtain hook result from func: {}'.format(hook))
295 297 except HTTPBranchProtected as handled_error:
296 298 # Those special cases doesn't need error reporting. It's a case of
297 299 # locked repo or protected branch
298 300 result = AttributeDict({
299 301 'status': handled_error.code,
300 302 'output': handled_error.explanation
301 303 })
302 304 except (HTTPLockedRC, Exception) as error:
303 305 # locked needs different handling since we need to also
304 306 # handle PULL operations
305 307 exc_tb = ''
306 308 if not isinstance(error, HTTPLockedRC):
307 309 exc_tb = traceback.format_exc()
308 310 log.exception('Exception when handling hook %s', hook)
309 311 error_args = error.args
310 312 return {
311 313 'status': 128,
312 314 'output': '',
313 315 'exception': type(error).__name__,
314 316 'exception_traceback': exc_tb,
315 317 'exception_args': error_args,
316 318 }
317 319 finally:
318 320 meta.Session.remove()
319 321
320 322 log.debug('Got hook call response %s', result)
321 323 return {
322 324 'status': result.status,
323 325 'output': result.output,
324 326 }
325 327
326 328 def __enter__(self):
327 329 return self
328 330
329 331 def __exit__(self, exc_type, exc_val, exc_tb):
330 332 pass
@@ -1,161 +1,165 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Implementation of the scm_app interface using raw HTTP communication.
23 23 """
24 24
25 25 import base64
26 26 import logging
27 27 import urlparse
28 28 import wsgiref.util
29 29
30 30 import msgpack
31 31 import requests
32 32 import webob.request
33 33
34 34 import rhodecode
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def create_git_wsgi_app(repo_path, repo_name, config):
41 41 url = _vcs_streaming_url() + 'git/'
42 42 return VcsHttpProxy(url, repo_path, repo_name, config)
43 43
44 44
45 45 def create_hg_wsgi_app(repo_path, repo_name, config):
46 46 url = _vcs_streaming_url() + 'hg/'
47 47 return VcsHttpProxy(url, repo_path, repo_name, config)
48 48
49 49
50 50 def _vcs_streaming_url():
51 51 template = 'http://{}/stream/'
52 52 return template.format(rhodecode.CONFIG['vcs.server'])
53 53
54 54
55 55 # TODO: johbo: Avoid the global.
56 56 session = requests.Session()
57 57 # Requests speedup, avoid reading .netrc and similar
58 58 session.trust_env = False
59 59
60 60 # prevent urllib3 spawning our logs.
61 61 logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
62 62 logging.WARNING)
63 63
64 64
65 65 class VcsHttpProxy(object):
66 66 """
67 67 A WSGI application which proxies vcs requests.
68 68
69 69 The goal is to shuffle the data around without touching it. The only
70 70 exception is the extra data from the config object which we send to the
71 71 server as well.
72 72 """
73 73
74 74 def __init__(self, url, repo_path, repo_name, config):
75 75 """
76 76 :param str url: The URL of the VCSServer to call.
77 77 """
78 78 self._url = url
79 79 self._repo_name = repo_name
80 80 self._repo_path = repo_path
81 81 self._config = config
82 82 log.debug(
83 83 "Creating VcsHttpProxy for repo %s, url %s",
84 84 repo_name, url)
85 85
86 86 def __call__(self, environ, start_response):
87 87 config = msgpack.packb(self._config)
88 88 request = webob.request.Request(environ)
89 89 request_headers = request.headers
90 90 request_headers.update({
91 91 # TODO: johbo: Remove this, rely on URL path only
92 92 'X-RC-Repo-Name': self._repo_name,
93 93 'X-RC-Repo-Path': self._repo_path,
94 94 'X-RC-Path-Info': environ['PATH_INFO'],
95 95 # TODO: johbo: Avoid encoding and put this into payload?
96 96 'X-RC-Repo-Config': base64.b64encode(config),
97 97 'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
98 98 })
99 99
100 100 method = environ['REQUEST_METHOD']
101 101
102 102 # Preserve the query string
103 103 url = self._url
104 104 url = urlparse.urljoin(url, self._repo_name)
105 105 if environ.get('QUERY_STRING'):
106 106 url += '?' + environ['QUERY_STRING']
107 107
108 108 log.debug('http-app: preparing request to: %s', url)
109 109 response = session.request(
110 110 method,
111 111 url,
112 112 data=_maybe_stream_request(environ),
113 113 headers=request_headers,
114 114 stream=True)
115 115
116 116 log.debug('http-app: got vcsserver response: %s', response)
117 if response.status_code >= 500:
118 log.error('Exception returned by vcsserver at: %s %s, %s',
119 url, response.status_code, response.content)
120
117 121 # Preserve the headers of the response, except hop_by_hop ones
118 122 response_headers = [
119 123 (h, v) for h, v in response.headers.items()
120 124 if not wsgiref.util.is_hop_by_hop(h)
121 125 ]
122 126
123 # Build status argument for start_reponse callable.
127 # Build status argument for start_response callable.
124 128 status = '{status_code} {reason_phrase}'.format(
125 129 status_code=response.status_code,
126 130 reason_phrase=response.reason)
127 131
128 132 start_response(status, response_headers)
129 133 return _maybe_stream_response(response)
130 134
131 135
132 136 def _is_request_chunked(environ):
133 137 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
134 138 return stream
135 139
136 140
137 141 def _maybe_stream_request(environ):
138 142 path = environ['PATH_INFO']
139 143 stream = _is_request_chunked(environ)
140 144 log.debug('handling request `%s` with stream support: %s', path, stream)
141 145
142 146 if stream:
143 147 return environ['wsgi.input']
144 148 else:
145 149 return environ['wsgi.input'].read()
146 150
147 151
148 152 def _maybe_stream_response(response):
149 153 """
150 154 Try to generate chunks from the response if it is chunked.
151 155 """
152 156 stream = _is_chunked(response)
153 157 log.debug('returning response with stream: %s', stream)
154 158 if stream:
155 159 return response.raw.read_chunked()
156 160 else:
157 161 return [response.content]
158 162
159 163
160 164 def _is_chunked(response):
161 165 return response.headers.get('Transfer-Encoding', '') == 'chunked'
General Comments 0
You need to be logged in to leave comments. Login now