##// END OF EJS Templates
chore(http-client): small refactor and use fstrings
super-admin -
r5260:e241ac2a default
parent child Browse files
Show More
@@ -1,429 +1,429 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 Client for the VCSServer implemented based on HTTP.
21 21 """
22 22
23 23 import copy
24 24 import logging
25 25 import threading
26 26 import time
27 27 import urllib.request
28 28 import urllib.error
29 29 import urllib.parse
30 30 import urllib.parse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
log = logging.getLogger(__name__)


# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
# Maps the exception type name found in a remote error payload to the local
# exception class that is instantiated and raised on the client side.
EXCEPTIONS_MAP = {
    exc_class.__name__: exc_class
    for exc_class in (KeyError, urllib.error.URLError)
}
55 55
56 56
def _remote_call(url, payload, exceptions_map, session, retries=3):
    """
    POST a msgpack-encoded ``payload`` to ``url`` and return the decoded result.

    The request is attempted up to ``retries`` times. Only a pycurl
    ``E_RECV_ERROR`` ("Connection reset by peer") triggers another attempt;
    every other failure is raised immediately.

    :param url: endpoint the payload is posted to.
    :param payload: msgpack-serializable request body.
    :param exceptions_map: maps the ``type`` name of a remote error to the
        exception class raised locally; unknown names fall back to
        ``Exception``.
    :param session: object exposing ``post(url, data=...)``.
    :param retries: maximum number of attempts.
    :returns: the ``result`` entry of the decoded response.
    :raises exceptions.HttpVCSCommunicationError: on pycurl errors, when all
        attempts were reset by the peer, or when the HTTP status is >= 400.
    :raises Exception: the exception rebuilt from the ``error`` entry of the
        response, annotated with the remote details.
    """
    for attempt in range(retries):
        try:
            response = session.post(url, data=msgpack.packb(payload))
            break
        except pycurl.error as e:
            # Only the error code is needed; do not assume a 2-tuple in args.
            error_code = e.args[0] if e.args else None
            if error_code == pycurl.E_RECV_ERROR:
                log.warning('Received a "Connection reset by peer" error. '
                            'Retrying... (%s/%s)', attempt + 1, retries)
                continue  # Retry if connection reset error.
            msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
            raise exceptions.HttpVCSCommunicationError(msg)
        except Exception as e:
            message = getattr(e, 'message', '')
            if 'Failed to connect' in message:
                # gevent doesn't return proper pycurl errors
                raise exceptions.HttpVCSCommunicationError(e)
            else:
                raise
    else:
        # No attempt succeeded (every try was reset by the peer, or
        # retries <= 0). Without this branch ``response`` would be unbound
        # below and surface as an unrelated UnboundLocalError.
        raise exceptions.HttpVCSCommunicationError(
            f'Call to {url} failed: connection reset by peer '
            f'on all {retries} attempts')

    if response.status_code >= 400:
        content_type = response.content_type
        log.error('Call to %s returned non 200 HTTP code: %s [%s]',
                  url, response.status_code, content_type)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    try:
        response = msgpack.unpackb(response.content)
    except Exception:
        log.exception('Failed to decode response from msgpack')
        raise

    error = response.get('error')
    if error:
        type_ = error.get('type', 'Exception')
        exc = exceptions_map.get(type_, Exception)
        exc = exc(error.get('message'))
        # The remote details are optional; attach whatever is present.
        try:
            exc._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass

        try:
            exc._vcs_server_traceback = error['traceback']
            exc._vcs_server_org_exc_name = error['org_exc']
            exc._vcs_server_org_exc_tb = error['org_exc_tb']
        except KeyError:
            pass

        exc.add_note(attach_exc_details(error))
        raise exc  # raising the org exception from vcsserver
    return response.get('result')
111 111
112 112
def attach_exc_details(error):
    """
    Build the note text that is attached to an exception rebuilt from a
    remote error payload.

    :param error: mapping with the remote error details; the keys
        ``_vcs_kind``, ``org_exc`` and ``traceback`` are read and a missing
        key is rendered as ``None``.
    :returns: multi-line string delimited by ``-- EXC NOTE --`` markers.
    """
    return (
        '-- EXC NOTE -- :\n'
        f'vcs_kind: {error.get("_vcs_kind")}\n'
        # Report the original exception name, not the vcs kind a second time.
        f'org_exc: {error.get("org_exc")}\n'
        f'tb: {error.get("traceback")}\n'
        '-- END EXC NOTE --'
    )
120 120
121 121
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    POST a msgpack-encoded ``payload`` to ``url`` and return an iterator over
    the raw response body.

    :param url: streaming endpoint the payload is posted to.
    :param payload: msgpack-serializable request body; its ``method`` and
        ``_repo_name`` entries are also sent as ``X-RC-*`` headers.
    :param exceptions_map: accepted for signature parity with
        ``_remote_call``; not used here.
    :param session: object exposing ``post(url, data=..., headers=...)``.
    :param chunk_size: passed to ``response.iter_content``.
    :returns: the result of ``response.iter_content(chunk_size=chunk_size)``.
    :raises exceptions.HttpVCSCommunicationError: on pycurl errors or when
        the HTTP status is >= 400.
    """
    try:
        headers = {
            'X-RC-Method': payload.get('method'),
            'X-RC-Repo-Name': payload.get('_repo_name')
        }
        response = session.post(url, data=msgpack.packb(payload), headers=headers)
    except pycurl.error as e:
        # e.args is deliberately not unpacked: the values were unused and a
        # non 2-tuple would raise ValueError, hiding the real error.
        msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
        raise exceptions.HttpVCSCommunicationError(msg)
    except Exception as e:
        message = getattr(e, 'message', '')
        if 'Failed to connect' in message:
            # gevent doesn't return proper pycurl errors
            raise exceptions.HttpVCSCommunicationError(e)
        else:
            raise

    if response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, response.status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response.iter_content(chunk_size=chunk_size)
147 147
148 148
class ServiceConnection(object):
    """
    Proxy for a remote service endpoint.

    Any attribute that is not found on the instance resolves to a callable
    which performs the remote call of that name.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        base_url = f'http://{server_and_port}'
        self.url = urllib.parse.urljoin(base_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # Only reached for names missing on the instance/class: hand back a
        # callable that forwards its arguments to the remote method `name`.
        return lambda *args, **kwargs: self._call(name, *args, **kwargs)

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """Send one remote call, tagged with a fresh request id."""
        request_id = str(uuid.uuid4())
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': request_id,
            'method': name,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
168 168
169 169
class RemoteVCSMaker(object):
    """
    Factory for ``RemoteRepo`` objects of one backend type.

    Calling the instance creates a ``RemoteRepo``; any other unknown
    attribute resolves to a callable performing a backend-level remote call.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        base_url = f'http://{server_and_port}'
        stream_endpoint = backend_endpoint + '/stream'

        self.url = urllib.parse.urljoin(base_url, backend_endpoint)
        self.stream_url = urllib.parse.urljoin(base_url, stream_endpoint)

        self._session_factory = session_factory
        self.backend_type = backend_type

    @classmethod
    def init_cache_region(cls, repo_id):
        """Return ``(region, namespace)`` of the ``cache_repo`` cache for ``repo_id``."""
        namespace = f'repo.{repo_id}'
        return rc_cache.get_or_create_region('cache_repo', namespace), namespace

    def __call__(self, path, repo_id, config, with_wire=None):
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # Only reached for names missing on the instance/class: hand back a
        # callable that forwards its arguments to the remote function `name`.
        return lambda *args, **kwargs: self._call(name, *args, **kwargs)

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """Send one backend-level remote call, tagged with a fresh request id."""
        request_id = str(uuid.uuid4())
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': request_id,
            'method': func_name,
            'backend': self.backend_type,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
204 204
205 205
class RemoteRepo(object):
    """
    Client-side handle for a single repository accessed through remote calls.

    Attributes that are not found on the instance resolve to callables that
    perform the remote method of that name. Names starting with ``stream:``
    go through the streaming endpoint, everything else through the regular
    endpoint.
    """
    CHUNK_SIZE = 16384

    # Class-level default. Without it, reading ``self._call_with_logging`` on
    # an instance created while DEBUG logging is disabled falls through to
    # ``__getattr__`` and yields a remote-call closure, which is always
    # truthy, so the debug-only branches would run on every call.
    _call_with_logging = False

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path.
        :param repo_id: repository identifier; a sanitized form is used for
            the cache namespace.
        :param config: configuration object; it must provide ``get`` and
            ``serialize`` as used by this class.
        :param remote_maker: provides ``url``, ``stream_url``,
            ``_session_factory`` and ``init_cache_region``.
        :param with_wire: optional mapping merged over the default wire data.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        cache_repo_id = self._repo_id_sanitizer(repo_id)
        _repo_name = self._get_repo_name(config, path)
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(cache_repo_id)

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'

        self._wire = {
            "_repo_name": _repo_name,
            "path": path,  # repo path
            "repo_id": repo_id,
            "cache_repo_id": cache_repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def _get_repo_name(self, config, path):
        """
        Return the part of ``path`` after the repo store location, without
        leading slashes.
        """
        # NOTE(review): presumably config.get takes (section, option) here
        # rather than (key, default) - confirm against the config class.
        repo_store = config.get('paths', '/')
        return path.split(repo_store)[-1].lstrip('/')

    def _repo_id_sanitizer(self, repo_id):
        """
        Return ``repo_id`` with ``/`` replaced by ``__``, ``-`` by ``_`` and
        every non-ASCII character by ``_<code point>_``.
        """
        pathless = repo_id.replace('/', '__').replace('-', '_')
        return ''.join(
            char if ord(char) < 128 else f'_{ord(char)}_' for char in pathless)

    def __getattr__(self, name):
        # Only reached for names missing on the instance/class.
        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the request payload for remote method ``name``.

        :returns: tuple ``(context_uid, payload)``.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            "_repo_name": wire['_repo_name'],
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    def get_local_cache(self, name, args):
        """
        Decide whether the result of method ``name`` is cached locally.

        :param name: remote method name.
        :param args: positional arguments of the call; part of the cache key.
        :returns: tuple ``(cache_on, cache_key)``; ``cache_key`` is ``''``
            when caching does not apply.
        """
        cache_on = False
        cache_key = ''
        local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')

        cache_methods = [
            'branches', 'tags', 'bookmarks',
            'is_large_file', 'is_binary',
            'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
            'node_history',
            'revision', 'tree_items',
            'ctx_list', 'ctx_branch', 'ctx_description',
            'bulk_request',
            'assert_correct_path'
        ]

        if local_cache_on and name in cache_methods:
            cache_on = True
            repo_state_uid = self._wire['repo_state_uid']
            cache_key = compute_key_from_params(repo_state_uid, name, *args)

        return cache_on, cache_key

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """Perform remote method ``name``, using the local cache when enabled."""
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # `_cache_key` is not read in the body; it is the argument the cache
        # decorator receives for this call.
        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
                log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
                          url, name, args_repr, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """Perform remote method ``name`` via the streaming endpoint (never cached)."""
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # Cache is a problem because this is a stream
        def streaming_remote_call(_cache_key):
            if self._call_with_logging:
                args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
                log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
                          url, name, args_repr, context_uid, cache_on)
            return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)

        result = streaming_remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        # `revision` is not defined locally; it resolves through __getattr__
        # to a remote call.
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
367 367
368 368
class VcsHttpProxy(object):
    """
    Forwards requests to a backend endpoint over a ``requests`` session and
    decodes the msgpack-encoded, streamed response.
    """

    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        retry_policy = Retry(total=5, connect=None, read=None, redirect=None)
        http_adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)

        self.base_url = urllib.parse.urljoin(
            f'http://{server_and_port}', backend_endpoint)
        self.session = requests.Session()
        self.session.mount('http://', http_adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        Post the call data to the backend.

        :returns: tuple ``(iterator, status, headers)`` as produced by
            ``_get_result``.
        """
        packed = msgpack.packb({
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs,
        })
        response = self.session.post(self.base_url, packed, stream=True)
        return self._get_result(response)

    def _deserialize_and_raise(self, error):
        """Raise an ``Exception`` built from ``error``, keeping ``_vcs_kind`` if given."""
        exc = Exception(error['message'])
        if '_vcs_kind' in error:
            exc._vcs_kind = error['_vcs_kind']
        raise exc

    def _iterate(self, result):
        """Yield every object msgpack can decode from the streamed body."""
        unpacker = msgpack.Unpacker()
        for chunk in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(chunk)
            for item in unpacker:
                yield item

    def _get_result(self, result):
        """
        Read the leading objects of the stream: error, status, headers.

        A truthy error object is raised; otherwise the remaining stream is
        returned together with status and headers.
        """
        stream = self._iterate(result)
        error = next(stream)
        if error:
            self._deserialize_and_raise(error)

        status = next(stream)
        headers = next(stream)
        return stream, status, headers
416 416
417 417
class ThreadlocalSessionFactory(object):
    """
    Callable that returns the CurlSession of the current thread, creating it
    on first use.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_state = self._thread_local
        try:
            return local_state.curl_session
        except AttributeError:
            # First call in this thread: no session stored yet.
            local_state.curl_session = CurlSession()
            return local_state.curl_session
General Comments 0
You need to be logged in to leave comments. Login now