##// END OF EJS Templates
vcs: experimental local cache for methods
super-admin -
r4744:1a8b414c default
parent child Browse files
Show More
@@ -1,346 +1,366 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
40 42 from rhodecode.lib.system_info import get_cert_path
41 43 from rhodecode.lib.vcs import exceptions, CurlSession
42 44
log = logging.getLogger(__name__)


# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
# Lookup table from the exception type name found in an error response to the
# local exception class that gets raised for it (see `_remote_call`).
EXCEPTIONS_MAP = dict(
    KeyError=KeyError,
    URLError=urllib2.URLError,
)
52 54
53 55
def _remote_call(url, payload, exceptions_map, session):
    """
    POST the msgpack-packed `payload` to `url` and return the remote result.

    :param url: endpoint the payload is posted to.
    :param payload: object that is serialized with msgpack before sending.
    :param exceptions_map: mapping of exception type names to exception
        classes; names that are not found fall back to `Exception`.
    :param session: object exposing `post(url, data=...)`.
    :returns: the `result` entry of the decoded response.
    :raises exceptions.HttpVCSCommunicationError: when the request fails on
        the transport level or the response status code is 400 or above.
    :raises Exception: the exception rebuilt from the `error` entry of the
        response, if the response carries one.
    """
    try:
        response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as curl_err:
        raise exceptions.HttpVCSCommunicationError(
            '{}. \npycurl traceback: {}'.format(
                curl_err, traceback.format_exc()))
    except Exception as err:
        if 'Failed to connect' not in getattr(err, 'message', ''):
            raise
        # gevent doesn't return proper pycurl errors
        raise exceptions.HttpVCSCommunicationError(err)

    status_code = response.status_code
    if status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    raw_content = response.content
    try:
        decoded = msgpack.unpackb(raw_content)
    except Exception:
        log.exception('Failed to decode response %r', raw_content)
        raise

    error = decoded.get('error')
    if not error:
        return decoded.get('result')

    # Rebuild the remote exception locally and attach the metadata that was
    # sent along with it.
    exc_class = exceptions_map.get(error.get('type', 'Exception'), Exception)
    exc = exc_class(error.get('message'))
    if '_vcs_kind' in error:
        exc._vcs_kind = error['_vcs_kind']

    # The first missing key stops the copying; attributes that were already
    # set stay on the exception.
    traceback_attrs = (
        ('_vcs_server_traceback', 'traceback'),
        ('_vcs_server_org_exc_name', 'org_exc'),
        ('_vcs_server_org_exc_tb', 'org_exc_tb'),
    )
    try:
        for attr_name, error_key in traceback_attrs:
            setattr(exc, attr_name, error[error_key])
    except KeyError:
        pass

    raise exc
98 100
99 101
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    POST the msgpack-packed `payload` to `url` and stream back the response.

    :param url: endpoint the payload is posted to.
    :param payload: object that is serialized with msgpack before sending.
    :param exceptions_map: accepted for signature parity with `_remote_call`;
        not used by this function.
    :param session: object exposing `post(url, data=...)`.
    :param chunk_size: passed on to `response.iter_content`.
    :returns: whatever `response.iter_content(chunk_size=...)` returns.
    :raises exceptions.HttpVCSCommunicationError: when the request fails on
        the transport level or the response status code is 400 or above.
    """
    try:
        response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as curl_err:
        raise exceptions.HttpVCSCommunicationError(
            '{}. \npycurl traceback: {}'.format(
                curl_err, traceback.format_exc()))
    except Exception as err:
        if 'Failed to connect' not in getattr(err, 'message', ''):
            raise
        # gevent doesn't return proper pycurl errors
        raise exceptions.HttpVCSCommunicationError(err)

    status_code = response.status_code
    if status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response.iter_content(chunk_size=chunk_size)
120 122
121 123
class ServiceConnection(object):
    """
    Client for a single VCSServer endpoint.

    Looking up an attribute that is not otherwise defined yields a callable
    which forwards the call, under that attribute name, to the endpoint.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Send `name` with its arguments to the endpoint and return the result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
142 143
143 144
class RemoteVCSMaker(object):
    """
    Factory for `RemoteRepo` objects of one backend type.

    Calling an instance builds a `RemoteRepo`. Any other attribute that is not
    otherwise defined resolves to a callable forwarding the call, under that
    attribute name, to the backend endpoint.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self.stream_url = urlparse.urljoin(base_url, backend_endpoint + '/stream')

        self.backend_type = backend_type
        self._session_factory = session_factory

    @classmethod
    def init_cache_region(cls, repo_id):
        """
        Return the `(region, namespace)` pair used for caching calls made for
        the repository identified by `repo_id`.
        """
        namespace_uid = 'cache_repo.{}'.format(repo_id)
        cache_region = rc_cache.get_or_create_region('cache_repo', namespace_uid)
        return cache_region, namespace_uid

    def __call__(self, path, repo_id, config, with_wire=None):
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        def forward_to_backend(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return forward_to_backend

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Send `func_name` with its arguments to the backend endpoint and
        return the result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
172 179
173 180
class RemoteRepo(object):
    """
    Proxy object for one repository that is served by the VCSServer.

    Attributes that are not otherwise defined resolve to callables which
    forward the call to the server: names starting with ``stream:`` go to the
    streaming endpoint, all other names go to the regular endpoint.
    """
    CHUNK_SIZE = 16384

    # Class-level default for the debug-logging switch. Without it, reading
    # `self._call_with_logging` on an instance created while DEBUG logging is
    # disabled falls through to `__getattr__`, which returns a (truthy)
    # closure - so the guard in front of `log.debug` was always true.
    _call_with_logging = False

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path, sent to the server inside the wire.
        :param repo_id: repository id; also selects the cache region.
        :param config: config object; it has to provide `serialize()`, which
            is called before every remote call.
        :param remote_maker: object providing `url`, `stream_url`,
            `_session_factory` and `init_cache_region`.
        :param with_wire: optional dict of entries overriding the wire.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(repo_id)

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'
        self._wire = {
            "path": path,  # repo path
            "repo_id": repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def __getattr__(self, name):

        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the payload for a remote call of method `name`.

        :returns: tuple of `(context_uid, payload)`.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Call the remote method `name` and return its result.

        For a fixed set of method names the call goes through the cache
        region when the `vcs.methods.cache` config entry is truthy.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()

        cache_on = False
        cache_key = ''
        cached_methods = ('is_large_file', 'is_binary', 'fctx_size', 'bulk_request')
        # The cache key is derived from the first two positional arguments.
        # Calls which do not carry both are left uncached instead of failing
        # with an IndexError while building the key.
        if name in cached_methods and len(args) >= 2:
            cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
            cache_key = compute_key_from_params(name, args[0], args[1])

        # NOTE(review): presumably the region only stores and reuses results
        # when `condition` is truthy - confirm against rc_cache.
        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Call the remote method `name` on the streaming endpoint and return
        the iterator produced by `_streaming_remote_call`.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        if self._call_with_logging:
            log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
                      url, name, args, context_uid)

        result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
                                        self.CHUNK_SIZE)

        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        # `revision` is not defined locally, so this is a remote call.
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
283 303
284 304
class VcsHttpProxy(object):
    """
    Forwards a request to a VCSServer endpoint and exposes the streamed,
    msgpack-encoded response.
    """

    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        retries = Retry(total=5, connect=None, read=None, redirect=None)

        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
        self.session = requests.Session()
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        POST the given data to the endpoint and return the result of
        `_get_result` for the streamed response.
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """
        Raise an `Exception` built from `error['message']`; `_vcs_kind` is
        copied onto the exception when the error carries it.
        """
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """
        Yield every object msgpack can decode from the response content,
        which is read in pieces of `CHUNK_SIZE`.
        """
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Consume the leading objects of the response stream.

        The first object is the error marker (raised when truthy), followed
        by the status and the headers.

        :returns: tuple of `(iterator, status, headers)`; the iterator yields
            the objects remaining after the three leading ones.
        """
        iterator = self._iterate(result)
        # Builtin `next()` instead of the Python 2 only `.next()` method, so
        # that any iterator is accepted here.
        error = next(iterator)
        if error:
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
333 353
334 354
class ThreadlocalSessionFactory(object):
    """
    Hands out a CurlSession that is created lazily, once per thread.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_state = self._thread_local
        try:
            session = local_state.curl_session
        except AttributeError:
            # First call from this thread: create the session and keep it.
            session = local_state.curl_session = CurlSession()
        return session
General Comments 0
You need to be logged in to leave comments. Login now