##// END OF EJS Templates
vcs: experimental local cache for methods
super-admin -
r4744:1a8b414c default
parent child Browse files
Show More
@@ -1,346 +1,366 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
40 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
41 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
42
44
43 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
44
46
45
47
46 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
47 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
48 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
49 'KeyError': KeyError,
51 'KeyError': KeyError,
50 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
51 }
53 }
52
54
53
55
54 def _remote_call(url, payload, exceptions_map, session):
56 def _remote_call(url, payload, exceptions_map, session):
55 try:
57 try:
56 response = session.post(url, data=msgpack.packb(payload))
58 response = session.post(url, data=msgpack.packb(payload))
57 except pycurl.error as e:
59 except pycurl.error as e:
58 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
59 raise exceptions.HttpVCSCommunicationError(msg)
61 raise exceptions.HttpVCSCommunicationError(msg)
60 except Exception as e:
62 except Exception as e:
61 message = getattr(e, 'message', '')
63 message = getattr(e, 'message', '')
62 if 'Failed to connect' in message:
64 if 'Failed to connect' in message:
63 # gevent doesn't return proper pycurl errors
65 # gevent doesn't return proper pycurl errors
64 raise exceptions.HttpVCSCommunicationError(e)
66 raise exceptions.HttpVCSCommunicationError(e)
65 else:
67 else:
66 raise
68 raise
67
69
68 if response.status_code >= 400:
70 if response.status_code >= 400:
69 log.error('Call to %s returned non 200 HTTP code: %s',
71 log.error('Call to %s returned non 200 HTTP code: %s',
70 url, response.status_code)
72 url, response.status_code)
71 raise exceptions.HttpVCSCommunicationError(repr(response.content))
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
72
74
73 try:
75 try:
74 response = msgpack.unpackb(response.content)
76 response = msgpack.unpackb(response.content)
75 except Exception:
77 except Exception:
76 log.exception('Failed to decode response %r', response.content)
78 log.exception('Failed to decode response %r', response.content)
77 raise
79 raise
78
80
79 error = response.get('error')
81 error = response.get('error')
80 if error:
82 if error:
81 type_ = error.get('type', 'Exception')
83 type_ = error.get('type', 'Exception')
82 exc = exceptions_map.get(type_, Exception)
84 exc = exceptions_map.get(type_, Exception)
83 exc = exc(error.get('message'))
85 exc = exc(error.get('message'))
84 try:
86 try:
85 exc._vcs_kind = error['_vcs_kind']
87 exc._vcs_kind = error['_vcs_kind']
86 except KeyError:
88 except KeyError:
87 pass
89 pass
88
90
89 try:
91 try:
90 exc._vcs_server_traceback = error['traceback']
92 exc._vcs_server_traceback = error['traceback']
91 exc._vcs_server_org_exc_name = error['org_exc']
93 exc._vcs_server_org_exc_name = error['org_exc']
92 exc._vcs_server_org_exc_tb = error['org_exc_tb']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
93 except KeyError:
95 except KeyError:
94 pass
96 pass
95
97
96 raise exc
98 raise exc
97 return response.get('result')
99 return response.get('result')
98
100
99
101
100 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
101 try:
103 try:
102 response = session.post(url, data=msgpack.packb(payload))
104 response = session.post(url, data=msgpack.packb(payload))
103 except pycurl.error as e:
105 except pycurl.error as e:
104 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
105 raise exceptions.HttpVCSCommunicationError(msg)
107 raise exceptions.HttpVCSCommunicationError(msg)
106 except Exception as e:
108 except Exception as e:
107 message = getattr(e, 'message', '')
109 message = getattr(e, 'message', '')
108 if 'Failed to connect' in message:
110 if 'Failed to connect' in message:
109 # gevent doesn't return proper pycurl errors
111 # gevent doesn't return proper pycurl errors
110 raise exceptions.HttpVCSCommunicationError(e)
112 raise exceptions.HttpVCSCommunicationError(e)
111 else:
113 else:
112 raise
114 raise
113
115
114 if response.status_code >= 400:
116 if response.status_code >= 400:
115 log.error('Call to %s returned non 200 HTTP code: %s',
117 log.error('Call to %s returned non 200 HTTP code: %s',
116 url, response.status_code)
118 url, response.status_code)
117 raise exceptions.HttpVCSCommunicationError(repr(response.content))
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
118
120
119 return response.iter_content(chunk_size=chunk_size)
121 return response.iter_content(chunk_size=chunk_size)
120
122
121
123
122 class ServiceConnection(object):
124 class ServiceConnection(object):
123 def __init__(self, server_and_port, backend_endpoint, session_factory):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
124 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
125 self._session_factory = session_factory
127 self._session_factory = session_factory
126
128
127 def __getattr__(self, name):
129 def __getattr__(self, name):
128 def f(*args, **kwargs):
130 def f(*args, **kwargs):
129 return self._call(name, *args, **kwargs)
131 return self._call(name, *args, **kwargs)
130
131 return f
132 return f
132
133
133 @exceptions.map_vcs_exceptions
134 @exceptions.map_vcs_exceptions
134 def _call(self, name, *args, **kwargs):
135 def _call(self, name, *args, **kwargs):
135 payload = {
136 payload = {
136 'id': str(uuid.uuid4()),
137 'id': str(uuid.uuid4()),
137 'method': name,
138 'method': name,
138 'params': {'args': args, 'kwargs': kwargs}
139 'params': {'args': args, 'kwargs': kwargs}
139 }
140 }
140 return _remote_call(
141 return _remote_call(
141 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
142
143
143
144
144 class RemoteVCSMaker(object):
145 class RemoteVCSMaker(object):
145
146
146 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
147 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
148 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
149
150
150 self._session_factory = session_factory
151 self._session_factory = session_factory
151 self.backend_type = backend_type
152 self.backend_type = backend_type
152
153
154 @classmethod
155 def init_cache_region(cls, repo_id):
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 return region, cache_namespace_uid
159
153 def __call__(self, path, repo_id, config, with_wire=None):
160 def __call__(self, path, repo_id, config, with_wire=None):
154 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
155 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
156
163
157 def __getattr__(self, name):
164 def __getattr__(self, name):
158 def remote_attr(*args, **kwargs):
165 def remote_attr(*args, **kwargs):
159 return self._call(name, *args, **kwargs)
166 return self._call(name, *args, **kwargs)
160 return remote_attr
167 return remote_attr
161
168
162 @exceptions.map_vcs_exceptions
169 @exceptions.map_vcs_exceptions
163 def _call(self, func_name, *args, **kwargs):
170 def _call(self, func_name, *args, **kwargs):
164 payload = {
171 payload = {
165 'id': str(uuid.uuid4()),
172 'id': str(uuid.uuid4()),
166 'method': func_name,
173 'method': func_name,
167 'backend': self.backend_type,
174 'backend': self.backend_type,
168 'params': {'args': args, 'kwargs': kwargs}
175 'params': {'args': args, 'kwargs': kwargs}
169 }
176 }
170 url = self.url
177 url = self.url
171 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
172
179
173
180
174 class RemoteRepo(object):
181 class RemoteRepo(object):
175 CHUNK_SIZE = 16384
182 CHUNK_SIZE = 16384
176
183
177 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
178 self.url = remote_maker.url
185 self.url = remote_maker.url
179 self.stream_url = remote_maker.stream_url
186 self.stream_url = remote_maker.stream_url
180 self._session = remote_maker._session_factory()
187 self._session = remote_maker._session_factory()
188 self._cache_region, self._cache_namespace = \
189 remote_maker.init_cache_region(repo_id)
181
190
182 with_wire = with_wire or {}
191 with_wire = with_wire or {}
183
192
184 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
193 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
185 self._wire = {
194 self._wire = {
186 "path": path, # repo path
195 "path": path, # repo path
187 "repo_id": repo_id,
196 "repo_id": repo_id,
188 "config": config,
197 "config": config,
189 "repo_state_uid": repo_state_uid,
198 "repo_state_uid": repo_state_uid,
190 "context": self._create_vcs_cache_context(path, repo_state_uid)
199 "context": self._create_vcs_cache_context(path, repo_state_uid)
191 }
200 }
192
201
193 if with_wire:
202 if with_wire:
194 self._wire.update(with_wire)
203 self._wire.update(with_wire)
195
204
196 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
205 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
197 # log.debug brings a few percent gain even if is is not active.
206 # log.debug brings a few percent gain even if is is not active.
198 if log.isEnabledFor(logging.DEBUG):
207 if log.isEnabledFor(logging.DEBUG):
199 self._call_with_logging = True
208 self._call_with_logging = True
200
209
201 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
210 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
202
211
203 def __getattr__(self, name):
212 def __getattr__(self, name):
204
213
205 if name.startswith('stream:'):
214 if name.startswith('stream:'):
206 def repo_remote_attr(*args, **kwargs):
215 def repo_remote_attr(*args, **kwargs):
207 return self._call_stream(name, *args, **kwargs)
216 return self._call_stream(name, *args, **kwargs)
208 else:
217 else:
209 def repo_remote_attr(*args, **kwargs):
218 def repo_remote_attr(*args, **kwargs):
210 return self._call(name, *args, **kwargs)
219 return self._call(name, *args, **kwargs)
211
220
212 return repo_remote_attr
221 return repo_remote_attr
213
222
214 def _base_call(self, name, *args, **kwargs):
223 def _base_call(self, name, *args, **kwargs):
215 # TODO: oliver: This is currently necessary pre-call since the
224 # TODO: oliver: This is currently necessary pre-call since the
216 # config object is being changed for hooking scenarios
225 # config object is being changed for hooking scenarios
217 wire = copy.deepcopy(self._wire)
226 wire = copy.deepcopy(self._wire)
218 wire["config"] = wire["config"].serialize()
227 wire["config"] = wire["config"].serialize()
219 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
228 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
220
229
221 payload = {
230 payload = {
222 'id': str(uuid.uuid4()),
231 'id': str(uuid.uuid4()),
223 'method': name,
232 'method': name,
224 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
233 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
225 }
234 }
226
235
227 context_uid = wire.get('context')
236 context_uid = wire.get('context')
228 return context_uid, payload
237 return context_uid, payload
229
238
230 @exceptions.map_vcs_exceptions
239 @exceptions.map_vcs_exceptions
231 def _call(self, name, *args, **kwargs):
240 def _call(self, name, *args, **kwargs):
232 context_uid, payload = self._base_call(name, *args, **kwargs)
241 context_uid, payload = self._base_call(name, *args, **kwargs)
233 url = self.url
242 url = self.url
234
243
235 start = time.time()
244 start = time.time()
245
246 cache_on = False
247 cache_key = ''
248 if name in ['is_large_file', 'is_binary', 'fctx_size', 'bulk_request']:
249 cache_on = True and rhodecode.CONFIG.get('vcs.methods.cache')
250 cache_key = compute_key_from_params(name, args[0], args[1])
251
252 @self._cache_region.conditional_cache_on_arguments(
253 namespace=self._cache_namespace, condition=cache_on and cache_key)
254 def remote_call(_cache_key):
236 if self._call_with_logging:
255 if self._call_with_logging:
237 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
256 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
238 url, name, args, context_uid)
257 url, name, args, context_uid, cache_on)
258 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
239
259
240 result = _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
260 result = remote_call(cache_key)
241 if self._call_with_logging:
261 if self._call_with_logging:
242 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
262 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
243 url, name, time.time()-start, context_uid)
263 url, name, time.time()-start, context_uid)
244 return result
264 return result
245
265
246 @exceptions.map_vcs_exceptions
266 @exceptions.map_vcs_exceptions
247 def _call_stream(self, name, *args, **kwargs):
267 def _call_stream(self, name, *args, **kwargs):
248 context_uid, payload = self._base_call(name, *args, **kwargs)
268 context_uid, payload = self._base_call(name, *args, **kwargs)
249 payload['chunk_size'] = self.CHUNK_SIZE
269 payload['chunk_size'] = self.CHUNK_SIZE
250 url = self.stream_url
270 url = self.stream_url
251
271
252 start = time.time()
272 start = time.time()
253 if self._call_with_logging:
273 if self._call_with_logging:
254 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
274 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
255 url, name, args, context_uid)
275 url, name, args, context_uid)
256
276
257 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
277 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
258 self.CHUNK_SIZE)
278 self.CHUNK_SIZE)
259
279
260 if self._call_with_logging:
280 if self._call_with_logging:
261 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
281 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
262 url, name, time.time()-start, context_uid)
282 url, name, time.time()-start, context_uid)
263 return result
283 return result
264
284
265 def __getitem__(self, key):
285 def __getitem__(self, key):
266 return self.revision(key)
286 return self.revision(key)
267
287
268 def _create_vcs_cache_context(self, *args):
288 def _create_vcs_cache_context(self, *args):
269 """
289 """
270 Creates a unique string which is passed to the VCSServer on every
290 Creates a unique string which is passed to the VCSServer on every
271 remote call. It is used as cache key in the VCSServer.
291 remote call. It is used as cache key in the VCSServer.
272 """
292 """
273 hash_key = '-'.join(map(str, args))
293 hash_key = '-'.join(map(str, args))
274 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
294 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
275
295
276 def invalidate_vcs_cache(self):
296 def invalidate_vcs_cache(self):
277 """
297 """
278 This invalidates the context which is sent to the VCSServer on every
298 This invalidates the context which is sent to the VCSServer on every
279 call to a remote method. It forces the VCSServer to create a fresh
299 call to a remote method. It forces the VCSServer to create a fresh
280 repository instance on the next call to a remote method.
300 repository instance on the next call to a remote method.
281 """
301 """
282 self._wire['context'] = str(uuid.uuid4())
302 self._wire['context'] = str(uuid.uuid4())
283
303
284
304
285 class VcsHttpProxy(object):
305 class VcsHttpProxy(object):
286
306
287 CHUNK_SIZE = 16384
307 CHUNK_SIZE = 16384
288
308
289 def __init__(self, server_and_port, backend_endpoint):
309 def __init__(self, server_and_port, backend_endpoint):
290 retries = Retry(total=5, connect=None, read=None, redirect=None)
310 retries = Retry(total=5, connect=None, read=None, redirect=None)
291
311
292 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
312 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
293 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
313 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
294 self.session = requests.Session()
314 self.session = requests.Session()
295 self.session.mount('http://', adapter)
315 self.session.mount('http://', adapter)
296
316
297 def handle(self, environment, input_data, *args, **kwargs):
317 def handle(self, environment, input_data, *args, **kwargs):
298 data = {
318 data = {
299 'environment': environment,
319 'environment': environment,
300 'input_data': input_data,
320 'input_data': input_data,
301 'args': args,
321 'args': args,
302 'kwargs': kwargs
322 'kwargs': kwargs
303 }
323 }
304 result = self.session.post(
324 result = self.session.post(
305 self.base_url, msgpack.packb(data), stream=True)
325 self.base_url, msgpack.packb(data), stream=True)
306 return self._get_result(result)
326 return self._get_result(result)
307
327
308 def _deserialize_and_raise(self, error):
328 def _deserialize_and_raise(self, error):
309 exception = Exception(error['message'])
329 exception = Exception(error['message'])
310 try:
330 try:
311 exception._vcs_kind = error['_vcs_kind']
331 exception._vcs_kind = error['_vcs_kind']
312 except KeyError:
332 except KeyError:
313 pass
333 pass
314 raise exception
334 raise exception
315
335
316 def _iterate(self, result):
336 def _iterate(self, result):
317 unpacker = msgpack.Unpacker()
337 unpacker = msgpack.Unpacker()
318 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
338 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
319 unpacker.feed(line)
339 unpacker.feed(line)
320 for chunk in unpacker:
340 for chunk in unpacker:
321 yield chunk
341 yield chunk
322
342
323 def _get_result(self, result):
343 def _get_result(self, result):
324 iterator = self._iterate(result)
344 iterator = self._iterate(result)
325 error = iterator.next()
345 error = iterator.next()
326 if error:
346 if error:
327 self._deserialize_and_raise(error)
347 self._deserialize_and_raise(error)
328
348
329 status = iterator.next()
349 status = iterator.next()
330 headers = iterator.next()
350 headers = iterator.next()
331
351
332 return iterator, status, headers
352 return iterator, status, headers
333
353
334
354
335 class ThreadlocalSessionFactory(object):
355 class ThreadlocalSessionFactory(object):
336 """
356 """
337 Creates one CurlSession per thread on demand.
357 Creates one CurlSession per thread on demand.
338 """
358 """
339
359
340 def __init__(self):
360 def __init__(self):
341 self._thread_local = threading.local()
361 self._thread_local = threading.local()
342
362
343 def __call__(self):
363 def __call__(self):
344 if not hasattr(self._thread_local, 'curl_session'):
364 if not hasattr(self._thread_local, 'curl_session'):
345 self._thread_local.curl_session = CurlSession()
365 self._thread_local.curl_session = CurlSession()
346 return self._thread_local.curl_session
366 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now