##// END OF EJS Templates
vcs: add nicer traceback information for curl errors.
marcink -
r2940:1dfdca3b default
parent child Browse files
Show More
@@ -1,303 +1,305 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 import traceback
31 32
32 33 import pycurl
33 34 import msgpack
34 35 import requests
35 36 from requests.packages.urllib3.util.retry import Retry
36 37
37 38 from . import exceptions, CurlSession
38 39
39 40
40 41 log = logging.getLogger(__name__)
41 42
42 43
43 44 # TODO: mikhail: Keep it in sync with vcsserver's
44 45 # HTTPApplication.ALLOWED_EXCEPTIONS
45 46 EXCEPTIONS_MAP = {
46 47 'KeyError': KeyError,
47 48 'URLError': urllib2.URLError,
48 49 }
49 50
50 51
51 52 class RepoMaker(object):
52 53
53 54 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 55 self.url = urlparse.urljoin(
55 56 'http://%s' % server_and_port, backend_endpoint)
56 57 self._session_factory = session_factory
57 58 self.backend_type = backend_type
58 59
59 60 def __call__(self, path, config, with_wire=None):
60 61 log.debug('RepoMaker call on %s', path)
61 62 return RemoteRepo(
62 63 path, config, self.url, self._session_factory(),
63 64 with_wire=with_wire)
64 65
65 66 def __getattr__(self, name):
66 67 def f(*args, **kwargs):
67 68 return self._call(name, *args, **kwargs)
68 69 return f
69 70
70 71 @exceptions.map_vcs_exceptions
71 72 def _call(self, name, *args, **kwargs):
72 73 payload = {
73 74 'id': str(uuid.uuid4()),
74 75 'method': name,
75 76 'backend': self.backend_type,
76 77 'params': {'args': args, 'kwargs': kwargs}
77 78 }
78 79 return _remote_call(
79 80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80 81
81 82
82 83 class ServiceConnection(object):
83 84 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 85 self.url = urlparse.urljoin(
85 86 'http://%s' % server_and_port, backend_endpoint)
86 87 self._session_factory = session_factory
87 88
88 89 def __getattr__(self, name):
89 90 def f(*args, **kwargs):
90 91 return self._call(name, *args, **kwargs)
91 92
92 93 return f
93 94
94 95 @exceptions.map_vcs_exceptions
95 96 def _call(self, name, *args, **kwargs):
96 97 payload = {
97 98 'id': str(uuid.uuid4()),
98 99 'method': name,
99 100 'params': {'args': args, 'kwargs': kwargs}
100 101 }
101 102 return _remote_call(
102 103 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103 104
104 105
105 106 class RemoteRepo(object):
106 107
107 108 def __init__(self, path, config, url, session, with_wire=None):
108 109 self.url = url
109 110 self._session = session
110 111 self._wire = {
111 112 "path": path,
112 113 "config": config,
113 114 "context": self._create_vcs_cache_context(),
114 115 }
115 116 if with_wire:
116 117 self._wire.update(with_wire)
117 118
118 119 # johbo: Trading complexity for performance. Avoiding the call to
119 120 # log.debug brings a few percent gain even if is is not active.
120 121 if log.isEnabledFor(logging.DEBUG):
121 122 self._call = self._call_with_logging
122 123
123 124 def __getattr__(self, name):
124 125 def f(*args, **kwargs):
125 126 return self._call(name, *args, **kwargs)
126 127 return f
127 128
128 129 @exceptions.map_vcs_exceptions
129 130 def _call(self, name, *args, **kwargs):
130 131 # TODO: oliver: This is currently necessary pre-call since the
131 132 # config object is being changed for hooking scenarios
132 133 wire = copy.deepcopy(self._wire)
133 134 wire["config"] = wire["config"].serialize()
134 135 payload = {
135 136 'id': str(uuid.uuid4()),
136 137 'method': name,
137 138 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 139 }
139 140 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140 141
141 142 def _call_with_logging(self, name, *args, **kwargs):
142 143 context_uid = self._wire.get('context')
143 144 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 145 self.url, name, args, context_uid)
145 146 return RemoteRepo._call(self, name, *args, **kwargs)
146 147
147 148 def __getitem__(self, key):
148 149 return self.revision(key)
149 150
150 151 def _create_vcs_cache_context(self):
151 152 """
152 153 Creates a unique string which is passed to the VCSServer on every
153 154 remote call. It is used as cache key in the VCSServer.
154 155 """
155 156 return str(uuid.uuid4())
156 157
157 158 def invalidate_vcs_cache(self):
158 159 """
159 160 This invalidates the context which is sent to the VCSServer on every
160 161 call to a remote method. It forces the VCSServer to create a fresh
161 162 repository instance on the next call to a remote method.
162 163 """
163 164 self._wire['context'] = self._create_vcs_cache_context()
164 165
165 166
166 167 class RemoteObject(object):
167 168
168 169 def __init__(self, url, session):
169 170 self._url = url
170 171 self._session = session
171 172
172 173 # johbo: Trading complexity for performance. Avoiding the call to
173 174 # log.debug brings a few percent gain even if is is not active.
174 175 if log.isEnabledFor(logging.DEBUG):
175 176 self._call = self._call_with_logging
176 177
177 178 def __getattr__(self, name):
178 179 def f(*args, **kwargs):
179 180 return self._call(name, *args, **kwargs)
180 181 return f
181 182
182 183 @exceptions.map_vcs_exceptions
183 184 def _call(self, name, *args, **kwargs):
184 185 payload = {
185 186 'id': str(uuid.uuid4()),
186 187 'method': name,
187 188 'params': {'args': args, 'kwargs': kwargs}
188 189 }
189 190 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
190 191
191 192 def _call_with_logging(self, name, *args, **kwargs):
192 193 log.debug('Calling %s@%s', self._url, name)
193 194 return RemoteObject._call(self, name, *args, **kwargs)
194 195
195 196
196 197 def _remote_call(url, payload, exceptions_map, session):
197 198 try:
198 199 response = session.post(url, data=msgpack.packb(payload))
199 200 except pycurl.error as e:
200 raise exceptions.HttpVCSCommunicationError(e)
201 msg = '{}. pycurl traceback: {}'.format(e, traceback.format_exc())
202 raise exceptions.HttpVCSCommunicationError(msg)
201 203 except Exception as e:
202 204 message = getattr(e, 'message', '')
203 205 if 'Failed to connect' in message:
204 206 # gevent doesn't return proper pycurl errors
205 207 raise exceptions.HttpVCSCommunicationError(e)
206 208 else:
207 209 raise
208 210
209 211 if response.status_code >= 400:
210 212 log.error('Call to %s returned non 200 HTTP code: %s',
211 213 url, response.status_code)
212 214 raise exceptions.HttpVCSCommunicationError(repr(response.content))
213 215
214 216 try:
215 217 response = msgpack.unpackb(response.content)
216 218 except Exception:
217 219 log.exception('Failed to decode response %r', response.content)
218 220 raise
219 221
220 222 error = response.get('error')
221 223 if error:
222 224 type_ = error.get('type', 'Exception')
223 225 exc = exceptions_map.get(type_, Exception)
224 226 exc = exc(error.get('message'))
225 227 try:
226 228 exc._vcs_kind = error['_vcs_kind']
227 229 except KeyError:
228 230 pass
229 231
230 232 try:
231 233 exc._vcs_server_traceback = error['traceback']
232 234 except KeyError:
233 235 pass
234 236
235 237 raise exc
236 238 return response.get('result')
237 239
238 240
239 241 class VcsHttpProxy(object):
240 242
241 243 CHUNK_SIZE = 16384
242 244
243 245 def __init__(self, server_and_port, backend_endpoint):
244 246
245 247
246 248 retries = Retry(total=5, connect=None, read=None, redirect=None)
247 249
248 250 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
249 251 self.base_url = urlparse.urljoin(
250 252 'http://%s' % server_and_port, backend_endpoint)
251 253 self.session = requests.Session()
252 254 self.session.mount('http://', adapter)
253 255
254 256 def handle(self, environment, input_data, *args, **kwargs):
255 257 data = {
256 258 'environment': environment,
257 259 'input_data': input_data,
258 260 'args': args,
259 261 'kwargs': kwargs
260 262 }
261 263 result = self.session.post(
262 264 self.base_url, msgpack.packb(data), stream=True)
263 265 return self._get_result(result)
264 266
265 267 def _deserialize_and_raise(self, error):
266 268 exception = Exception(error['message'])
267 269 try:
268 270 exception._vcs_kind = error['_vcs_kind']
269 271 except KeyError:
270 272 pass
271 273 raise exception
272 274
273 275 def _iterate(self, result):
274 276 unpacker = msgpack.Unpacker()
275 277 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
276 278 unpacker.feed(line)
277 279 for chunk in unpacker:
278 280 yield chunk
279 281
280 282 def _get_result(self, result):
281 283 iterator = self._iterate(result)
282 284 error = iterator.next()
283 285 if error:
284 286 self._deserialize_and_raise(error)
285 287
286 288 status = iterator.next()
287 289 headers = iterator.next()
288 290
289 291 return iterator, status, headers
290 292
291 293
292 294 class ThreadlocalSessionFactory(object):
293 295 """
294 296 Creates one CurlSession per thread on demand.
295 297 """
296 298
297 299 def __init__(self):
298 300 self._thread_local = threading.local()
299 301
300 302 def __call__(self):
301 303 if not hasattr(self._thread_local, 'curl_session'):
302 304 self._thread_local.curl_session = CurlSession()
303 305 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now