##// END OF EJS Templates
http-client: improve exceptions
marcink -
r2944:2f16baee default
parent child Browse files
Show More
@@ -1,305 +1,305 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31 import traceback
31 import traceback
32
32
33 import pycurl
33 import pycurl
34 import msgpack
34 import msgpack
35 import requests
35 import requests
36 from requests.packages.urllib3.util.retry import Retry
36 from requests.packages.urllib3.util.retry import Retry
37
37
38 from . import exceptions, CurlSession
38 from . import exceptions, CurlSession
39
39
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 # TODO: mikhail: Keep it in sync with vcsserver's
44 # TODO: mikhail: Keep it in sync with vcsserver's
45 # HTTPApplication.ALLOWED_EXCEPTIONS
45 # HTTPApplication.ALLOWED_EXCEPTIONS
46 EXCEPTIONS_MAP = {
46 EXCEPTIONS_MAP = {
47 'KeyError': KeyError,
47 'KeyError': KeyError,
48 'URLError': urllib2.URLError,
48 'URLError': urllib2.URLError,
49 }
49 }
50
50
51
51
52 class RepoMaker(object):
52 class RepoMaker(object):
53
53
54 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
55 self.url = urlparse.urljoin(
55 self.url = urlparse.urljoin(
56 'http://%s' % server_and_port, backend_endpoint)
56 'http://%s' % server_and_port, backend_endpoint)
57 self._session_factory = session_factory
57 self._session_factory = session_factory
58 self.backend_type = backend_type
58 self.backend_type = backend_type
59
59
60 def __call__(self, path, config, with_wire=None):
60 def __call__(self, path, config, with_wire=None):
61 log.debug('RepoMaker call on %s', path)
61 log.debug('RepoMaker call on %s', path)
62 return RemoteRepo(
62 return RemoteRepo(
63 path, config, self.url, self._session_factory(),
63 path, config, self.url, self._session_factory(),
64 with_wire=with_wire)
64 with_wire=with_wire)
65
65
66 def __getattr__(self, name):
66 def __getattr__(self, name):
67 def f(*args, **kwargs):
67 def f(*args, **kwargs):
68 return self._call(name, *args, **kwargs)
68 return self._call(name, *args, **kwargs)
69 return f
69 return f
70
70
71 @exceptions.map_vcs_exceptions
71 @exceptions.map_vcs_exceptions
72 def _call(self, name, *args, **kwargs):
72 def _call(self, name, *args, **kwargs):
73 payload = {
73 payload = {
74 'id': str(uuid.uuid4()),
74 'id': str(uuid.uuid4()),
75 'method': name,
75 'method': name,
76 'backend': self.backend_type,
76 'backend': self.backend_type,
77 'params': {'args': args, 'kwargs': kwargs}
77 'params': {'args': args, 'kwargs': kwargs}
78 }
78 }
79 return _remote_call(
79 return _remote_call(
80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
81
81
82
82
83 class ServiceConnection(object):
83 class ServiceConnection(object):
84 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 def __init__(self, server_and_port, backend_endpoint, session_factory):
85 self.url = urlparse.urljoin(
85 self.url = urlparse.urljoin(
86 'http://%s' % server_and_port, backend_endpoint)
86 'http://%s' % server_and_port, backend_endpoint)
87 self._session_factory = session_factory
87 self._session_factory = session_factory
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 def f(*args, **kwargs):
90 def f(*args, **kwargs):
91 return self._call(name, *args, **kwargs)
91 return self._call(name, *args, **kwargs)
92
92
93 return f
93 return f
94
94
95 @exceptions.map_vcs_exceptions
95 @exceptions.map_vcs_exceptions
96 def _call(self, name, *args, **kwargs):
96 def _call(self, name, *args, **kwargs):
97 payload = {
97 payload = {
98 'id': str(uuid.uuid4()),
98 'id': str(uuid.uuid4()),
99 'method': name,
99 'method': name,
100 'params': {'args': args, 'kwargs': kwargs}
100 'params': {'args': args, 'kwargs': kwargs}
101 }
101 }
102 return _remote_call(
102 return _remote_call(
103 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
104
104
105
105
106 class RemoteRepo(object):
106 class RemoteRepo(object):
107
107
108 def __init__(self, path, config, url, session, with_wire=None):
108 def __init__(self, path, config, url, session, with_wire=None):
109 self.url = url
109 self.url = url
110 self._session = session
110 self._session = session
111 self._wire = {
111 self._wire = {
112 "path": path,
112 "path": path,
113 "config": config,
113 "config": config,
114 "context": self._create_vcs_cache_context(),
114 "context": self._create_vcs_cache_context(),
115 }
115 }
116 if with_wire:
116 if with_wire:
117 self._wire.update(with_wire)
117 self._wire.update(with_wire)
118
118
119 # johbo: Trading complexity for performance. Avoiding the call to
119 # johbo: Trading complexity for performance. Avoiding the call to
120 # log.debug brings a few percent gain even if is is not active.
120 # log.debug brings a few percent gain even if is is not active.
121 if log.isEnabledFor(logging.DEBUG):
121 if log.isEnabledFor(logging.DEBUG):
122 self._call = self._call_with_logging
122 self._call = self._call_with_logging
123
123
124 def __getattr__(self, name):
124 def __getattr__(self, name):
125 def f(*args, **kwargs):
125 def f(*args, **kwargs):
126 return self._call(name, *args, **kwargs)
126 return self._call(name, *args, **kwargs)
127 return f
127 return f
128
128
129 @exceptions.map_vcs_exceptions
129 @exceptions.map_vcs_exceptions
130 def _call(self, name, *args, **kwargs):
130 def _call(self, name, *args, **kwargs):
131 # TODO: oliver: This is currently necessary pre-call since the
131 # TODO: oliver: This is currently necessary pre-call since the
132 # config object is being changed for hooking scenarios
132 # config object is being changed for hooking scenarios
133 wire = copy.deepcopy(self._wire)
133 wire = copy.deepcopy(self._wire)
134 wire["config"] = wire["config"].serialize()
134 wire["config"] = wire["config"].serialize()
135 payload = {
135 payload = {
136 'id': str(uuid.uuid4()),
136 'id': str(uuid.uuid4()),
137 'method': name,
137 'method': name,
138 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
139 }
139 }
140 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
141
141
142 def _call_with_logging(self, name, *args, **kwargs):
142 def _call_with_logging(self, name, *args, **kwargs):
143 context_uid = self._wire.get('context')
143 context_uid = self._wire.get('context')
144 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 log.debug('Calling %s@%s with args:%r. wire_context: %s',
145 self.url, name, args, context_uid)
145 self.url, name, args, context_uid)
146 return RemoteRepo._call(self, name, *args, **kwargs)
146 return RemoteRepo._call(self, name, *args, **kwargs)
147
147
148 def __getitem__(self, key):
148 def __getitem__(self, key):
149 return self.revision(key)
149 return self.revision(key)
150
150
151 def _create_vcs_cache_context(self):
151 def _create_vcs_cache_context(self):
152 """
152 """
153 Creates a unique string which is passed to the VCSServer on every
153 Creates a unique string which is passed to the VCSServer on every
154 remote call. It is used as cache key in the VCSServer.
154 remote call. It is used as cache key in the VCSServer.
155 """
155 """
156 return str(uuid.uuid4())
156 return str(uuid.uuid4())
157
157
158 def invalidate_vcs_cache(self):
158 def invalidate_vcs_cache(self):
159 """
159 """
160 This invalidates the context which is sent to the VCSServer on every
160 This invalidates the context which is sent to the VCSServer on every
161 call to a remote method. It forces the VCSServer to create a fresh
161 call to a remote method. It forces the VCSServer to create a fresh
162 repository instance on the next call to a remote method.
162 repository instance on the next call to a remote method.
163 """
163 """
164 self._wire['context'] = self._create_vcs_cache_context()
164 self._wire['context'] = self._create_vcs_cache_context()
165
165
166
166
167 class RemoteObject(object):
167 class RemoteObject(object):
168
168
169 def __init__(self, url, session):
169 def __init__(self, url, session):
170 self._url = url
170 self._url = url
171 self._session = session
171 self._session = session
172
172
173 # johbo: Trading complexity for performance. Avoiding the call to
173 # johbo: Trading complexity for performance. Avoiding the call to
174 # log.debug brings a few percent gain even if is is not active.
174 # log.debug brings a few percent gain even if is is not active.
175 if log.isEnabledFor(logging.DEBUG):
175 if log.isEnabledFor(logging.DEBUG):
176 self._call = self._call_with_logging
176 self._call = self._call_with_logging
177
177
178 def __getattr__(self, name):
178 def __getattr__(self, name):
179 def f(*args, **kwargs):
179 def f(*args, **kwargs):
180 return self._call(name, *args, **kwargs)
180 return self._call(name, *args, **kwargs)
181 return f
181 return f
182
182
183 @exceptions.map_vcs_exceptions
183 @exceptions.map_vcs_exceptions
184 def _call(self, name, *args, **kwargs):
184 def _call(self, name, *args, **kwargs):
185 payload = {
185 payload = {
186 'id': str(uuid.uuid4()),
186 'id': str(uuid.uuid4()),
187 'method': name,
187 'method': name,
188 'params': {'args': args, 'kwargs': kwargs}
188 'params': {'args': args, 'kwargs': kwargs}
189 }
189 }
190 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
190 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
191
191
192 def _call_with_logging(self, name, *args, **kwargs):
192 def _call_with_logging(self, name, *args, **kwargs):
193 log.debug('Calling %s@%s', self._url, name)
193 log.debug('Calling %s@%s', self._url, name)
194 return RemoteObject._call(self, name, *args, **kwargs)
194 return RemoteObject._call(self, name, *args, **kwargs)
195
195
196
196
197 def _remote_call(url, payload, exceptions_map, session):
197 def _remote_call(url, payload, exceptions_map, session):
198 try:
198 try:
199 response = session.post(url, data=msgpack.packb(payload))
199 response = session.post(url, data=msgpack.packb(payload))
200 except pycurl.error as e:
200 except pycurl.error as e:
201 msg = '{}. pycurl traceback: {}'.format(e, traceback.format_exc())
201 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
202 raise exceptions.HttpVCSCommunicationError(msg)
202 raise exceptions.HttpVCSCommunicationError(msg)
203 except Exception as e:
203 except Exception as e:
204 message = getattr(e, 'message', '')
204 message = getattr(e, 'message', '')
205 if 'Failed to connect' in message:
205 if 'Failed to connect' in message:
206 # gevent doesn't return proper pycurl errors
206 # gevent doesn't return proper pycurl errors
207 raise exceptions.HttpVCSCommunicationError(e)
207 raise exceptions.HttpVCSCommunicationError(e)
208 else:
208 else:
209 raise
209 raise
210
210
211 if response.status_code >= 400:
211 if response.status_code >= 400:
212 log.error('Call to %s returned non 200 HTTP code: %s',
212 log.error('Call to %s returned non 200 HTTP code: %s',
213 url, response.status_code)
213 url, response.status_code)
214 raise exceptions.HttpVCSCommunicationError(repr(response.content))
214 raise exceptions.HttpVCSCommunicationError(repr(response.content))
215
215
216 try:
216 try:
217 response = msgpack.unpackb(response.content)
217 response = msgpack.unpackb(response.content)
218 except Exception:
218 except Exception:
219 log.exception('Failed to decode response %r', response.content)
219 log.exception('Failed to decode response %r', response.content)
220 raise
220 raise
221
221
222 error = response.get('error')
222 error = response.get('error')
223 if error:
223 if error:
224 type_ = error.get('type', 'Exception')
224 type_ = error.get('type', 'Exception')
225 exc = exceptions_map.get(type_, Exception)
225 exc = exceptions_map.get(type_, Exception)
226 exc = exc(error.get('message'))
226 exc = exc(error.get('message'))
227 try:
227 try:
228 exc._vcs_kind = error['_vcs_kind']
228 exc._vcs_kind = error['_vcs_kind']
229 except KeyError:
229 except KeyError:
230 pass
230 pass
231
231
232 try:
232 try:
233 exc._vcs_server_traceback = error['traceback']
233 exc._vcs_server_traceback = error['traceback']
234 except KeyError:
234 except KeyError:
235 pass
235 pass
236
236
237 raise exc
237 raise exc
238 return response.get('result')
238 return response.get('result')
239
239
240
240
241 class VcsHttpProxy(object):
241 class VcsHttpProxy(object):
242
242
243 CHUNK_SIZE = 16384
243 CHUNK_SIZE = 16384
244
244
245 def __init__(self, server_and_port, backend_endpoint):
245 def __init__(self, server_and_port, backend_endpoint):
246
246
247
247
248 retries = Retry(total=5, connect=None, read=None, redirect=None)
248 retries = Retry(total=5, connect=None, read=None, redirect=None)
249
249
250 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
250 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
251 self.base_url = urlparse.urljoin(
251 self.base_url = urlparse.urljoin(
252 'http://%s' % server_and_port, backend_endpoint)
252 'http://%s' % server_and_port, backend_endpoint)
253 self.session = requests.Session()
253 self.session = requests.Session()
254 self.session.mount('http://', adapter)
254 self.session.mount('http://', adapter)
255
255
256 def handle(self, environment, input_data, *args, **kwargs):
256 def handle(self, environment, input_data, *args, **kwargs):
257 data = {
257 data = {
258 'environment': environment,
258 'environment': environment,
259 'input_data': input_data,
259 'input_data': input_data,
260 'args': args,
260 'args': args,
261 'kwargs': kwargs
261 'kwargs': kwargs
262 }
262 }
263 result = self.session.post(
263 result = self.session.post(
264 self.base_url, msgpack.packb(data), stream=True)
264 self.base_url, msgpack.packb(data), stream=True)
265 return self._get_result(result)
265 return self._get_result(result)
266
266
267 def _deserialize_and_raise(self, error):
267 def _deserialize_and_raise(self, error):
268 exception = Exception(error['message'])
268 exception = Exception(error['message'])
269 try:
269 try:
270 exception._vcs_kind = error['_vcs_kind']
270 exception._vcs_kind = error['_vcs_kind']
271 except KeyError:
271 except KeyError:
272 pass
272 pass
273 raise exception
273 raise exception
274
274
275 def _iterate(self, result):
275 def _iterate(self, result):
276 unpacker = msgpack.Unpacker()
276 unpacker = msgpack.Unpacker()
277 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
277 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
278 unpacker.feed(line)
278 unpacker.feed(line)
279 for chunk in unpacker:
279 for chunk in unpacker:
280 yield chunk
280 yield chunk
281
281
282 def _get_result(self, result):
282 def _get_result(self, result):
283 iterator = self._iterate(result)
283 iterator = self._iterate(result)
284 error = iterator.next()
284 error = iterator.next()
285 if error:
285 if error:
286 self._deserialize_and_raise(error)
286 self._deserialize_and_raise(error)
287
287
288 status = iterator.next()
288 status = iterator.next()
289 headers = iterator.next()
289 headers = iterator.next()
290
290
291 return iterator, status, headers
291 return iterator, status, headers
292
292
293
293
294 class ThreadlocalSessionFactory(object):
294 class ThreadlocalSessionFactory(object):
295 """
295 """
296 Creates one CurlSession per thread on demand.
296 Creates one CurlSession per thread on demand.
297 """
297 """
298
298
299 def __init__(self):
299 def __init__(self):
300 self._thread_local = threading.local()
300 self._thread_local = threading.local()
301
301
302 def __call__(self):
302 def __call__(self):
303 if not hasattr(self._thread_local, 'curl_session'):
303 if not hasattr(self._thread_local, 'curl_session'):
304 self._thread_local.curl_session = CurlSession()
304 self._thread_local.curl_session = CurlSession()
305 return self._thread_local.curl_session
305 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now