##// END OF EJS Templates
caches: also use the local cache for branch lookups, as they are used very often
super-admin -
r4797:2aad81ef default
parent child Browse files
Show More
@@ -1,394 +1,394 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
log = logging.getLogger(__name__)


# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
# Lookup table from an exception type name (a string) to the local exception
# class that stands for it.
EXCEPTIONS_MAP = dict(
    KeyError=KeyError,
    URLError=urllib2.URLError,
)
55 55
56 56
def _post_payload(url, payload, session):
    """
    POST the msgpack-packed `payload` to `url` and return the raw response.

    This is the transport step shared by `_remote_call` and
    `_streaming_remote_call`.

    :param url: endpoint the payload is posted to.
    :param payload: object serialized with `msgpack.packb` as request body.
    :param session: object exposing `post(url, data=...)`.
    :raises exceptions.HttpVCSCommunicationError: on a `pycurl.error`, on any
        other exception whose `message` contains 'Failed to connect', or when
        the response status code is 400 or higher.
    """
    try:
        response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as e:
        msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
        raise exceptions.HttpVCSCommunicationError(msg)
    except Exception as e:
        message = getattr(e, 'message', '')
        if 'Failed to connect' in message:
            # gevent doesn't return proper pycurl errors
            raise exceptions.HttpVCSCommunicationError(e)
        else:
            raise

    if response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, response.status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response


def _remote_call(url, payload, exceptions_map, session):
    """
    Perform a remote call and return the `result` entry of the decoded reply.

    The response body is decoded with msgpack. If the decoded reply carries a
    truthy `error` entry, an exception is built from it and raised: its class
    is looked up in `exceptions_map` by the reported `type` (falling back to
    `Exception`) and it is instantiated with the reported `message`.

    :param url: endpoint the payload is posted to.
    :param payload: request data, sent msgpack-packed.
    :param exceptions_map: dict mapping exception type names to classes.
    :param session: object exposing `post(url, data=...)`.
    :raises exceptions.HttpVCSCommunicationError: see `_post_payload`.
    """
    http_response = _post_payload(url, payload, session)

    try:
        response = msgpack.unpackb(http_response.content)
    except Exception:
        log.exception('Failed to decode response %r', http_response.content)
        raise

    error = response.get('error')
    if error:
        type_ = error.get('type', 'Exception')
        exc = exceptions_map.get(type_, Exception)
        exc = exc(error.get('message'))
        # Optional metadata is copied onto the exception when present.
        try:
            exc._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass

        # These three are set in order; the first missing key stops the
        # copying, leaving the ones before it in place.
        try:
            exc._vcs_server_traceback = error['traceback']
            exc._vcs_server_org_exc_name = error['org_exc']
            exc._vcs_server_org_exc_tb = error['org_exc_tb']
        except KeyError:
            pass

        raise exc
    return response.get('result')


def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    Perform a remote call and return an iterator over the raw response body.

    The body is not decoded here; it is handed back via
    `response.iter_content(chunk_size=chunk_size)`.

    :param url: endpoint the payload is posted to.
    :param payload: request data, sent msgpack-packed.
    :param exceptions_map: not used by this function; it is accepted so the
        signature lines up with `_remote_call`.
    :param session: object exposing `post(url, data=...)`.
    :param chunk_size: forwarded to `iter_content`.
    :raises exceptions.HttpVCSCommunicationError: see `_post_payload`.
    """
    response = _post_payload(url, payload, session)
    return response.iter_content(chunk_size=chunk_size)
123 123
124 124
class ServiceConnection(object):
    """
    Thin remote-call proxy bound to a single service URL.

    Looking up any attribute that is not set on the instance yields a
    callable; invoking it posts a call for the method of that name.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Send method `name` with the given arguments and return the result of
        `_remote_call`.
        """
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
144 144
145 145
class RemoteVCSMaker(object):
    """
    Factory for `RemoteRepo` objects of one backend type.

    Calling an instance builds a `RemoteRepo`. Any other attribute that is
    not set on the instance resolves to a callable that issues a remote call
    for the method of that name, tagged with the backend type.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self.stream_url = urlparse.urljoin(base_url, backend_endpoint + '/stream')

        self._session_factory = session_factory
        self.backend_type = backend_type

    @classmethod
    def init_cache_region(cls, repo_id):
        """
        Return a `(region, namespace)` pair for `repo_id`; the namespace is
        'cache_repo.<repo_id>'.
        """
        namespace = 'cache_repo.{}'.format(repo_id)
        cache_region = rc_cache.get_or_create_region('cache_repo', namespace)
        return cache_region, namespace

    def __call__(self, path, repo_id, config, with_wire=None):
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Send `func_name` with the given arguments to `self.url` and return
        the result of `_remote_call`.
        """
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
180 180
181 181
class RemoteRepo(object):
    """
    Proxy object for one repository, addressed through remote calls.

    Attributes that are not set on the instance resolve (via `__getattr__`)
    to callables that issue a remote call for the method of that name. Names
    starting with 'stream:' are sent to the stream URL and return the
    streamed response; everything else goes through `_call`, which can serve
    selected methods from a local cache region.
    """
    CHUNK_SIZE = 16384

    # Class-level default. Without it, reading `self._call_with_logging` while
    # DEBUG logging is disabled falls through to `__getattr__`, which returns
    # a remote-call closure. That closure is truthy, so the debug-logging
    # branches below would run on every call.
    _call_with_logging = False

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path, sent to the server in the wire data.
        :param repo_id: repository id; a sanitized form names the cache.
        :param config: object exposing `serialize()`; sent with every call.
        :param remote_maker: provides `url`, `stream_url`,
            `_session_factory` and `init_cache_region`.
        :param with_wire: optional dict merged over the default wire data.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        cache_repo_id = self._repo_id_sanitizer(repo_id)
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(cache_repo_id)

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'
        self._wire = {
            "path": path,  # repo path
            "repo_id": repo_id,
            "cache_repo_id": cache_repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def _repo_id_sanitizer(self, repo_id):
        """
        Return `repo_id` with '/' replaced by '__' and '-' replaced by '_'.
        """
        return repo_id.replace('/', '__').replace('-', '_')

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find.
        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the request payload for method `name`.

        :return: `(context_uid, payload)`, where `context_uid` is the
            `context` value of the wire data sent with the payload.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    def get_local_cache(self, name, args):
        """
        Decide whether a call to `name` may use the local cache.

        Caching is on only when the `vcs.methods.cache` config value is true
        and `name` is one of the methods listed below.

        :return: `(cache_on, cache_key)`; the key is '' when caching is off.
        """
        cache_on = False
        cache_key = ''
        local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))

        cache_methods = [
            'branches', 'tags', 'bookmarks',
            'is_large_file', 'is_binary',
            'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
            'node_history',
            'revision', 'tree_items',
            'ctx_list', 'ctx_branch',
            'bulk_request',
        ]

        if local_cache_on and name in cache_methods:
            cache_on = True
            repo_state_uid = self._wire['repo_state_uid']
            # NOTE(review): only positional args feed the key; keyword
            # arguments of the call are not part of it.
            cache_key = compute_key_from_params(repo_state_uid, name, *args)

        return cache_on, cache_key

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Issue a remote call for `name` and return its result, going through
        the local cache region when `get_local_cache` enables it.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Issue a streaming remote call for `name` against the stream URL and
        return the result of `_streaming_remote_call`.

        The cache decision is computed and logged but no caching is applied.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # Cache is a problem because this is a stream
        def streaming_remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)

        result = streaming_remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        # `revision` is not defined on the class, so this is a remote call.
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
331 331
332 332
class VcsHttpProxy(object):
    """
    Forwards a request to a backend URL and exposes the streamed,
    msgpack-encoded reply as `(body_iterator, status, headers)`.
    """

    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        retries = Retry(total=5, connect=None, read=None, redirect=None)

        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
        self.session = requests.Session()
        # The retry policy applies to every 'http://' URL of this session.
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        Post the msgpack-packed request data to `base_url` with streaming
        enabled and return the result of `_get_result` on the response.
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """
        Raise an `Exception` built from `error['message']`, carrying
        `error['_vcs_kind']` as `_vcs_kind` when that key is present.
        """
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """
        Yield every object unpacked from the response body, which is read in
        pieces of `CHUNK_SIZE`.
        """
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Consume the leading items of the reply and return the rest.

        The first item is the error slot (raised via `_deserialize_and_raise`
        when truthy), followed by the status and the headers.

        :return: `(iterator, status, headers)`; `iterator` yields whatever
            follows the headers.
        """
        iterator = self._iterate(result)
        # The builtin `next()` works on Python 2.6+ and Python 3, unlike the
        # Python 2-only `.next()` method.
        error = next(iterator)
        if error:
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
381 381
382 382
class ThreadlocalSessionFactory(object):
    """
    Hands out a CurlSession that belongs to the calling thread, building it
    the first time that thread asks for one.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_store = self._thread_local
        try:
            return local_store.curl_session
        except AttributeError:
            # Nothing stored for this thread yet.
            local_store.curl_session = CurlSession()
        return local_store.curl_session
General Comments 0
You need to be logged in to leave comments. Login now