##// END OF EJS Templates
http: better reporting of msgpack unpack errors.
marcink -
r1110:17efd44c default
parent child Browse files
Show More
@@ -1,255 +1,283 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23
24 24
25 25 Status
26 26 ------
27 27
28 28 This client implementation shall eventually replace the Pyro4 based
29 29 implementation.
30 30 """
31 31
32 32 import copy
33 33 import logging
34 34 import threading
35 35 import urllib2
36 36 import urlparse
37 37 import uuid
38 38
39 39 import pycurl
40 40 import msgpack
41 41 import requests
42 42
43 43 from . import exceptions, CurlSession
44 44
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 # TODO: mikhail: Keep it in sync with vcsserver's
50 50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 51 EXCEPTIONS_MAP = {
52 52 'KeyError': KeyError,
53 53 'URLError': urllib2.URLError,
54 54 }
55 55
56 56
57 57 class RepoMaker(object):
58 58
59 59 def __init__(self, server_and_port, backend_endpoint, session_factory):
60 60 self.url = urlparse.urljoin(
61 61 'http://%s' % server_and_port, backend_endpoint)
62 62 self._session_factory = session_factory
63 63
64 64 def __call__(self, path, config, with_wire=None):
65 65 log.debug('RepoMaker call on %s', path)
66 66 return RemoteRepo(
67 67 path, config, self.url, self._session_factory(),
68 68 with_wire=with_wire)
69 69
70 70 def __getattr__(self, name):
71 71 def f(*args, **kwargs):
72 72 return self._call(name, *args, **kwargs)
73 73 return f
74 74
75 75 @exceptions.map_vcs_exceptions
76 76 def _call(self, name, *args, **kwargs):
77 77 payload = {
78 78 'id': str(uuid.uuid4()),
79 79 'method': name,
80 80 'params': {'args': args, 'kwargs': kwargs}
81 81 }
82 82 return _remote_call(
83 83 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
84 84
85 85
86 class ServiceConnection(object):
87 def __init__(self, server_and_port, backend_endpoint, session_factory):
88 self.url = urlparse.urljoin(
89 'http://%s' % server_and_port, backend_endpoint)
90 self._session_factory = session_factory
91
92 def __getattr__(self, name):
93 def f(*args, **kwargs):
94 return self._call(name, *args, **kwargs)
95
96 return f
97
98 @exceptions.map_vcs_exceptions
99 def _call(self, name, *args, **kwargs):
100 payload = {
101 'id': str(uuid.uuid4()),
102 'method': name,
103 'params': {'args': args, 'kwargs': kwargs}
104 }
105 return _remote_call(
106 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
107
108
86 109 class RemoteRepo(object):
87 110
88 111 def __init__(self, path, config, url, session, with_wire=None):
89 112 self.url = url
90 113 self._session = session
91 114 self._wire = {
92 115 "path": path,
93 116 "config": config,
94 117 "context": self._create_vcs_cache_context(),
95 118 }
96 119 if with_wire:
97 120 self._wire.update(with_wire)
98 121
99 122 # johbo: Trading complexity for performance. Avoiding the call to
100 123 # log.debug brings a few percent gain even if is is not active.
101 124 if log.isEnabledFor(logging.DEBUG):
102 125 self._call = self._call_with_logging
103 126
104 127 def __getattr__(self, name):
105 128 def f(*args, **kwargs):
106 129 return self._call(name, *args, **kwargs)
107 130 return f
108 131
109 132 @exceptions.map_vcs_exceptions
110 133 def _call(self, name, *args, **kwargs):
111 134 # TODO: oliver: This is currently necessary pre-call since the
112 135 # config object is being changed for hooking scenarios
113 136 wire = copy.deepcopy(self._wire)
114 137 wire["config"] = wire["config"].serialize()
115 138 payload = {
116 139 'id': str(uuid.uuid4()),
117 140 'method': name,
118 141 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
119 142 }
120 143 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
121 144
122 145 def _call_with_logging(self, name, *args, **kwargs):
123 146 log.debug('Calling %s@%s', self.url, name)
124 147 return RemoteRepo._call(self, name, *args, **kwargs)
125 148
126 149 def __getitem__(self, key):
127 150 return self.revision(key)
128 151
129 152 def _create_vcs_cache_context(self):
130 153 """
131 154 Creates a unique string which is passed to the VCSServer on every
132 155 remote call. It is used as cache key in the VCSServer.
133 156 """
134 157 return str(uuid.uuid4())
135 158
136 159 def invalidate_vcs_cache(self):
137 160 """
138 161 This invalidates the context which is sent to the VCSServer on every
139 162 call to a remote method. It forces the VCSServer to create a fresh
140 163 repository instance on the next call to a remote method.
141 164 """
142 165 self._wire['context'] = self._create_vcs_cache_context()
143 166
144 167
145 168 class RemoteObject(object):
146 169
147 170 def __init__(self, url, session):
148 171 self._url = url
149 172 self._session = session
150 173
151 174 # johbo: Trading complexity for performance. Avoiding the call to
152 175 # log.debug brings a few percent gain even if is is not active.
153 176 if log.isEnabledFor(logging.DEBUG):
154 177 self._call = self._call_with_logging
155 178
156 179 def __getattr__(self, name):
157 180 def f(*args, **kwargs):
158 181 return self._call(name, *args, **kwargs)
159 182 return f
160 183
161 184 @exceptions.map_vcs_exceptions
162 185 def _call(self, name, *args, **kwargs):
163 186 payload = {
164 187 'id': str(uuid.uuid4()),
165 188 'method': name,
166 189 'params': {'args': args, 'kwargs': kwargs}
167 190 }
168 191 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
169 192
170 193 def _call_with_logging(self, name, *args, **kwargs):
171 194 log.debug('Calling %s@%s', self._url, name)
172 195 return RemoteObject._call(self, name, *args, **kwargs)
173 196
174 197
175 198 def _remote_call(url, payload, exceptions_map, session):
176 199 try:
177 200 response = session.post(url, data=msgpack.packb(payload))
178 201 except pycurl.error as e:
179 202 raise exceptions.HttpVCSCommunicationError(e)
180 203
204 try:
181 205 response = msgpack.unpackb(response.content)
206 except Exception:
207 log.exception('Failed to decode repsponse %r', response.content)
208 raise
209
182 210 error = response.get('error')
183 211 if error:
184 212 type_ = error.get('type', 'Exception')
185 213 exc = exceptions_map.get(type_, Exception)
186 214 exc = exc(error.get('message'))
187 215 try:
188 216 exc._vcs_kind = error['_vcs_kind']
189 217 except KeyError:
190 218 pass
191 219 raise exc
192 220 return response.get('result')
193 221
194 222
195 223 class VcsHttpProxy(object):
196 224
197 225 CHUNK_SIZE = 16384
198 226
199 227 def __init__(self, server_and_port, backend_endpoint):
200 228 adapter = requests.adapters.HTTPAdapter(max_retries=5)
201 229 self.base_url = urlparse.urljoin(
202 230 'http://%s' % server_and_port, backend_endpoint)
203 231 self.session = requests.Session()
204 232 self.session.mount('http://', adapter)
205 233
206 234 def handle(self, environment, input_data, *args, **kwargs):
207 235 data = {
208 236 'environment': environment,
209 237 'input_data': input_data,
210 238 'args': args,
211 239 'kwargs': kwargs
212 240 }
213 241 result = self.session.post(
214 242 self.base_url, msgpack.packb(data), stream=True)
215 243 return self._get_result(result)
216 244
217 245 def _deserialize_and_raise(self, error):
218 246 exception = Exception(error['message'])
219 247 try:
220 248 exception._vcs_kind = error['_vcs_kind']
221 249 except KeyError:
222 250 pass
223 251 raise exception
224 252
225 253 def _iterate(self, result):
226 254 unpacker = msgpack.Unpacker()
227 255 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
228 256 unpacker.feed(line)
229 257 for chunk in unpacker:
230 258 yield chunk
231 259
232 260 def _get_result(self, result):
233 261 iterator = self._iterate(result)
234 262 error = iterator.next()
235 263 if error:
236 264 self._deserialize_and_raise(error)
237 265
238 266 status = iterator.next()
239 267 headers = iterator.next()
240 268
241 269 return iterator, status, headers
242 270
243 271
244 272 class ThreadlocalSessionFactory(object):
245 273 """
246 274 Creates one CurlSession per thread on demand.
247 275 """
248 276
249 277 def __init__(self):
250 278 self._thread_local = threading.local()
251 279
252 280 def __call__(self):
253 281 if not hasattr(self._thread_local, 'curl_session'):
254 282 self._thread_local.curl_session = CurlSession()
255 283 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now