##// END OF EJS Templates
http: better reporting of msgpack unpack errors.
marcink -
r1110:17efd44c default
parent child Browse files
Show More
@@ -1,255 +1,283 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23
23
24
24
25 Status
25 Status
26 ------
26 ------
27
27
28 This client implementation shall eventually replace the Pyro4 based
28 This client implementation shall eventually replace the Pyro4 based
29 implementation.
29 implementation.
30 """
30 """
31
31
32 import copy
32 import copy
33 import logging
33 import logging
34 import threading
34 import threading
35 import urllib2
35 import urllib2
36 import urlparse
36 import urlparse
37 import uuid
37 import uuid
38
38
39 import pycurl
39 import pycurl
40 import msgpack
40 import msgpack
41 import requests
41 import requests
42
42
43 from . import exceptions, CurlSession
43 from . import exceptions, CurlSession
44
44
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
54 }
54 }
55
55
56
56
57 class RepoMaker(object):
57 class RepoMaker(object):
58
58
59 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 def __init__(self, server_and_port, backend_endpoint, session_factory):
60 self.url = urlparse.urljoin(
60 self.url = urlparse.urljoin(
61 'http://%s' % server_and_port, backend_endpoint)
61 'http://%s' % server_and_port, backend_endpoint)
62 self._session_factory = session_factory
62 self._session_factory = session_factory
63
63
64 def __call__(self, path, config, with_wire=None):
64 def __call__(self, path, config, with_wire=None):
65 log.debug('RepoMaker call on %s', path)
65 log.debug('RepoMaker call on %s', path)
66 return RemoteRepo(
66 return RemoteRepo(
67 path, config, self.url, self._session_factory(),
67 path, config, self.url, self._session_factory(),
68 with_wire=with_wire)
68 with_wire=with_wire)
69
69
70 def __getattr__(self, name):
70 def __getattr__(self, name):
71 def f(*args, **kwargs):
71 def f(*args, **kwargs):
72 return self._call(name, *args, **kwargs)
72 return self._call(name, *args, **kwargs)
73 return f
73 return f
74
74
75 @exceptions.map_vcs_exceptions
75 @exceptions.map_vcs_exceptions
76 def _call(self, name, *args, **kwargs):
76 def _call(self, name, *args, **kwargs):
77 payload = {
77 payload = {
78 'id': str(uuid.uuid4()),
78 'id': str(uuid.uuid4()),
79 'method': name,
79 'method': name,
80 'params': {'args': args, 'kwargs': kwargs}
80 'params': {'args': args, 'kwargs': kwargs}
81 }
81 }
82 return _remote_call(
82 return _remote_call(
83 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
83 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
84
84
85
85
86 class ServiceConnection(object):
87 def __init__(self, server_and_port, backend_endpoint, session_factory):
88 self.url = urlparse.urljoin(
89 'http://%s' % server_and_port, backend_endpoint)
90 self._session_factory = session_factory
91
92 def __getattr__(self, name):
93 def f(*args, **kwargs):
94 return self._call(name, *args, **kwargs)
95
96 return f
97
98 @exceptions.map_vcs_exceptions
99 def _call(self, name, *args, **kwargs):
100 payload = {
101 'id': str(uuid.uuid4()),
102 'method': name,
103 'params': {'args': args, 'kwargs': kwargs}
104 }
105 return _remote_call(
106 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
107
108
86 class RemoteRepo(object):
109 class RemoteRepo(object):
87
110
88 def __init__(self, path, config, url, session, with_wire=None):
111 def __init__(self, path, config, url, session, with_wire=None):
89 self.url = url
112 self.url = url
90 self._session = session
113 self._session = session
91 self._wire = {
114 self._wire = {
92 "path": path,
115 "path": path,
93 "config": config,
116 "config": config,
94 "context": self._create_vcs_cache_context(),
117 "context": self._create_vcs_cache_context(),
95 }
118 }
96 if with_wire:
119 if with_wire:
97 self._wire.update(with_wire)
120 self._wire.update(with_wire)
98
121
99 # johbo: Trading complexity for performance. Avoiding the call to
122 # johbo: Trading complexity for performance. Avoiding the call to
100 # log.debug brings a few percent gain even if is is not active.
123 # log.debug brings a few percent gain even if is is not active.
101 if log.isEnabledFor(logging.DEBUG):
124 if log.isEnabledFor(logging.DEBUG):
102 self._call = self._call_with_logging
125 self._call = self._call_with_logging
103
126
104 def __getattr__(self, name):
127 def __getattr__(self, name):
105 def f(*args, **kwargs):
128 def f(*args, **kwargs):
106 return self._call(name, *args, **kwargs)
129 return self._call(name, *args, **kwargs)
107 return f
130 return f
108
131
109 @exceptions.map_vcs_exceptions
132 @exceptions.map_vcs_exceptions
110 def _call(self, name, *args, **kwargs):
133 def _call(self, name, *args, **kwargs):
111 # TODO: oliver: This is currently necessary pre-call since the
134 # TODO: oliver: This is currently necessary pre-call since the
112 # config object is being changed for hooking scenarios
135 # config object is being changed for hooking scenarios
113 wire = copy.deepcopy(self._wire)
136 wire = copy.deepcopy(self._wire)
114 wire["config"] = wire["config"].serialize()
137 wire["config"] = wire["config"].serialize()
115 payload = {
138 payload = {
116 'id': str(uuid.uuid4()),
139 'id': str(uuid.uuid4()),
117 'method': name,
140 'method': name,
118 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
141 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
119 }
142 }
120 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
143 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
121
144
122 def _call_with_logging(self, name, *args, **kwargs):
145 def _call_with_logging(self, name, *args, **kwargs):
123 log.debug('Calling %s@%s', self.url, name)
146 log.debug('Calling %s@%s', self.url, name)
124 return RemoteRepo._call(self, name, *args, **kwargs)
147 return RemoteRepo._call(self, name, *args, **kwargs)
125
148
126 def __getitem__(self, key):
149 def __getitem__(self, key):
127 return self.revision(key)
150 return self.revision(key)
128
151
129 def _create_vcs_cache_context(self):
152 def _create_vcs_cache_context(self):
130 """
153 """
131 Creates a unique string which is passed to the VCSServer on every
154 Creates a unique string which is passed to the VCSServer on every
132 remote call. It is used as cache key in the VCSServer.
155 remote call. It is used as cache key in the VCSServer.
133 """
156 """
134 return str(uuid.uuid4())
157 return str(uuid.uuid4())
135
158
136 def invalidate_vcs_cache(self):
159 def invalidate_vcs_cache(self):
137 """
160 """
138 This invalidates the context which is sent to the VCSServer on every
161 This invalidates the context which is sent to the VCSServer on every
139 call to a remote method. It forces the VCSServer to create a fresh
162 call to a remote method. It forces the VCSServer to create a fresh
140 repository instance on the next call to a remote method.
163 repository instance on the next call to a remote method.
141 """
164 """
142 self._wire['context'] = self._create_vcs_cache_context()
165 self._wire['context'] = self._create_vcs_cache_context()
143
166
144
167
145 class RemoteObject(object):
168 class RemoteObject(object):
146
169
147 def __init__(self, url, session):
170 def __init__(self, url, session):
148 self._url = url
171 self._url = url
149 self._session = session
172 self._session = session
150
173
151 # johbo: Trading complexity for performance. Avoiding the call to
174 # johbo: Trading complexity for performance. Avoiding the call to
152 # log.debug brings a few percent gain even if is is not active.
175 # log.debug brings a few percent gain even if is is not active.
153 if log.isEnabledFor(logging.DEBUG):
176 if log.isEnabledFor(logging.DEBUG):
154 self._call = self._call_with_logging
177 self._call = self._call_with_logging
155
178
156 def __getattr__(self, name):
179 def __getattr__(self, name):
157 def f(*args, **kwargs):
180 def f(*args, **kwargs):
158 return self._call(name, *args, **kwargs)
181 return self._call(name, *args, **kwargs)
159 return f
182 return f
160
183
161 @exceptions.map_vcs_exceptions
184 @exceptions.map_vcs_exceptions
162 def _call(self, name, *args, **kwargs):
185 def _call(self, name, *args, **kwargs):
163 payload = {
186 payload = {
164 'id': str(uuid.uuid4()),
187 'id': str(uuid.uuid4()),
165 'method': name,
188 'method': name,
166 'params': {'args': args, 'kwargs': kwargs}
189 'params': {'args': args, 'kwargs': kwargs}
167 }
190 }
168 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
191 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
169
192
170 def _call_with_logging(self, name, *args, **kwargs):
193 def _call_with_logging(self, name, *args, **kwargs):
171 log.debug('Calling %s@%s', self._url, name)
194 log.debug('Calling %s@%s', self._url, name)
172 return RemoteObject._call(self, name, *args, **kwargs)
195 return RemoteObject._call(self, name, *args, **kwargs)
173
196
174
197
175 def _remote_call(url, payload, exceptions_map, session):
198 def _remote_call(url, payload, exceptions_map, session):
176 try:
199 try:
177 response = session.post(url, data=msgpack.packb(payload))
200 response = session.post(url, data=msgpack.packb(payload))
178 except pycurl.error as e:
201 except pycurl.error as e:
179 raise exceptions.HttpVCSCommunicationError(e)
202 raise exceptions.HttpVCSCommunicationError(e)
180
203
181 response = msgpack.unpackb(response.content)
204 try:
205 response = msgpack.unpackb(response.content)
206 except Exception:
207 log.exception('Failed to decode repsponse %r', response.content)
208 raise
209
182 error = response.get('error')
210 error = response.get('error')
183 if error:
211 if error:
184 type_ = error.get('type', 'Exception')
212 type_ = error.get('type', 'Exception')
185 exc = exceptions_map.get(type_, Exception)
213 exc = exceptions_map.get(type_, Exception)
186 exc = exc(error.get('message'))
214 exc = exc(error.get('message'))
187 try:
215 try:
188 exc._vcs_kind = error['_vcs_kind']
216 exc._vcs_kind = error['_vcs_kind']
189 except KeyError:
217 except KeyError:
190 pass
218 pass
191 raise exc
219 raise exc
192 return response.get('result')
220 return response.get('result')
193
221
194
222
195 class VcsHttpProxy(object):
223 class VcsHttpProxy(object):
196
224
197 CHUNK_SIZE = 16384
225 CHUNK_SIZE = 16384
198
226
199 def __init__(self, server_and_port, backend_endpoint):
227 def __init__(self, server_and_port, backend_endpoint):
200 adapter = requests.adapters.HTTPAdapter(max_retries=5)
228 adapter = requests.adapters.HTTPAdapter(max_retries=5)
201 self.base_url = urlparse.urljoin(
229 self.base_url = urlparse.urljoin(
202 'http://%s' % server_and_port, backend_endpoint)
230 'http://%s' % server_and_port, backend_endpoint)
203 self.session = requests.Session()
231 self.session = requests.Session()
204 self.session.mount('http://', adapter)
232 self.session.mount('http://', adapter)
205
233
206 def handle(self, environment, input_data, *args, **kwargs):
234 def handle(self, environment, input_data, *args, **kwargs):
207 data = {
235 data = {
208 'environment': environment,
236 'environment': environment,
209 'input_data': input_data,
237 'input_data': input_data,
210 'args': args,
238 'args': args,
211 'kwargs': kwargs
239 'kwargs': kwargs
212 }
240 }
213 result = self.session.post(
241 result = self.session.post(
214 self.base_url, msgpack.packb(data), stream=True)
242 self.base_url, msgpack.packb(data), stream=True)
215 return self._get_result(result)
243 return self._get_result(result)
216
244
217 def _deserialize_and_raise(self, error):
245 def _deserialize_and_raise(self, error):
218 exception = Exception(error['message'])
246 exception = Exception(error['message'])
219 try:
247 try:
220 exception._vcs_kind = error['_vcs_kind']
248 exception._vcs_kind = error['_vcs_kind']
221 except KeyError:
249 except KeyError:
222 pass
250 pass
223 raise exception
251 raise exception
224
252
225 def _iterate(self, result):
253 def _iterate(self, result):
226 unpacker = msgpack.Unpacker()
254 unpacker = msgpack.Unpacker()
227 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
255 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
228 unpacker.feed(line)
256 unpacker.feed(line)
229 for chunk in unpacker:
257 for chunk in unpacker:
230 yield chunk
258 yield chunk
231
259
232 def _get_result(self, result):
260 def _get_result(self, result):
233 iterator = self._iterate(result)
261 iterator = self._iterate(result)
234 error = iterator.next()
262 error = iterator.next()
235 if error:
263 if error:
236 self._deserialize_and_raise(error)
264 self._deserialize_and_raise(error)
237
265
238 status = iterator.next()
266 status = iterator.next()
239 headers = iterator.next()
267 headers = iterator.next()
240
268
241 return iterator, status, headers
269 return iterator, status, headers
242
270
243
271
244 class ThreadlocalSessionFactory(object):
272 class ThreadlocalSessionFactory(object):
245 """
273 """
246 Creates one CurlSession per thread on demand.
274 Creates one CurlSession per thread on demand.
247 """
275 """
248
276
249 def __init__(self):
277 def __init__(self):
250 self._thread_local = threading.local()
278 self._thread_local = threading.local()
251
279
252 def __call__(self):
280 def __call__(self):
253 if not hasattr(self._thread_local, 'curl_session'):
281 if not hasattr(self._thread_local, 'curl_session'):
254 self._thread_local.curl_session = CurlSession()
282 self._thread_local.curl_session = CurlSession()
255 return self._thread_local.curl_session
283 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now