##// END OF EJS Templates
http-client: improve exceptions
marcink -
r2944:2f16baee default
parent child Browse files
Show More
@@ -1,305 +1,305 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 31 import traceback
32 32
33 33 import pycurl
34 34 import msgpack
35 35 import requests
36 36 from requests.packages.urllib3.util.retry import Retry
37 37
38 38 from . import exceptions, CurlSession
39 39
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 # TODO: mikhail: Keep it in sync with vcsserver's
45 45 # HTTPApplication.ALLOWED_EXCEPTIONS
46 46 EXCEPTIONS_MAP = {
47 47 'KeyError': KeyError,
48 48 'URLError': urllib2.URLError,
49 49 }
50 50
51 51
52 52 class RepoMaker(object):
53 53
54 54 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
55 55 self.url = urlparse.urljoin(
56 56 'http://%s' % server_and_port, backend_endpoint)
57 57 self._session_factory = session_factory
58 58 self.backend_type = backend_type
59 59
60 60 def __call__(self, path, config, with_wire=None):
61 61 log.debug('RepoMaker call on %s', path)
62 62 return RemoteRepo(
63 63 path, config, self.url, self._session_factory(),
64 64 with_wire=with_wire)
65 65
66 66 def __getattr__(self, name):
67 67 def f(*args, **kwargs):
68 68 return self._call(name, *args, **kwargs)
69 69 return f
70 70
71 71 @exceptions.map_vcs_exceptions
72 72 def _call(self, name, *args, **kwargs):
73 73 payload = {
74 74 'id': str(uuid.uuid4()),
75 75 'method': name,
76 76 'backend': self.backend_type,
77 77 'params': {'args': args, 'kwargs': kwargs}
78 78 }
79 79 return _remote_call(
80 80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
81 81
82 82
83 83 class ServiceConnection(object):
84 84 def __init__(self, server_and_port, backend_endpoint, session_factory):
85 85 self.url = urlparse.urljoin(
86 86 'http://%s' % server_and_port, backend_endpoint)
87 87 self._session_factory = session_factory
88 88
89 89 def __getattr__(self, name):
90 90 def f(*args, **kwargs):
91 91 return self._call(name, *args, **kwargs)
92 92
93 93 return f
94 94
95 95 @exceptions.map_vcs_exceptions
96 96 def _call(self, name, *args, **kwargs):
97 97 payload = {
98 98 'id': str(uuid.uuid4()),
99 99 'method': name,
100 100 'params': {'args': args, 'kwargs': kwargs}
101 101 }
102 102 return _remote_call(
103 103 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
104 104
105 105
106 106 class RemoteRepo(object):
107 107
108 108 def __init__(self, path, config, url, session, with_wire=None):
109 109 self.url = url
110 110 self._session = session
111 111 self._wire = {
112 112 "path": path,
113 113 "config": config,
114 114 "context": self._create_vcs_cache_context(),
115 115 }
116 116 if with_wire:
117 117 self._wire.update(with_wire)
118 118
119 119 # johbo: Trading complexity for performance. Avoiding the call to
120 120 # log.debug brings a few percent gain even if is is not active.
121 121 if log.isEnabledFor(logging.DEBUG):
122 122 self._call = self._call_with_logging
123 123
124 124 def __getattr__(self, name):
125 125 def f(*args, **kwargs):
126 126 return self._call(name, *args, **kwargs)
127 127 return f
128 128
129 129 @exceptions.map_vcs_exceptions
130 130 def _call(self, name, *args, **kwargs):
131 131 # TODO: oliver: This is currently necessary pre-call since the
132 132 # config object is being changed for hooking scenarios
133 133 wire = copy.deepcopy(self._wire)
134 134 wire["config"] = wire["config"].serialize()
135 135 payload = {
136 136 'id': str(uuid.uuid4()),
137 137 'method': name,
138 138 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
139 139 }
140 140 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
141 141
142 142 def _call_with_logging(self, name, *args, **kwargs):
143 143 context_uid = self._wire.get('context')
144 144 log.debug('Calling %s@%s with args:%r. wire_context: %s',
145 145 self.url, name, args, context_uid)
146 146 return RemoteRepo._call(self, name, *args, **kwargs)
147 147
148 148 def __getitem__(self, key):
149 149 return self.revision(key)
150 150
151 151 def _create_vcs_cache_context(self):
152 152 """
153 153 Creates a unique string which is passed to the VCSServer on every
154 154 remote call. It is used as cache key in the VCSServer.
155 155 """
156 156 return str(uuid.uuid4())
157 157
158 158 def invalidate_vcs_cache(self):
159 159 """
160 160 This invalidates the context which is sent to the VCSServer on every
161 161 call to a remote method. It forces the VCSServer to create a fresh
162 162 repository instance on the next call to a remote method.
163 163 """
164 164 self._wire['context'] = self._create_vcs_cache_context()
165 165
166 166
167 167 class RemoteObject(object):
168 168
169 169 def __init__(self, url, session):
170 170 self._url = url
171 171 self._session = session
172 172
173 173 # johbo: Trading complexity for performance. Avoiding the call to
174 174 # log.debug brings a few percent gain even if is is not active.
175 175 if log.isEnabledFor(logging.DEBUG):
176 176 self._call = self._call_with_logging
177 177
178 178 def __getattr__(self, name):
179 179 def f(*args, **kwargs):
180 180 return self._call(name, *args, **kwargs)
181 181 return f
182 182
183 183 @exceptions.map_vcs_exceptions
184 184 def _call(self, name, *args, **kwargs):
185 185 payload = {
186 186 'id': str(uuid.uuid4()),
187 187 'method': name,
188 188 'params': {'args': args, 'kwargs': kwargs}
189 189 }
190 190 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
191 191
192 192 def _call_with_logging(self, name, *args, **kwargs):
193 193 log.debug('Calling %s@%s', self._url, name)
194 194 return RemoteObject._call(self, name, *args, **kwargs)
195 195
196 196
197 197 def _remote_call(url, payload, exceptions_map, session):
198 198 try:
199 199 response = session.post(url, data=msgpack.packb(payload))
200 200 except pycurl.error as e:
201 msg = '{}. pycurl traceback: {}'.format(e, traceback.format_exc())
201 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
202 202 raise exceptions.HttpVCSCommunicationError(msg)
203 203 except Exception as e:
204 204 message = getattr(e, 'message', '')
205 205 if 'Failed to connect' in message:
206 206 # gevent doesn't return proper pycurl errors
207 207 raise exceptions.HttpVCSCommunicationError(e)
208 208 else:
209 209 raise
210 210
211 211 if response.status_code >= 400:
212 212 log.error('Call to %s returned non 200 HTTP code: %s',
213 213 url, response.status_code)
214 214 raise exceptions.HttpVCSCommunicationError(repr(response.content))
215 215
216 216 try:
217 217 response = msgpack.unpackb(response.content)
218 218 except Exception:
219 219 log.exception('Failed to decode response %r', response.content)
220 220 raise
221 221
222 222 error = response.get('error')
223 223 if error:
224 224 type_ = error.get('type', 'Exception')
225 225 exc = exceptions_map.get(type_, Exception)
226 226 exc = exc(error.get('message'))
227 227 try:
228 228 exc._vcs_kind = error['_vcs_kind']
229 229 except KeyError:
230 230 pass
231 231
232 232 try:
233 233 exc._vcs_server_traceback = error['traceback']
234 234 except KeyError:
235 235 pass
236 236
237 237 raise exc
238 238 return response.get('result')
239 239
240 240
241 241 class VcsHttpProxy(object):
242 242
243 243 CHUNK_SIZE = 16384
244 244
245 245 def __init__(self, server_and_port, backend_endpoint):
246 246
247 247
248 248 retries = Retry(total=5, connect=None, read=None, redirect=None)
249 249
250 250 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
251 251 self.base_url = urlparse.urljoin(
252 252 'http://%s' % server_and_port, backend_endpoint)
253 253 self.session = requests.Session()
254 254 self.session.mount('http://', adapter)
255 255
256 256 def handle(self, environment, input_data, *args, **kwargs):
257 257 data = {
258 258 'environment': environment,
259 259 'input_data': input_data,
260 260 'args': args,
261 261 'kwargs': kwargs
262 262 }
263 263 result = self.session.post(
264 264 self.base_url, msgpack.packb(data), stream=True)
265 265 return self._get_result(result)
266 266
267 267 def _deserialize_and_raise(self, error):
268 268 exception = Exception(error['message'])
269 269 try:
270 270 exception._vcs_kind = error['_vcs_kind']
271 271 except KeyError:
272 272 pass
273 273 raise exception
274 274
275 275 def _iterate(self, result):
276 276 unpacker = msgpack.Unpacker()
277 277 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
278 278 unpacker.feed(line)
279 279 for chunk in unpacker:
280 280 yield chunk
281 281
282 282 def _get_result(self, result):
283 283 iterator = self._iterate(result)
284 284 error = iterator.next()
285 285 if error:
286 286 self._deserialize_and_raise(error)
287 287
288 288 status = iterator.next()
289 289 headers = iterator.next()
290 290
291 291 return iterator, status, headers
292 292
293 293
294 294 class ThreadlocalSessionFactory(object):
295 295 """
296 296 Creates one CurlSession per thread on demand.
297 297 """
298 298
299 299 def __init__(self):
300 300 self._thread_local = threading.local()
301 301
302 302 def __call__(self):
303 303 if not hasattr(self._thread_local, 'curl_session'):
304 304 self._thread_local.curl_session = CurlSession()
305 305 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now