##// END OF EJS Templates
vcs-cache: enable more methods, and fix arguments calls
super-admin -
r4747:4ec212aa default
parent child Browse files
Show More
@@ -1,367 +1,378 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
56 56 def _remote_call(url, payload, exceptions_map, session):
57 57 try:
58 58 response = session.post(url, data=msgpack.packb(payload))
59 59 except pycurl.error as e:
60 60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 61 raise exceptions.HttpVCSCommunicationError(msg)
62 62 except Exception as e:
63 63 message = getattr(e, 'message', '')
64 64 if 'Failed to connect' in message:
65 65 # gevent doesn't return proper pycurl errors
66 66 raise exceptions.HttpVCSCommunicationError(e)
67 67 else:
68 68 raise
69 69
70 70 if response.status_code >= 400:
71 71 log.error('Call to %s returned non 200 HTTP code: %s',
72 72 url, response.status_code)
73 73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 74
75 75 try:
76 76 response = msgpack.unpackb(response.content)
77 77 except Exception:
78 78 log.exception('Failed to decode response %r', response.content)
79 79 raise
80 80
81 81 error = response.get('error')
82 82 if error:
83 83 type_ = error.get('type', 'Exception')
84 84 exc = exceptions_map.get(type_, Exception)
85 85 exc = exc(error.get('message'))
86 86 try:
87 87 exc._vcs_kind = error['_vcs_kind']
88 88 except KeyError:
89 89 pass
90 90
91 91 try:
92 92 exc._vcs_server_traceback = error['traceback']
93 93 exc._vcs_server_org_exc_name = error['org_exc']
94 94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 95 except KeyError:
96 96 pass
97 97
98 98 raise exc
99 99 return response.get('result')
100 100
101 101
102 102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 103 try:
104 104 response = session.post(url, data=msgpack.packb(payload))
105 105 except pycurl.error as e:
106 106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 107 raise exceptions.HttpVCSCommunicationError(msg)
108 108 except Exception as e:
109 109 message = getattr(e, 'message', '')
110 110 if 'Failed to connect' in message:
111 111 # gevent doesn't return proper pycurl errors
112 112 raise exceptions.HttpVCSCommunicationError(e)
113 113 else:
114 114 raise
115 115
116 116 if response.status_code >= 400:
117 117 log.error('Call to %s returned non 200 HTTP code: %s',
118 118 url, response.status_code)
119 119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 120
121 121 return response.iter_content(chunk_size=chunk_size)
122 122
123 123
124 124 class ServiceConnection(object):
125 125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 127 self._session_factory = session_factory
128 128
129 129 def __getattr__(self, name):
130 130 def f(*args, **kwargs):
131 131 return self._call(name, *args, **kwargs)
132 132 return f
133 133
134 134 @exceptions.map_vcs_exceptions
135 135 def _call(self, name, *args, **kwargs):
136 136 payload = {
137 137 'id': str(uuid.uuid4()),
138 138 'method': name,
139 139 'params': {'args': args, 'kwargs': kwargs}
140 140 }
141 141 return _remote_call(
142 142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 143
144 144
145 145 class RemoteVCSMaker(object):
146 146
147 147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 150
151 151 self._session_factory = session_factory
152 152 self.backend_type = backend_type
153 153
154 154 @classmethod
155 155 def init_cache_region(cls, repo_id):
156 156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 158 return region, cache_namespace_uid
159 159
160 160 def __call__(self, path, repo_id, config, with_wire=None):
161 161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 163
164 164 def __getattr__(self, name):
165 165 def remote_attr(*args, **kwargs):
166 166 return self._call(name, *args, **kwargs)
167 167 return remote_attr
168 168
169 169 @exceptions.map_vcs_exceptions
170 170 def _call(self, func_name, *args, **kwargs):
171 171 payload = {
172 172 'id': str(uuid.uuid4()),
173 173 'method': func_name,
174 174 'backend': self.backend_type,
175 175 'params': {'args': args, 'kwargs': kwargs}
176 176 }
177 177 url = self.url
178 178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 179
180 180
181 181 class RemoteRepo(object):
182 182 CHUNK_SIZE = 16384
183 183
184 184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 185 self.url = remote_maker.url
186 186 self.stream_url = remote_maker.stream_url
187 187 self._session = remote_maker._session_factory()
188 188 self._cache_region, self._cache_namespace = \
189 189 remote_maker.init_cache_region(repo_id)
190 190
191 191 with_wire = with_wire or {}
192 192
193 193 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
194 194 self._wire = {
195 195 "path": path, # repo path
196 196 "repo_id": repo_id,
197 197 "config": config,
198 198 "repo_state_uid": repo_state_uid,
199 199 "context": self._create_vcs_cache_context(path, repo_state_uid)
200 200 }
201 201
202 202 if with_wire:
203 203 self._wire.update(with_wire)
204 204
205 205 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
206 206 # log.debug brings a few percent gain even if is is not active.
207 207 if log.isEnabledFor(logging.DEBUG):
208 208 self._call_with_logging = True
209 209
210 210 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
211 211
212 212 def __getattr__(self, name):
213 213
214 214 if name.startswith('stream:'):
215 215 def repo_remote_attr(*args, **kwargs):
216 216 return self._call_stream(name, *args, **kwargs)
217 217 else:
218 218 def repo_remote_attr(*args, **kwargs):
219 219 return self._call(name, *args, **kwargs)
220 220
221 221 return repo_remote_attr
222 222
223 223 def _base_call(self, name, *args, **kwargs):
224 224 # TODO: oliver: This is currently necessary pre-call since the
225 225 # config object is being changed for hooking scenarios
226 226 wire = copy.deepcopy(self._wire)
227 227 wire["config"] = wire["config"].serialize()
228 228 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
229 229
230 230 payload = {
231 231 'id': str(uuid.uuid4()),
232 232 'method': name,
233 233 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
234 234 }
235 235
236 236 context_uid = wire.get('context')
237 237 return context_uid, payload
238 238
239 239 @exceptions.map_vcs_exceptions
240 240 def _call(self, name, *args, **kwargs):
241 241 context_uid, payload = self._base_call(name, *args, **kwargs)
242 242 url = self.url
243 243
244 244 start = time.time()
245 245
246 246 cache_on = False
247 247 cache_key = ''
248 local_cache = rhodecode.CONFIG.get('vcs.methods.cache')
249 if local_cache and name in ['is_large_file', 'is_binary', 'fctx_size', 'bulk_request']:
248 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
249
250 cache_methods = [
251 'branches', 'tags', 'bookmarks',
252 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
253 'revision', 'tree_items',
254 'ctx_list',
255 'bulk_request',
256 ]
257
258 if local_cache_on and name in cache_methods:
250 259 cache_on = True
251 cache_key = compute_key_from_params(name, args[0], args[1])
260 repo_state_uid = self._wire['repo_state_uid']
261 call_args = [a for a in args]
262 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
252 263
253 264 @self._cache_region.conditional_cache_on_arguments(
254 265 namespace=self._cache_namespace, condition=cache_on and cache_key)
255 266 def remote_call(_cache_key):
256 267 if self._call_with_logging:
257 268 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
258 269 url, name, args, context_uid, cache_on)
259 270 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
260 271
261 272 result = remote_call(cache_key)
262 273 if self._call_with_logging:
263 274 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
264 275 url, name, time.time()-start, context_uid)
265 276 return result
266 277
267 278 @exceptions.map_vcs_exceptions
268 279 def _call_stream(self, name, *args, **kwargs):
269 280 context_uid, payload = self._base_call(name, *args, **kwargs)
270 281 payload['chunk_size'] = self.CHUNK_SIZE
271 282 url = self.stream_url
272 283
273 284 start = time.time()
274 285 if self._call_with_logging:
275 286 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
276 287 url, name, args, context_uid)
277 288
278 289 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
279 290 self.CHUNK_SIZE)
280 291
281 292 if self._call_with_logging:
282 293 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
283 294 url, name, time.time()-start, context_uid)
284 295 return result
285 296
286 297 def __getitem__(self, key):
287 298 return self.revision(key)
288 299
289 300 def _create_vcs_cache_context(self, *args):
290 301 """
291 302 Creates a unique string which is passed to the VCSServer on every
292 303 remote call. It is used as cache key in the VCSServer.
293 304 """
294 305 hash_key = '-'.join(map(str, args))
295 306 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
296 307
297 308 def invalidate_vcs_cache(self):
298 309 """
299 310 This invalidates the context which is sent to the VCSServer on every
300 311 call to a remote method. It forces the VCSServer to create a fresh
301 312 repository instance on the next call to a remote method.
302 313 """
303 314 self._wire['context'] = str(uuid.uuid4())
304 315
305 316
306 317 class VcsHttpProxy(object):
307 318
308 319 CHUNK_SIZE = 16384
309 320
310 321 def __init__(self, server_and_port, backend_endpoint):
311 322 retries = Retry(total=5, connect=None, read=None, redirect=None)
312 323
313 324 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
314 325 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
315 326 self.session = requests.Session()
316 327 self.session.mount('http://', adapter)
317 328
318 329 def handle(self, environment, input_data, *args, **kwargs):
319 330 data = {
320 331 'environment': environment,
321 332 'input_data': input_data,
322 333 'args': args,
323 334 'kwargs': kwargs
324 335 }
325 336 result = self.session.post(
326 337 self.base_url, msgpack.packb(data), stream=True)
327 338 return self._get_result(result)
328 339
329 340 def _deserialize_and_raise(self, error):
330 341 exception = Exception(error['message'])
331 342 try:
332 343 exception._vcs_kind = error['_vcs_kind']
333 344 except KeyError:
334 345 pass
335 346 raise exception
336 347
337 348 def _iterate(self, result):
338 349 unpacker = msgpack.Unpacker()
339 350 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
340 351 unpacker.feed(line)
341 352 for chunk in unpacker:
342 353 yield chunk
343 354
344 355 def _get_result(self, result):
345 356 iterator = self._iterate(result)
346 357 error = iterator.next()
347 358 if error:
348 359 self._deserialize_and_raise(error)
349 360
350 361 status = iterator.next()
351 362 headers = iterator.next()
352 363
353 364 return iterator, status, headers
354 365
355 366
356 367 class ThreadlocalSessionFactory(object):
357 368 """
358 369 Creates one CurlSession per thread on demand.
359 370 """
360 371
361 372 def __init__(self):
362 373 self._thread_local = threading.local()
363 374
364 375 def __call__(self):
365 376 if not hasattr(self._thread_local, 'curl_session'):
366 377 self._thread_local.curl_session = CurlSession()
367 378 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now