##// END OF EJS Templates
caches: added path check to cached methods for faster git repo checks
super-admin -
r4828:252be6a7 default
parent child Browse files
Show More
@@ -1,395 +1,396 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 # TODO: mikhail: Keep it in sync with vcsserver's
50 50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 51 EXCEPTIONS_MAP = {
52 52 'KeyError': KeyError,
53 53 'URLError': urllib2.URLError,
54 54 }
55 55
56 56
57 57 def _remote_call(url, payload, exceptions_map, session):
58 58 try:
59 59 response = session.post(url, data=msgpack.packb(payload))
60 60 except pycurl.error as e:
61 61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
62 62 raise exceptions.HttpVCSCommunicationError(msg)
63 63 except Exception as e:
64 64 message = getattr(e, 'message', '')
65 65 if 'Failed to connect' in message:
66 66 # gevent doesn't return proper pycurl errors
67 67 raise exceptions.HttpVCSCommunicationError(e)
68 68 else:
69 69 raise
70 70
71 71 if response.status_code >= 400:
72 72 log.error('Call to %s returned non 200 HTTP code: %s',
73 73 url, response.status_code)
74 74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
75 75
76 76 try:
77 77 response = msgpack.unpackb(response.content)
78 78 except Exception:
79 79 log.exception('Failed to decode response %r', response.content)
80 80 raise
81 81
82 82 error = response.get('error')
83 83 if error:
84 84 type_ = error.get('type', 'Exception')
85 85 exc = exceptions_map.get(type_, Exception)
86 86 exc = exc(error.get('message'))
87 87 try:
88 88 exc._vcs_kind = error['_vcs_kind']
89 89 except KeyError:
90 90 pass
91 91
92 92 try:
93 93 exc._vcs_server_traceback = error['traceback']
94 94 exc._vcs_server_org_exc_name = error['org_exc']
95 95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
96 96 except KeyError:
97 97 pass
98 98
99 99 raise exc
100 100 return response.get('result')
101 101
102 102
103 103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
104 104 try:
105 105 response = session.post(url, data=msgpack.packb(payload))
106 106 except pycurl.error as e:
107 107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
108 108 raise exceptions.HttpVCSCommunicationError(msg)
109 109 except Exception as e:
110 110 message = getattr(e, 'message', '')
111 111 if 'Failed to connect' in message:
112 112 # gevent doesn't return proper pycurl errors
113 113 raise exceptions.HttpVCSCommunicationError(e)
114 114 else:
115 115 raise
116 116
117 117 if response.status_code >= 400:
118 118 log.error('Call to %s returned non 200 HTTP code: %s',
119 119 url, response.status_code)
120 120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
121 121
122 122 return response.iter_content(chunk_size=chunk_size)
123 123
124 124
125 125 class ServiceConnection(object):
126 126 def __init__(self, server_and_port, backend_endpoint, session_factory):
127 127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
128 128 self._session_factory = session_factory
129 129
130 130 def __getattr__(self, name):
131 131 def f(*args, **kwargs):
132 132 return self._call(name, *args, **kwargs)
133 133 return f
134 134
135 135 @exceptions.map_vcs_exceptions
136 136 def _call(self, name, *args, **kwargs):
137 137 payload = {
138 138 'id': str(uuid.uuid4()),
139 139 'method': name,
140 140 'params': {'args': args, 'kwargs': kwargs}
141 141 }
142 142 return _remote_call(
143 143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
144 144
145 145
146 146 class RemoteVCSMaker(object):
147 147
148 148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
149 149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
150 150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
151 151
152 152 self._session_factory = session_factory
153 153 self.backend_type = backend_type
154 154
155 155 @classmethod
156 156 def init_cache_region(cls, repo_id):
157 157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
158 158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
159 159 return region, cache_namespace_uid
160 160
161 161 def __call__(self, path, repo_id, config, with_wire=None):
162 162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
163 163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
164 164
165 165 def __getattr__(self, name):
166 166 def remote_attr(*args, **kwargs):
167 167 return self._call(name, *args, **kwargs)
168 168 return remote_attr
169 169
170 170 @exceptions.map_vcs_exceptions
171 171 def _call(self, func_name, *args, **kwargs):
172 172 payload = {
173 173 'id': str(uuid.uuid4()),
174 174 'method': func_name,
175 175 'backend': self.backend_type,
176 176 'params': {'args': args, 'kwargs': kwargs}
177 177 }
178 178 url = self.url
179 179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
180 180
181 181
182 182 class RemoteRepo(object):
183 183 CHUNK_SIZE = 16384
184 184
185 185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
186 186 self.url = remote_maker.url
187 187 self.stream_url = remote_maker.stream_url
188 188 self._session = remote_maker._session_factory()
189 189
190 190 cache_repo_id = self._repo_id_sanitizer(repo_id)
191 191 self._cache_region, self._cache_namespace = \
192 192 remote_maker.init_cache_region(cache_repo_id)
193 193
194 194 with_wire = with_wire or {}
195 195
196 196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
197 197 self._wire = {
198 198 "path": path, # repo path
199 199 "repo_id": repo_id,
200 200 "cache_repo_id": cache_repo_id,
201 201 "config": config,
202 202 "repo_state_uid": repo_state_uid,
203 203 "context": self._create_vcs_cache_context(path, repo_state_uid)
204 204 }
205 205
206 206 if with_wire:
207 207 self._wire.update(with_wire)
208 208
209 209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
210 210 # log.debug brings a few percent gain even if is is not active.
211 211 if log.isEnabledFor(logging.DEBUG):
212 212 self._call_with_logging = True
213 213
214 214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
215 215
216 216 def _repo_id_sanitizer(self, repo_id):
217 217 pathless = repo_id.replace('/', '__').replace('-', '_')
218 218 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
219 219
220 220 def __getattr__(self, name):
221 221
222 222 if name.startswith('stream:'):
223 223 def repo_remote_attr(*args, **kwargs):
224 224 return self._call_stream(name, *args, **kwargs)
225 225 else:
226 226 def repo_remote_attr(*args, **kwargs):
227 227 return self._call(name, *args, **kwargs)
228 228
229 229 return repo_remote_attr
230 230
231 231 def _base_call(self, name, *args, **kwargs):
232 232 # TODO: oliver: This is currently necessary pre-call since the
233 233 # config object is being changed for hooking scenarios
234 234 wire = copy.deepcopy(self._wire)
235 235 wire["config"] = wire["config"].serialize()
236 236 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
237 237
238 238 payload = {
239 239 'id': str(uuid.uuid4()),
240 240 'method': name,
241 241 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
242 242 }
243 243
244 244 context_uid = wire.get('context')
245 245 return context_uid, payload
246 246
247 247 def get_local_cache(self, name, args):
248 248 cache_on = False
249 249 cache_key = ''
250 250 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
251 251
252 252 cache_methods = [
253 253 'branches', 'tags', 'bookmarks',
254 254 'is_large_file', 'is_binary',
255 255 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
256 256 'node_history',
257 257 'revision', 'tree_items',
258 258 'ctx_list', 'ctx_branch', 'ctx_description',
259 259 'bulk_request',
260 'assert_correct_path'
260 261 ]
261 262
262 263 if local_cache_on and name in cache_methods:
263 264 cache_on = True
264 265 repo_state_uid = self._wire['repo_state_uid']
265 266 call_args = [a for a in args]
266 267 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
267 268
268 269 return cache_on, cache_key
269 270
270 271 @exceptions.map_vcs_exceptions
271 272 def _call(self, name, *args, **kwargs):
272 273 context_uid, payload = self._base_call(name, *args, **kwargs)
273 274 url = self.url
274 275
275 276 start = time.time()
276 277 cache_on, cache_key = self.get_local_cache(name, args)
277 278
278 279 @self._cache_region.conditional_cache_on_arguments(
279 280 namespace=self._cache_namespace, condition=cache_on and cache_key)
280 281 def remote_call(_cache_key):
281 282 if self._call_with_logging:
282 283 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
283 284 url, name, args, context_uid, cache_on)
284 285 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
285 286
286 287 result = remote_call(cache_key)
287 288 if self._call_with_logging:
288 289 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
289 290 url, name, time.time()-start, context_uid)
290 291 return result
291 292
292 293 @exceptions.map_vcs_exceptions
293 294 def _call_stream(self, name, *args, **kwargs):
294 295 context_uid, payload = self._base_call(name, *args, **kwargs)
295 296 payload['chunk_size'] = self.CHUNK_SIZE
296 297 url = self.stream_url
297 298
298 299 start = time.time()
299 300 cache_on, cache_key = self.get_local_cache(name, args)
300 301
301 302 # Cache is a problem because this is a stream
302 303 def streaming_remote_call(_cache_key):
303 304 if self._call_with_logging:
304 305 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
305 306 url, name, args, context_uid, cache_on)
306 307 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
307 308
308 309 result = streaming_remote_call(cache_key)
309 310 if self._call_with_logging:
310 311 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
311 312 url, name, time.time()-start, context_uid)
312 313 return result
313 314
314 315 def __getitem__(self, key):
315 316 return self.revision(key)
316 317
317 318 def _create_vcs_cache_context(self, *args):
318 319 """
319 320 Creates a unique string which is passed to the VCSServer on every
320 321 remote call. It is used as cache key in the VCSServer.
321 322 """
322 323 hash_key = '-'.join(map(str, args))
323 324 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
324 325
325 326 def invalidate_vcs_cache(self):
326 327 """
327 328 This invalidates the context which is sent to the VCSServer on every
328 329 call to a remote method. It forces the VCSServer to create a fresh
329 330 repository instance on the next call to a remote method.
330 331 """
331 332 self._wire['context'] = str(uuid.uuid4())
332 333
333 334
334 335 class VcsHttpProxy(object):
335 336
336 337 CHUNK_SIZE = 16384
337 338
338 339 def __init__(self, server_and_port, backend_endpoint):
339 340 retries = Retry(total=5, connect=None, read=None, redirect=None)
340 341
341 342 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
342 343 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
343 344 self.session = requests.Session()
344 345 self.session.mount('http://', adapter)
345 346
346 347 def handle(self, environment, input_data, *args, **kwargs):
347 348 data = {
348 349 'environment': environment,
349 350 'input_data': input_data,
350 351 'args': args,
351 352 'kwargs': kwargs
352 353 }
353 354 result = self.session.post(
354 355 self.base_url, msgpack.packb(data), stream=True)
355 356 return self._get_result(result)
356 357
357 358 def _deserialize_and_raise(self, error):
358 359 exception = Exception(error['message'])
359 360 try:
360 361 exception._vcs_kind = error['_vcs_kind']
361 362 except KeyError:
362 363 pass
363 364 raise exception
364 365
365 366 def _iterate(self, result):
366 367 unpacker = msgpack.Unpacker()
367 368 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
368 369 unpacker.feed(line)
369 370 for chunk in unpacker:
370 371 yield chunk
371 372
372 373 def _get_result(self, result):
373 374 iterator = self._iterate(result)
374 375 error = iterator.next()
375 376 if error:
376 377 self._deserialize_and_raise(error)
377 378
378 379 status = iterator.next()
379 380 headers = iterator.next()
380 381
381 382 return iterator, status, headers
382 383
383 384
384 385 class ThreadlocalSessionFactory(object):
385 386 """
386 387 Creates one CurlSession per thread on demand.
387 388 """
388 389
389 390 def __init__(self):
390 391 self._thread_local = threading.local()
391 392
392 393 def __call__(self):
393 394 if not hasattr(self._thread_local, 'curl_session'):
394 395 self._thread_local.curl_session = CurlSession()
395 396 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now