##// END OF EJS Templates
vcs: add nicer traceback information for curl errors.
marcink -
r2940:1dfdca3b default
parent child Browse files
Show More
@@ -1,303 +1,305 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31 import traceback
31
32
32 import pycurl
33 import pycurl
33 import msgpack
34 import msgpack
34 import requests
35 import requests
35 from requests.packages.urllib3.util.retry import Retry
36 from requests.packages.urllib3.util.retry import Retry
36
37
37 from . import exceptions, CurlSession
38 from . import exceptions, CurlSession
38
39
39
40
40 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
41
42
42
43
43 # TODO: mikhail: Keep it in sync with vcsserver's
44 # TODO: mikhail: Keep it in sync with vcsserver's
44 # HTTPApplication.ALLOWED_EXCEPTIONS
45 # HTTPApplication.ALLOWED_EXCEPTIONS
45 EXCEPTIONS_MAP = {
46 EXCEPTIONS_MAP = {
46 'KeyError': KeyError,
47 'KeyError': KeyError,
47 'URLError': urllib2.URLError,
48 'URLError': urllib2.URLError,
48 }
49 }
49
50
50
51
51 class RepoMaker(object):
52 class RepoMaker(object):
52
53
53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 self.url = urlparse.urljoin(
55 self.url = urlparse.urljoin(
55 'http://%s' % server_and_port, backend_endpoint)
56 'http://%s' % server_and_port, backend_endpoint)
56 self._session_factory = session_factory
57 self._session_factory = session_factory
57 self.backend_type = backend_type
58 self.backend_type = backend_type
58
59
59 def __call__(self, path, config, with_wire=None):
60 def __call__(self, path, config, with_wire=None):
60 log.debug('RepoMaker call on %s', path)
61 log.debug('RepoMaker call on %s', path)
61 return RemoteRepo(
62 return RemoteRepo(
62 path, config, self.url, self._session_factory(),
63 path, config, self.url, self._session_factory(),
63 with_wire=with_wire)
64 with_wire=with_wire)
64
65
65 def __getattr__(self, name):
66 def __getattr__(self, name):
66 def f(*args, **kwargs):
67 def f(*args, **kwargs):
67 return self._call(name, *args, **kwargs)
68 return self._call(name, *args, **kwargs)
68 return f
69 return f
69
70
70 @exceptions.map_vcs_exceptions
71 @exceptions.map_vcs_exceptions
71 def _call(self, name, *args, **kwargs):
72 def _call(self, name, *args, **kwargs):
72 payload = {
73 payload = {
73 'id': str(uuid.uuid4()),
74 'id': str(uuid.uuid4()),
74 'method': name,
75 'method': name,
75 'backend': self.backend_type,
76 'backend': self.backend_type,
76 'params': {'args': args, 'kwargs': kwargs}
77 'params': {'args': args, 'kwargs': kwargs}
77 }
78 }
78 return _remote_call(
79 return _remote_call(
79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80
81
81
82
82 class ServiceConnection(object):
83 class ServiceConnection(object):
83 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 self.url = urlparse.urljoin(
85 self.url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
86 'http://%s' % server_and_port, backend_endpoint)
86 self._session_factory = session_factory
87 self._session_factory = session_factory
87
88
88 def __getattr__(self, name):
89 def __getattr__(self, name):
89 def f(*args, **kwargs):
90 def f(*args, **kwargs):
90 return self._call(name, *args, **kwargs)
91 return self._call(name, *args, **kwargs)
91
92
92 return f
93 return f
93
94
94 @exceptions.map_vcs_exceptions
95 @exceptions.map_vcs_exceptions
95 def _call(self, name, *args, **kwargs):
96 def _call(self, name, *args, **kwargs):
96 payload = {
97 payload = {
97 'id': str(uuid.uuid4()),
98 'id': str(uuid.uuid4()),
98 'method': name,
99 'method': name,
99 'params': {'args': args, 'kwargs': kwargs}
100 'params': {'args': args, 'kwargs': kwargs}
100 }
101 }
101 return _remote_call(
102 return _remote_call(
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103
104
104
105
105 class RemoteRepo(object):
106 class RemoteRepo(object):
106
107
107 def __init__(self, path, config, url, session, with_wire=None):
108 def __init__(self, path, config, url, session, with_wire=None):
108 self.url = url
109 self.url = url
109 self._session = session
110 self._session = session
110 self._wire = {
111 self._wire = {
111 "path": path,
112 "path": path,
112 "config": config,
113 "config": config,
113 "context": self._create_vcs_cache_context(),
114 "context": self._create_vcs_cache_context(),
114 }
115 }
115 if with_wire:
116 if with_wire:
116 self._wire.update(with_wire)
117 self._wire.update(with_wire)
117
118
118 # johbo: Trading complexity for performance. Avoiding the call to
119 # johbo: Trading complexity for performance. Avoiding the call to
119 # log.debug brings a few percent gain even if is is not active.
120 # log.debug brings a few percent gain even if is is not active.
120 if log.isEnabledFor(logging.DEBUG):
121 if log.isEnabledFor(logging.DEBUG):
121 self._call = self._call_with_logging
122 self._call = self._call_with_logging
122
123
123 def __getattr__(self, name):
124 def __getattr__(self, name):
124 def f(*args, **kwargs):
125 def f(*args, **kwargs):
125 return self._call(name, *args, **kwargs)
126 return self._call(name, *args, **kwargs)
126 return f
127 return f
127
128
128 @exceptions.map_vcs_exceptions
129 @exceptions.map_vcs_exceptions
129 def _call(self, name, *args, **kwargs):
130 def _call(self, name, *args, **kwargs):
130 # TODO: oliver: This is currently necessary pre-call since the
131 # TODO: oliver: This is currently necessary pre-call since the
131 # config object is being changed for hooking scenarios
132 # config object is being changed for hooking scenarios
132 wire = copy.deepcopy(self._wire)
133 wire = copy.deepcopy(self._wire)
133 wire["config"] = wire["config"].serialize()
134 wire["config"] = wire["config"].serialize()
134 payload = {
135 payload = {
135 'id': str(uuid.uuid4()),
136 'id': str(uuid.uuid4()),
136 'method': name,
137 'method': name,
137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 }
139 }
139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140
141
141 def _call_with_logging(self, name, *args, **kwargs):
142 def _call_with_logging(self, name, *args, **kwargs):
142 context_uid = self._wire.get('context')
143 context_uid = self._wire.get('context')
143 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 self.url, name, args, context_uid)
145 self.url, name, args, context_uid)
145 return RemoteRepo._call(self, name, *args, **kwargs)
146 return RemoteRepo._call(self, name, *args, **kwargs)
146
147
147 def __getitem__(self, key):
148 def __getitem__(self, key):
148 return self.revision(key)
149 return self.revision(key)
149
150
150 def _create_vcs_cache_context(self):
151 def _create_vcs_cache_context(self):
151 """
152 """
152 Creates a unique string which is passed to the VCSServer on every
153 Creates a unique string which is passed to the VCSServer on every
153 remote call. It is used as cache key in the VCSServer.
154 remote call. It is used as cache key in the VCSServer.
154 """
155 """
155 return str(uuid.uuid4())
156 return str(uuid.uuid4())
156
157
157 def invalidate_vcs_cache(self):
158 def invalidate_vcs_cache(self):
158 """
159 """
159 This invalidates the context which is sent to the VCSServer on every
160 This invalidates the context which is sent to the VCSServer on every
160 call to a remote method. It forces the VCSServer to create a fresh
161 call to a remote method. It forces the VCSServer to create a fresh
161 repository instance on the next call to a remote method.
162 repository instance on the next call to a remote method.
162 """
163 """
163 self._wire['context'] = self._create_vcs_cache_context()
164 self._wire['context'] = self._create_vcs_cache_context()
164
165
165
166
166 class RemoteObject(object):
167 class RemoteObject(object):
167
168
168 def __init__(self, url, session):
169 def __init__(self, url, session):
169 self._url = url
170 self._url = url
170 self._session = session
171 self._session = session
171
172
172 # johbo: Trading complexity for performance. Avoiding the call to
173 # johbo: Trading complexity for performance. Avoiding the call to
173 # log.debug brings a few percent gain even if is is not active.
174 # log.debug brings a few percent gain even if is is not active.
174 if log.isEnabledFor(logging.DEBUG):
175 if log.isEnabledFor(logging.DEBUG):
175 self._call = self._call_with_logging
176 self._call = self._call_with_logging
176
177
177 def __getattr__(self, name):
178 def __getattr__(self, name):
178 def f(*args, **kwargs):
179 def f(*args, **kwargs):
179 return self._call(name, *args, **kwargs)
180 return self._call(name, *args, **kwargs)
180 return f
181 return f
181
182
182 @exceptions.map_vcs_exceptions
183 @exceptions.map_vcs_exceptions
183 def _call(self, name, *args, **kwargs):
184 def _call(self, name, *args, **kwargs):
184 payload = {
185 payload = {
185 'id': str(uuid.uuid4()),
186 'id': str(uuid.uuid4()),
186 'method': name,
187 'method': name,
187 'params': {'args': args, 'kwargs': kwargs}
188 'params': {'args': args, 'kwargs': kwargs}
188 }
189 }
189 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
190 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
190
191
191 def _call_with_logging(self, name, *args, **kwargs):
192 def _call_with_logging(self, name, *args, **kwargs):
192 log.debug('Calling %s@%s', self._url, name)
193 log.debug('Calling %s@%s', self._url, name)
193 return RemoteObject._call(self, name, *args, **kwargs)
194 return RemoteObject._call(self, name, *args, **kwargs)
194
195
195
196
196 def _remote_call(url, payload, exceptions_map, session):
197 def _remote_call(url, payload, exceptions_map, session):
197 try:
198 try:
198 response = session.post(url, data=msgpack.packb(payload))
199 response = session.post(url, data=msgpack.packb(payload))
199 except pycurl.error as e:
200 except pycurl.error as e:
200 raise exceptions.HttpVCSCommunicationError(e)
201 msg = '{}. pycurl traceback: {}'.format(e, traceback.format_exc())
202 raise exceptions.HttpVCSCommunicationError(msg)
201 except Exception as e:
203 except Exception as e:
202 message = getattr(e, 'message', '')
204 message = getattr(e, 'message', '')
203 if 'Failed to connect' in message:
205 if 'Failed to connect' in message:
204 # gevent doesn't return proper pycurl errors
206 # gevent doesn't return proper pycurl errors
205 raise exceptions.HttpVCSCommunicationError(e)
207 raise exceptions.HttpVCSCommunicationError(e)
206 else:
208 else:
207 raise
209 raise
208
210
209 if response.status_code >= 400:
211 if response.status_code >= 400:
210 log.error('Call to %s returned non 200 HTTP code: %s',
212 log.error('Call to %s returned non 200 HTTP code: %s',
211 url, response.status_code)
213 url, response.status_code)
212 raise exceptions.HttpVCSCommunicationError(repr(response.content))
214 raise exceptions.HttpVCSCommunicationError(repr(response.content))
213
215
214 try:
216 try:
215 response = msgpack.unpackb(response.content)
217 response = msgpack.unpackb(response.content)
216 except Exception:
218 except Exception:
217 log.exception('Failed to decode response %r', response.content)
219 log.exception('Failed to decode response %r', response.content)
218 raise
220 raise
219
221
220 error = response.get('error')
222 error = response.get('error')
221 if error:
223 if error:
222 type_ = error.get('type', 'Exception')
224 type_ = error.get('type', 'Exception')
223 exc = exceptions_map.get(type_, Exception)
225 exc = exceptions_map.get(type_, Exception)
224 exc = exc(error.get('message'))
226 exc = exc(error.get('message'))
225 try:
227 try:
226 exc._vcs_kind = error['_vcs_kind']
228 exc._vcs_kind = error['_vcs_kind']
227 except KeyError:
229 except KeyError:
228 pass
230 pass
229
231
230 try:
232 try:
231 exc._vcs_server_traceback = error['traceback']
233 exc._vcs_server_traceback = error['traceback']
232 except KeyError:
234 except KeyError:
233 pass
235 pass
234
236
235 raise exc
237 raise exc
236 return response.get('result')
238 return response.get('result')
237
239
238
240
239 class VcsHttpProxy(object):
241 class VcsHttpProxy(object):
240
242
241 CHUNK_SIZE = 16384
243 CHUNK_SIZE = 16384
242
244
243 def __init__(self, server_and_port, backend_endpoint):
245 def __init__(self, server_and_port, backend_endpoint):
244
246
245
247
246 retries = Retry(total=5, connect=None, read=None, redirect=None)
248 retries = Retry(total=5, connect=None, read=None, redirect=None)
247
249
248 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
250 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
249 self.base_url = urlparse.urljoin(
251 self.base_url = urlparse.urljoin(
250 'http://%s' % server_and_port, backend_endpoint)
252 'http://%s' % server_and_port, backend_endpoint)
251 self.session = requests.Session()
253 self.session = requests.Session()
252 self.session.mount('http://', adapter)
254 self.session.mount('http://', adapter)
253
255
254 def handle(self, environment, input_data, *args, **kwargs):
256 def handle(self, environment, input_data, *args, **kwargs):
255 data = {
257 data = {
256 'environment': environment,
258 'environment': environment,
257 'input_data': input_data,
259 'input_data': input_data,
258 'args': args,
260 'args': args,
259 'kwargs': kwargs
261 'kwargs': kwargs
260 }
262 }
261 result = self.session.post(
263 result = self.session.post(
262 self.base_url, msgpack.packb(data), stream=True)
264 self.base_url, msgpack.packb(data), stream=True)
263 return self._get_result(result)
265 return self._get_result(result)
264
266
265 def _deserialize_and_raise(self, error):
267 def _deserialize_and_raise(self, error):
266 exception = Exception(error['message'])
268 exception = Exception(error['message'])
267 try:
269 try:
268 exception._vcs_kind = error['_vcs_kind']
270 exception._vcs_kind = error['_vcs_kind']
269 except KeyError:
271 except KeyError:
270 pass
272 pass
271 raise exception
273 raise exception
272
274
273 def _iterate(self, result):
275 def _iterate(self, result):
274 unpacker = msgpack.Unpacker()
276 unpacker = msgpack.Unpacker()
275 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
277 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
276 unpacker.feed(line)
278 unpacker.feed(line)
277 for chunk in unpacker:
279 for chunk in unpacker:
278 yield chunk
280 yield chunk
279
281
280 def _get_result(self, result):
282 def _get_result(self, result):
281 iterator = self._iterate(result)
283 iterator = self._iterate(result)
282 error = iterator.next()
284 error = iterator.next()
283 if error:
285 if error:
284 self._deserialize_and_raise(error)
286 self._deserialize_and_raise(error)
285
287
286 status = iterator.next()
288 status = iterator.next()
287 headers = iterator.next()
289 headers = iterator.next()
288
290
289 return iterator, status, headers
291 return iterator, status, headers
290
292
291
293
292 class ThreadlocalSessionFactory(object):
294 class ThreadlocalSessionFactory(object):
293 """
295 """
294 Creates one CurlSession per thread on demand.
296 Creates one CurlSession per thread on demand.
295 """
297 """
296
298
297 def __init__(self):
299 def __init__(self):
298 self._thread_local = threading.local()
300 self._thread_local = threading.local()
299
301
300 def __call__(self):
302 def __call__(self):
301 if not hasattr(self._thread_local, 'curl_session'):
303 if not hasattr(self._thread_local, 'curl_session'):
302 self._thread_local.curl_session = CurlSession()
304 self._thread_local.curl_session = CurlSession()
303 return self._thread_local.curl_session
305 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now