##// END OF EJS Templates
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
super-admin -
r5033:65a0999f default
parent child Browse files
Show More
@@ -1,412 +1,418 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib.request, urllib.error, urllib.parse
30 30 import urllib.parse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 # TODO: mikhail: Keep it in sync with vcsserver's
50 50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 51 EXCEPTIONS_MAP = {
52 52 'KeyError': KeyError,
53 53 'URLError': urllib.error.URLError,
54 54 }
55 55
56 56
57 def _remote_call(url, payload, exceptions_map, session):
58 try:
59 headers = {
60 'X-RC-Method': payload.get('method'),
61 'X-RC-Repo-Name': payload.get('_repo_name')
62 }
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
64 except pycurl.error as e:
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
66 raise exceptions.HttpVCSCommunicationError(msg)
67 except Exception as e:
68 message = getattr(e, 'message', '')
69 if 'Failed to connect' in message:
70 # gevent doesn't return proper pycurl errors
71 raise exceptions.HttpVCSCommunicationError(e)
72 else:
73 raise
57 def _remote_call(url, payload, exceptions_map, session, retries=3):
58
59 for attempt in range(retries):
60 try:
61 response = session.post(url, data=msgpack.packb(payload))
62 except pycurl.error as e:
63 error_code, error_message = e.args
64 if error_code == pycurl.E_RECV_ERROR:
65 log.warning(f'Received a "Connection reset by peer" error. '
66 f'Retrying... ({attempt + 1}/{retries})')
67 continue # Retry if connection reset error.
68 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
69 raise exceptions.HttpVCSCommunicationError(msg)
70 except Exception as e:
71 message = getattr(e, 'message', '')
72 if 'Failed to connect' in message:
73 # gevent doesn't return proper pycurl errors
74 raise exceptions.HttpVCSCommunicationError(e)
75 else:
76 raise
74 77
75 78 if response.status_code >= 400:
76 79 log.error('Call to %s returned non 200 HTTP code: %s',
77 80 url, response.status_code)
78 81 raise exceptions.HttpVCSCommunicationError(repr(response.content))
79 82
80 83 try:
81 84 response = msgpack.unpackb(response.content, raw=False)
82 85 except Exception:
83 log.exception('Failed to decode response %r', response.content)
86 log.exception('Failed to decode response from msgpack')
84 87 raise
85 88
86 89 error = response.get('error')
87 90 if error:
88 91 type_ = error.get('type', 'Exception')
89 92 exc = exceptions_map.get(type_, Exception)
90 93 exc = exc(error.get('message'))
91 94 try:
92 95 exc._vcs_kind = error['_vcs_kind']
93 96 except KeyError:
94 97 pass
95 98
96 99 try:
97 100 exc._vcs_server_traceback = error['traceback']
98 101 exc._vcs_server_org_exc_name = error['org_exc']
99 102 exc._vcs_server_org_exc_tb = error['org_exc_tb']
100 103 except KeyError:
101 104 pass
102 105
103 106 raise exc
104 107 return response.get('result')
105 108
106 109
107 110 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
108 111 try:
109 112 headers = {
110 113 'X-RC-Method': payload.get('method'),
111 114 'X-RC-Repo-Name': payload.get('_repo_name')
112 115 }
113 116 response = session.post(url, data=msgpack.packb(payload), headers=headers)
114 117 except pycurl.error as e:
118 error_code, error_message = e.args
115 119 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
116 120 raise exceptions.HttpVCSCommunicationError(msg)
117 121 except Exception as e:
118 122 message = getattr(e, 'message', '')
119 123 if 'Failed to connect' in message:
120 124 # gevent doesn't return proper pycurl errors
121 125 raise exceptions.HttpVCSCommunicationError(e)
122 126 else:
123 127 raise
124 128
125 129 if response.status_code >= 400:
126 130 log.error('Call to %s returned non 200 HTTP code: %s',
127 131 url, response.status_code)
128 132 raise exceptions.HttpVCSCommunicationError(repr(response.content))
129 133
130 134 return response.iter_content(chunk_size=chunk_size)
131 135
132 136
133 137 class ServiceConnection(object):
134 138 def __init__(self, server_and_port, backend_endpoint, session_factory):
135 139 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
136 140 self._session_factory = session_factory
137 141
138 142 def __getattr__(self, name):
139 143 def f(*args, **kwargs):
140 144 return self._call(name, *args, **kwargs)
141 145 return f
142 146
143 147 @exceptions.map_vcs_exceptions
144 148 def _call(self, name, *args, **kwargs):
145 149 payload = {
146 150 'id': str(uuid.uuid4()),
147 151 'method': name,
148 152 'params': {'args': args, 'kwargs': kwargs}
149 153 }
150 154 return _remote_call(
151 155 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
152 156
153 157
154 158 class RemoteVCSMaker(object):
155 159
156 160 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
157 161 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
158 162 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
159 163
160 164 self._session_factory = session_factory
161 165 self.backend_type = backend_type
162 166
163 167 @classmethod
164 168 def init_cache_region(cls, repo_id):
165 169 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
166 170 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
167 171 return region, cache_namespace_uid
168 172
169 173 def __call__(self, path, repo_id, config, with_wire=None):
170 174 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
171 175 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
172 176
173 177 def __getattr__(self, name):
174 178 def remote_attr(*args, **kwargs):
175 179 return self._call(name, *args, **kwargs)
176 180 return remote_attr
177 181
178 182 @exceptions.map_vcs_exceptions
179 183 def _call(self, func_name, *args, **kwargs):
180 184 payload = {
181 185 'id': str(uuid.uuid4()),
182 186 'method': func_name,
183 187 'backend': self.backend_type,
184 188 'params': {'args': args, 'kwargs': kwargs}
185 189 }
186 190 url = self.url
187 191 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
188 192
189 193
190 194 class RemoteRepo(object):
191 195 CHUNK_SIZE = 16384
192 196
193 197 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
194 198 self.url = remote_maker.url
195 199 self.stream_url = remote_maker.stream_url
196 200 self._session = remote_maker._session_factory()
197 201
198 202 cache_repo_id = self._repo_id_sanitizer(repo_id)
199 203 _repo_name = self._get_repo_name(config, path)
200 204 self._cache_region, self._cache_namespace = \
201 205 remote_maker.init_cache_region(cache_repo_id)
202 206
203 207 with_wire = with_wire or {}
204 208
205 209 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
206 210
207 211 self._wire = {
208 212 "_repo_name": _repo_name,
209 213 "path": path, # repo path
210 214 "repo_id": repo_id,
211 215 "cache_repo_id": cache_repo_id,
212 216 "config": config,
213 217 "repo_state_uid": repo_state_uid,
214 218 "context": self._create_vcs_cache_context(path, repo_state_uid)
215 219 }
216 220
217 221 if with_wire:
218 222 self._wire.update(with_wire)
219 223
220 224 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
221 225 # log.debug brings a few percent gain even if is is not active.
222 226 if log.isEnabledFor(logging.DEBUG):
223 227 self._call_with_logging = True
224 228
225 229 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
226 230
227 231 def _get_repo_name(self, config, path):
228 232 repo_store = config.get('paths', '/')
229 233 return path.split(repo_store)[-1].lstrip('/')
230 234
231 235 def _repo_id_sanitizer(self, repo_id):
232 236 pathless = repo_id.replace('/', '__').replace('-', '_')
233 237 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
234 238
235 239 def __getattr__(self, name):
236 240
237 241 if name.startswith('stream:'):
238 242 def repo_remote_attr(*args, **kwargs):
239 243 return self._call_stream(name, *args, **kwargs)
240 244 else:
241 245 def repo_remote_attr(*args, **kwargs):
242 246 return self._call(name, *args, **kwargs)
243 247
244 248 return repo_remote_attr
245 249
246 250 def _base_call(self, name, *args, **kwargs):
247 251 # TODO: oliver: This is currently necessary pre-call since the
248 252 # config object is being changed for hooking scenarios
249 253 wire = copy.deepcopy(self._wire)
250 254 wire["config"] = wire["config"].serialize()
251 255 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
252 256
253 257 payload = {
254 258 'id': str(uuid.uuid4()),
255 259 'method': name,
256 260 "_repo_name": wire['_repo_name'],
257 261 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
258 262 }
259 263
260 264 context_uid = wire.get('context')
261 265 return context_uid, payload
262 266
263 267 def get_local_cache(self, name, args):
264 268 cache_on = False
265 269 cache_key = ''
266 270 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
267 271
268 272 cache_methods = [
269 273 'branches', 'tags', 'bookmarks',
270 274 'is_large_file', 'is_binary',
271 275 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
272 276 'node_history',
273 277 'revision', 'tree_items',
274 278 'ctx_list', 'ctx_branch', 'ctx_description',
275 279 'bulk_request',
276 280 'assert_correct_path'
277 281 ]
278 282
279 283 if local_cache_on and name in cache_methods:
280 284 cache_on = True
281 285 repo_state_uid = self._wire['repo_state_uid']
282 286 call_args = [a for a in args]
283 287 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
284 288
285 289 return cache_on, cache_key
286 290
287 291 @exceptions.map_vcs_exceptions
288 292 def _call(self, name, *args, **kwargs):
289 293 context_uid, payload = self._base_call(name, *args, **kwargs)
290 294 url = self.url
291 295
292 296 start = time.time()
293 297 cache_on, cache_key = self.get_local_cache(name, args)
294 298
295 299 @self._cache_region.conditional_cache_on_arguments(
296 300 namespace=self._cache_namespace, condition=cache_on and cache_key)
297 301 def remote_call(_cache_key):
298 302 if self._call_with_logging:
299 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
300 url, name, args, context_uid, cache_on)
303 args_repr = f'ARG: {str(args):.256}|KW: {str(kwargs):.256}'
304 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
305 url, name, args_repr, context_uid, cache_on)
301 306 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
302 307
303 308 result = remote_call(cache_key)
304 309 if self._call_with_logging:
305 310 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
306 311 url, name, time.time()-start, context_uid)
307 312 return result
308 313
309 314 @exceptions.map_vcs_exceptions
310 315 def _call_stream(self, name, *args, **kwargs):
311 316 context_uid, payload = self._base_call(name, *args, **kwargs)
312 317 payload['chunk_size'] = self.CHUNK_SIZE
313 318 url = self.stream_url
314 319
315 320 start = time.time()
316 321 cache_on, cache_key = self.get_local_cache(name, args)
317 322
318 323 # Cache is a problem because this is a stream
319 324 def streaming_remote_call(_cache_key):
320 325 if self._call_with_logging:
321 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
322 url, name, args, context_uid, cache_on)
326 args_repr = f'ARG: {str(args):.256}|KW: {str(kwargs):.256}'
327 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
328 url, name, args_repr, context_uid, cache_on)
323 329 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
324 330
325 331 result = streaming_remote_call(cache_key)
326 332 if self._call_with_logging:
327 333 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
328 334 url, name, time.time()-start, context_uid)
329 335 return result
330 336
331 337 def __getitem__(self, key):
332 338 return self.revision(key)
333 339
334 340 def _create_vcs_cache_context(self, *args):
335 341 """
336 342 Creates a unique string which is passed to the VCSServer on every
337 343 remote call. It is used as cache key in the VCSServer.
338 344 """
339 345 hash_key = '-'.join(map(str, args))
340 346 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
341 347
342 348 def invalidate_vcs_cache(self):
343 349 """
344 350 This invalidates the context which is sent to the VCSServer on every
345 351 call to a remote method. It forces the VCSServer to create a fresh
346 352 repository instance on the next call to a remote method.
347 353 """
348 354 self._wire['context'] = str(uuid.uuid4())
349 355
350 356
351 357 class VcsHttpProxy(object):
352 358
353 359 CHUNK_SIZE = 16384
354 360
355 361 def __init__(self, server_and_port, backend_endpoint):
356 362 retries = Retry(total=5, connect=None, read=None, redirect=None)
357 363
358 364 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
359 365 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
360 366 self.session = requests.Session()
361 367 self.session.mount('http://', adapter)
362 368
363 369 def handle(self, environment, input_data, *args, **kwargs):
364 370 data = {
365 371 'environment': environment,
366 372 'input_data': input_data,
367 373 'args': args,
368 374 'kwargs': kwargs
369 375 }
370 376 result = self.session.post(
371 377 self.base_url, msgpack.packb(data), stream=True)
372 378 return self._get_result(result)
373 379
374 380 def _deserialize_and_raise(self, error):
375 381 exception = Exception(error['message'])
376 382 try:
377 383 exception._vcs_kind = error['_vcs_kind']
378 384 except KeyError:
379 385 pass
380 386 raise exception
381 387
382 388 def _iterate(self, result):
383 389 unpacker = msgpack.Unpacker()
384 390 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
385 391 unpacker.feed(line)
386 392 for chunk in unpacker:
387 393 yield chunk
388 394
389 395 def _get_result(self, result):
390 396 iterator = self._iterate(result)
391 397 error = next(iterator)
392 398 if error:
393 399 self._deserialize_and_raise(error)
394 400
395 401 status = next(iterator)
396 402 headers = next(iterator)
397 403
398 404 return iterator, status, headers
399 405
400 406
401 407 class ThreadlocalSessionFactory(object):
402 408 """
403 409 Creates one CurlSession per thread on demand.
404 410 """
405 411
406 412 def __init__(self):
407 413 self._thread_local = threading.local()
408 414
409 415 def __call__(self):
410 416 if not hasattr(self._thread_local, 'curl_session'):
411 417 self._thread_local.curl_session = CurlSession()
412 418 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now