##// END OF EJS Templates
fix(svn): cache path check similar like for git
super-admin -
r5567:efc8997d stable
parent child Browse files
Show More
@@ -1,429 +1,430 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 Client for the VCSServer implemented based on HTTP.
21 21 """
22 22
23 23 import copy
24 24 import logging
25 25 import threading
26 26 import time
27 27 import urllib.request
28 28 import urllib.error
29 29 import urllib.parse
30 30 import urllib.parse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 # TODO: mikhail: Keep it in sync with vcsserver's
50 50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 51 EXCEPTIONS_MAP = {
52 52 'KeyError': KeyError,
53 53 'URLError': urllib.error.URLError,
54 54 }
55 55
56 56
57 57 def _remote_call(url, payload, exceptions_map, session, retries=3):
58 58
59 59 for attempt in range(retries):
60 60 try:
61 61 response = session.post(url, data=msgpack.packb(payload))
62 62 break
63 63 except pycurl.error as e:
64 64 error_code, error_message = e.args
65 65 if error_code == pycurl.E_RECV_ERROR:
66 66 log.warning(f'Received a "Connection reset by peer" error. '
67 67 f'Retrying... ({attempt + 1}/{retries})')
68 68 continue # Retry if connection reset error.
69 69 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
70 70 raise exceptions.HttpVCSCommunicationError(msg)
71 71 except Exception as e:
72 72 message = getattr(e, 'message', '')
73 73 if 'Failed to connect' in message:
74 74 # gevent doesn't return proper pycurl errors
75 75 raise exceptions.HttpVCSCommunicationError(e)
76 76 else:
77 77 raise
78 78
79 79 if response.status_code >= 400:
80 80 content_type = response.content_type
81 81 log.error('Call to %s returned non 200 HTTP code: %s [%s]',
82 82 url, response.status_code, content_type)
83 83 raise exceptions.HttpVCSCommunicationError(repr(response.content))
84 84
85 85 try:
86 86 response = msgpack.unpackb(response.content)
87 87 except Exception:
88 88 log.exception('Failed to decode response from msgpack')
89 89 raise
90 90
91 91 error = response.get('error')
92 92 if error:
93 93 type_ = error.get('type', 'Exception')
94 94 exc = exceptions_map.get(type_, Exception)
95 95 exc = exc(error.get('message'))
96 96 try:
97 97 exc._vcs_kind = error['_vcs_kind']
98 98 except KeyError:
99 99 pass
100 100
101 101 try:
102 102 exc._vcs_server_traceback = error['traceback']
103 103 exc._vcs_server_org_exc_name = error['org_exc']
104 104 exc._vcs_server_org_exc_tb = error['org_exc_tb']
105 105 except KeyError:
106 106 pass
107 107
108 108 exc.add_note(attach_exc_details(error))
109 109 raise exc # raising the org exception from vcsserver
110 110 return response.get('result')
111 111
112 112
113 113 def attach_exc_details(error):
114 114 note = '-- EXC NOTE -- :\n'
115 115 note += f'vcs_kind: {error.get("_vcs_kind")}\n'
116 116 note += f'org_exc: {error.get("_vcs_kind")}\n'
117 117 note += f'tb: {error.get("traceback")}\n'
118 118 note += '-- END EXC NOTE --'
119 119 return note
120 120
121 121
122 122 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
123 123 try:
124 124 headers = {
125 125 'X-RC-Method': payload.get('method'),
126 126 'X-RC-Repo-Name': payload.get('_repo_name')
127 127 }
128 128 response = session.post(url, data=msgpack.packb(payload), headers=headers)
129 129 except pycurl.error as e:
130 130 error_code, error_message = e.args
131 131 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
132 132 raise exceptions.HttpVCSCommunicationError(msg)
133 133 except Exception as e:
134 134 message = getattr(e, 'message', '')
135 135 if 'Failed to connect' in message:
136 136 # gevent doesn't return proper pycurl errors
137 137 raise exceptions.HttpVCSCommunicationError(e)
138 138 else:
139 139 raise
140 140
141 141 if response.status_code >= 400:
142 142 log.error('Call to %s returned non 200 HTTP code: %s',
143 143 url, response.status_code)
144 144 raise exceptions.HttpVCSCommunicationError(repr(response.content))
145 145
146 146 return response.iter_content(chunk_size=chunk_size)
147 147
148 148
149 149 class ServiceConnection(object):
150 150 def __init__(self, server_and_port, backend_endpoint, session_factory):
151 151 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
152 152 self._session_factory = session_factory
153 153
154 154 def __getattr__(self, name):
155 155 def f(*args, **kwargs):
156 156 return self._call(name, *args, **kwargs)
157 157 return f
158 158
159 159 @exceptions.map_vcs_exceptions
160 160 def _call(self, name, *args, **kwargs):
161 161 payload = {
162 162 'id': str(uuid.uuid4()),
163 163 'method': name,
164 164 'params': {'args': args, 'kwargs': kwargs}
165 165 }
166 166 return _remote_call(
167 167 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
168 168
169 169
170 170 class RemoteVCSMaker(object):
171 171
172 172 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
173 173 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
174 174 self.stream_url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint+'/stream')
175 175
176 176 self._session_factory = session_factory
177 177 self.backend_type = backend_type
178 178
179 179 @classmethod
180 180 def init_cache_region(cls, repo_id):
181 181 cache_namespace_uid = f'repo.{repo_id}'
182 182 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
183 183 return region, cache_namespace_uid
184 184
185 185 def __call__(self, path, repo_id, config, with_wire=None):
186 186 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
187 187 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
188 188
189 189 def __getattr__(self, name):
190 190 def remote_attr(*args, **kwargs):
191 191 return self._call(name, *args, **kwargs)
192 192 return remote_attr
193 193
194 194 @exceptions.map_vcs_exceptions
195 195 def _call(self, func_name, *args, **kwargs):
196 196 payload = {
197 197 'id': str(uuid.uuid4()),
198 198 'method': func_name,
199 199 'backend': self.backend_type,
200 200 'params': {'args': args, 'kwargs': kwargs}
201 201 }
202 202 url = self.url
203 203 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
204 204
205 205
206 206 class RemoteRepo(object):
207 207 CHUNK_SIZE = 16384
208 208
209 209 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
210 210 self.url = remote_maker.url
211 211 self.stream_url = remote_maker.stream_url
212 212 self._session = remote_maker._session_factory()
213 213
214 214 cache_repo_id = self._repo_id_sanitizer(repo_id)
215 215 _repo_name = self._get_repo_name(config, path)
216 216 self._cache_region, self._cache_namespace = \
217 217 remote_maker.init_cache_region(cache_repo_id)
218 218
219 219 with_wire = with_wire or {}
220 220
221 221 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
222 222
223 223 self._wire = {
224 224 "_repo_name": _repo_name,
225 225 "path": path, # repo path
226 226 "repo_id": repo_id,
227 227 "cache_repo_id": cache_repo_id,
228 228 "config": config,
229 229 "repo_state_uid": repo_state_uid,
230 230 "context": self._create_vcs_cache_context(path, repo_state_uid)
231 231 }
232 232
233 233 if with_wire:
234 234 self._wire.update(with_wire)
235 235
236 236 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
237 237 # log.debug brings a few percent gain even if is is not active.
238 238 if log.isEnabledFor(logging.DEBUG):
239 239 self._call_with_logging = True
240 240
241 241 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
242 242
243 243 def _get_repo_name(self, config, path):
244 244 repo_store = config.get('paths', '/')
245 245 return path.split(repo_store)[-1].lstrip('/')
246 246
247 247 def _repo_id_sanitizer(self, repo_id):
248 248 pathless = repo_id.replace('/', '__').replace('-', '_')
249 249 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
250 250
251 251 def __getattr__(self, name):
252 252
253 253 if name.startswith('stream:'):
254 254 def repo_remote_attr(*args, **kwargs):
255 255 return self._call_stream(name, *args, **kwargs)
256 256 else:
257 257 def repo_remote_attr(*args, **kwargs):
258 258 return self._call(name, *args, **kwargs)
259 259
260 260 return repo_remote_attr
261 261
262 262 def _base_call(self, name, *args, **kwargs):
263 263 # TODO: oliver: This is currently necessary pre-call since the
264 264 # config object is being changed for hooking scenarios
265 265 wire = copy.deepcopy(self._wire)
266 266 wire["config"] = wire["config"].serialize()
267 267 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
268 268
269 269 payload = {
270 270 'id': str(uuid.uuid4()),
271 271 'method': name,
272 272 "_repo_name": wire['_repo_name'],
273 273 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
274 274 }
275 275
276 276 context_uid = wire.get('context')
277 277 return context_uid, payload
278 278
279 279 def get_local_cache(self, name, args):
280 280 cache_on = False
281 281 cache_key = ''
282 282 local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')
283 283
284 284 cache_methods = [
285 285 'branches', 'tags', 'bookmarks',
286 286 'is_large_file', 'is_binary',
287 287 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
288 288 'node_history',
289 289 'revision', 'tree_items',
290 290 'ctx_list', 'ctx_branch', 'ctx_description',
291 291 'bulk_request',
292 'assert_correct_path'
292 'assert_correct_path',
293 'is_path_valid_repository',
293 294 ]
294 295
295 296 if local_cache_on and name in cache_methods:
296 297 cache_on = True
297 298 repo_state_uid = self._wire['repo_state_uid']
298 299 call_args = [a for a in args]
299 300 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
300 301
301 302 return cache_on, cache_key
302 303
303 304 @exceptions.map_vcs_exceptions
304 305 def _call(self, name, *args, **kwargs):
305 306 context_uid, payload = self._base_call(name, *args, **kwargs)
306 307 url = self.url
307 308
308 309 start = time.time()
309 310 cache_on, cache_key = self.get_local_cache(name, args)
310 311
311 312 @self._cache_region.conditional_cache_on_arguments(
312 313 namespace=self._cache_namespace, condition=cache_on and cache_key)
313 314 def remote_call(_cache_key):
314 315 if self._call_with_logging:
315 316 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
316 317 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
317 318 url, name, args_repr, context_uid, cache_on)
318 319 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
319 320
320 321 result = remote_call(cache_key)
321 322 if self._call_with_logging:
322 323 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
323 324 url, name, time.time()-start, context_uid)
324 325 return result
325 326
326 327 @exceptions.map_vcs_exceptions
327 328 def _call_stream(self, name, *args, **kwargs):
328 329 context_uid, payload = self._base_call(name, *args, **kwargs)
329 330 payload['chunk_size'] = self.CHUNK_SIZE
330 331 url = self.stream_url
331 332
332 333 start = time.time()
333 334 cache_on, cache_key = self.get_local_cache(name, args)
334 335
335 336 # Cache is a problem because this is a stream
336 337 def streaming_remote_call(_cache_key):
337 338 if self._call_with_logging:
338 339 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
339 340 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
340 341 url, name, args_repr, context_uid, cache_on)
341 342 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
342 343
343 344 result = streaming_remote_call(cache_key)
344 345 if self._call_with_logging:
345 346 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
346 347 url, name, time.time()-start, context_uid)
347 348 return result
348 349
349 350 def __getitem__(self, key):
350 351 return self.revision(key)
351 352
352 353 def _create_vcs_cache_context(self, *args):
353 354 """
354 355 Creates a unique string which is passed to the VCSServer on every
355 356 remote call. It is used as cache key in the VCSServer.
356 357 """
357 358 hash_key = '-'.join(map(str, args))
358 359 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
359 360
360 361 def invalidate_vcs_cache(self):
361 362 """
362 363 This invalidates the context which is sent to the VCSServer on every
363 364 call to a remote method. It forces the VCSServer to create a fresh
364 365 repository instance on the next call to a remote method.
365 366 """
366 367 self._wire['context'] = str(uuid.uuid4())
367 368
368 369
369 370 class VcsHttpProxy(object):
370 371
371 372 CHUNK_SIZE = 16384
372 373
373 374 def __init__(self, server_and_port, backend_endpoint):
374 375 retries = Retry(total=5, connect=None, read=None, redirect=None)
375 376
376 377 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
377 378 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
378 379 self.session = requests.Session()
379 380 self.session.mount('http://', adapter)
380 381
381 382 def handle(self, environment, input_data, *args, **kwargs):
382 383 data = {
383 384 'environment': environment,
384 385 'input_data': input_data,
385 386 'args': args,
386 387 'kwargs': kwargs
387 388 }
388 389 result = self.session.post(
389 390 self.base_url, msgpack.packb(data), stream=True)
390 391 return self._get_result(result)
391 392
392 393 def _deserialize_and_raise(self, error):
393 394 exception = Exception(error['message'])
394 395 try:
395 396 exception._vcs_kind = error['_vcs_kind']
396 397 except KeyError:
397 398 pass
398 399 raise exception
399 400
400 401 def _iterate(self, result):
401 402 unpacker = msgpack.Unpacker()
402 403 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
403 404 unpacker.feed(line)
404 405 yield from unpacker
405 406
406 407 def _get_result(self, result):
407 408 iterator = self._iterate(result)
408 409 error = next(iterator)
409 410 if error:
410 411 self._deserialize_and_raise(error)
411 412
412 413 status = next(iterator)
413 414 headers = next(iterator)
414 415
415 416 return iterator, status, headers
416 417
417 418
418 419 class ThreadlocalSessionFactory(object):
419 420 """
420 421 Creates one CurlSession per thread on demand.
421 422 """
422 423
423 424 def __init__(self):
424 425 self._thread_local = threading.local()
425 426
426 427 def __call__(self):
427 428 if not hasattr(self._thread_local, 'curl_session'):
428 429 self._thread_local.curl_session = CurlSession()
429 430 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now