##// END OF EJS Templates
caches: fixes the file-store backend to use serialized names for filename cache of vcsserver....
super-admin -
r4777:42855995 default
parent child Browse files
Show More
@@ -1,381 +1,383 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
56 56 def _remote_call(url, payload, exceptions_map, session):
57 57 try:
58 58 response = session.post(url, data=msgpack.packb(payload))
59 59 except pycurl.error as e:
60 60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 61 raise exceptions.HttpVCSCommunicationError(msg)
62 62 except Exception as e:
63 63 message = getattr(e, 'message', '')
64 64 if 'Failed to connect' in message:
65 65 # gevent doesn't return proper pycurl errors
66 66 raise exceptions.HttpVCSCommunicationError(e)
67 67 else:
68 68 raise
69 69
70 70 if response.status_code >= 400:
71 71 log.error('Call to %s returned non 200 HTTP code: %s',
72 72 url, response.status_code)
73 73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 74
75 75 try:
76 76 response = msgpack.unpackb(response.content)
77 77 except Exception:
78 78 log.exception('Failed to decode response %r', response.content)
79 79 raise
80 80
81 81 error = response.get('error')
82 82 if error:
83 83 type_ = error.get('type', 'Exception')
84 84 exc = exceptions_map.get(type_, Exception)
85 85 exc = exc(error.get('message'))
86 86 try:
87 87 exc._vcs_kind = error['_vcs_kind']
88 88 except KeyError:
89 89 pass
90 90
91 91 try:
92 92 exc._vcs_server_traceback = error['traceback']
93 93 exc._vcs_server_org_exc_name = error['org_exc']
94 94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 95 except KeyError:
96 96 pass
97 97
98 98 raise exc
99 99 return response.get('result')
100 100
101 101
102 102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 103 try:
104 104 response = session.post(url, data=msgpack.packb(payload))
105 105 except pycurl.error as e:
106 106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 107 raise exceptions.HttpVCSCommunicationError(msg)
108 108 except Exception as e:
109 109 message = getattr(e, 'message', '')
110 110 if 'Failed to connect' in message:
111 111 # gevent doesn't return proper pycurl errors
112 112 raise exceptions.HttpVCSCommunicationError(e)
113 113 else:
114 114 raise
115 115
116 116 if response.status_code >= 400:
117 117 log.error('Call to %s returned non 200 HTTP code: %s',
118 118 url, response.status_code)
119 119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 120
121 121 return response.iter_content(chunk_size=chunk_size)
122 122
123 123
124 124 class ServiceConnection(object):
125 125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 127 self._session_factory = session_factory
128 128
129 129 def __getattr__(self, name):
130 130 def f(*args, **kwargs):
131 131 return self._call(name, *args, **kwargs)
132 132 return f
133 133
134 134 @exceptions.map_vcs_exceptions
135 135 def _call(self, name, *args, **kwargs):
136 136 payload = {
137 137 'id': str(uuid.uuid4()),
138 138 'method': name,
139 139 'params': {'args': args, 'kwargs': kwargs}
140 140 }
141 141 return _remote_call(
142 142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 143
144 144
145 145 class RemoteVCSMaker(object):
146 146
147 147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 150
151 151 self._session_factory = session_factory
152 152 self.backend_type = backend_type
153 153
154 154 @classmethod
155 155 def init_cache_region(cls, repo_id):
156 156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 158 return region, cache_namespace_uid
159 159
160 160 def __call__(self, path, repo_id, config, with_wire=None):
161 161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 163
164 164 def __getattr__(self, name):
165 165 def remote_attr(*args, **kwargs):
166 166 return self._call(name, *args, **kwargs)
167 167 return remote_attr
168 168
169 169 @exceptions.map_vcs_exceptions
170 170 def _call(self, func_name, *args, **kwargs):
171 171 payload = {
172 172 'id': str(uuid.uuid4()),
173 173 'method': func_name,
174 174 'backend': self.backend_type,
175 175 'params': {'args': args, 'kwargs': kwargs}
176 176 }
177 177 url = self.url
178 178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 179
180 180
181 181 class RemoteRepo(object):
182 182 CHUNK_SIZE = 16384
183 183
184 184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 185 self.url = remote_maker.url
186 186 self.stream_url = remote_maker.stream_url
187 187 self._session = remote_maker._session_factory()
188 cache_repo_id = self._repo_id_sanitizer(repo_id)
188 189 self._cache_region, self._cache_namespace = \
189 remote_maker.init_cache_region(self._repo_id_sanitizer(repo_id))
190 remote_maker.init_cache_region(self._repo_id_sanitizer(cache_repo_id))
190 191
191 192 with_wire = with_wire or {}
192 193
193 194 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
194 195 self._wire = {
195 196 "path": path, # repo path
196 197 "repo_id": repo_id,
198 "cache_repo_id": cache_repo_id,
197 199 "config": config,
198 200 "repo_state_uid": repo_state_uid,
199 201 "context": self._create_vcs_cache_context(path, repo_state_uid)
200 202 }
201 203
202 204 if with_wire:
203 205 self._wire.update(with_wire)
204 206
205 207 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
206 208 # log.debug brings a few percent gain even if is is not active.
207 209 if log.isEnabledFor(logging.DEBUG):
208 210 self._call_with_logging = True
209 211
210 212 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
211 213
212 214 def _repo_id_sanitizer(self, repo_id):
213 215 return repo_id.replace('/', '__')
214 216
215 217 def __getattr__(self, name):
216 218
217 219 if name.startswith('stream:'):
218 220 def repo_remote_attr(*args, **kwargs):
219 221 return self._call_stream(name, *args, **kwargs)
220 222 else:
221 223 def repo_remote_attr(*args, **kwargs):
222 224 return self._call(name, *args, **kwargs)
223 225
224 226 return repo_remote_attr
225 227
226 228 def _base_call(self, name, *args, **kwargs):
227 229 # TODO: oliver: This is currently necessary pre-call since the
228 230 # config object is being changed for hooking scenarios
229 231 wire = copy.deepcopy(self._wire)
230 232 wire["config"] = wire["config"].serialize()
231 233 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
232 234
233 235 payload = {
234 236 'id': str(uuid.uuid4()),
235 237 'method': name,
236 238 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
237 239 }
238 240
239 241 context_uid = wire.get('context')
240 242 return context_uid, payload
241 243
242 244 @exceptions.map_vcs_exceptions
243 245 def _call(self, name, *args, **kwargs):
244 246 context_uid, payload = self._base_call(name, *args, **kwargs)
245 247 url = self.url
246 248
247 249 start = time.time()
248 250
249 251 cache_on = False
250 252 cache_key = ''
251 253 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
252 254
253 255 cache_methods = [
254 256 'branches', 'tags', 'bookmarks',
255 257 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
256 258 'revision', 'tree_items',
257 259 'ctx_list',
258 260 'bulk_request',
259 261 ]
260 262
261 263 if local_cache_on and name in cache_methods:
262 264 cache_on = True
263 265 repo_state_uid = self._wire['repo_state_uid']
264 266 call_args = [a for a in args]
265 267 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
266 268
267 269 @self._cache_region.conditional_cache_on_arguments(
268 270 namespace=self._cache_namespace, condition=cache_on and cache_key)
269 271 def remote_call(_cache_key):
270 272 if self._call_with_logging:
271 273 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
272 274 url, name, args, context_uid, cache_on)
273 275 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
274 276
275 277 result = remote_call(cache_key)
276 278 if self._call_with_logging:
277 279 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
278 280 url, name, time.time()-start, context_uid)
279 281 return result
280 282
281 283 @exceptions.map_vcs_exceptions
282 284 def _call_stream(self, name, *args, **kwargs):
283 285 context_uid, payload = self._base_call(name, *args, **kwargs)
284 286 payload['chunk_size'] = self.CHUNK_SIZE
285 287 url = self.stream_url
286 288
287 289 start = time.time()
288 290 if self._call_with_logging:
289 291 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
290 292 url, name, args, context_uid)
291 293
292 294 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
293 295 self.CHUNK_SIZE)
294 296
295 297 if self._call_with_logging:
296 298 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
297 299 url, name, time.time()-start, context_uid)
298 300 return result
299 301
300 302 def __getitem__(self, key):
301 303 return self.revision(key)
302 304
303 305 def _create_vcs_cache_context(self, *args):
304 306 """
305 307 Creates a unique string which is passed to the VCSServer on every
306 308 remote call. It is used as cache key in the VCSServer.
307 309 """
308 310 hash_key = '-'.join(map(str, args))
309 311 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
310 312
311 313 def invalidate_vcs_cache(self):
312 314 """
313 315 This invalidates the context which is sent to the VCSServer on every
314 316 call to a remote method. It forces the VCSServer to create a fresh
315 317 repository instance on the next call to a remote method.
316 318 """
317 319 self._wire['context'] = str(uuid.uuid4())
318 320
319 321
320 322 class VcsHttpProxy(object):
321 323
322 324 CHUNK_SIZE = 16384
323 325
324 326 def __init__(self, server_and_port, backend_endpoint):
325 327 retries = Retry(total=5, connect=None, read=None, redirect=None)
326 328
327 329 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
328 330 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
329 331 self.session = requests.Session()
330 332 self.session.mount('http://', adapter)
331 333
332 334 def handle(self, environment, input_data, *args, **kwargs):
333 335 data = {
334 336 'environment': environment,
335 337 'input_data': input_data,
336 338 'args': args,
337 339 'kwargs': kwargs
338 340 }
339 341 result = self.session.post(
340 342 self.base_url, msgpack.packb(data), stream=True)
341 343 return self._get_result(result)
342 344
343 345 def _deserialize_and_raise(self, error):
344 346 exception = Exception(error['message'])
345 347 try:
346 348 exception._vcs_kind = error['_vcs_kind']
347 349 except KeyError:
348 350 pass
349 351 raise exception
350 352
351 353 def _iterate(self, result):
352 354 unpacker = msgpack.Unpacker()
353 355 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
354 356 unpacker.feed(line)
355 357 for chunk in unpacker:
356 358 yield chunk
357 359
358 360 def _get_result(self, result):
359 361 iterator = self._iterate(result)
360 362 error = iterator.next()
361 363 if error:
362 364 self._deserialize_and_raise(error)
363 365
364 366 status = iterator.next()
365 367 headers = iterator.next()
366 368
367 369 return iterator, status, headers
368 370
369 371
370 372 class ThreadlocalSessionFactory(object):
371 373 """
372 374 Creates one CurlSession per thread on demand.
373 375 """
374 376
375 377 def __init__(self):
376 378 self._thread_local = threading.local()
377 379
378 380 def __call__(self):
379 381 if not hasattr(self._thread_local, 'curl_session'):
380 382 self._thread_local.curl_session = CurlSession()
381 383 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now