##// END OF EJS Templates
caches: fix id sanitizer.
super-admin -
r4778:5d4c07f5 default
parent child Browse files
Show More
@@ -1,383 +1,384 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
56 56 def _remote_call(url, payload, exceptions_map, session):
57 57 try:
58 58 response = session.post(url, data=msgpack.packb(payload))
59 59 except pycurl.error as e:
60 60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 61 raise exceptions.HttpVCSCommunicationError(msg)
62 62 except Exception as e:
63 63 message = getattr(e, 'message', '')
64 64 if 'Failed to connect' in message:
65 65 # gevent doesn't return proper pycurl errors
66 66 raise exceptions.HttpVCSCommunicationError(e)
67 67 else:
68 68 raise
69 69
70 70 if response.status_code >= 400:
71 71 log.error('Call to %s returned non 200 HTTP code: %s',
72 72 url, response.status_code)
73 73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 74
75 75 try:
76 76 response = msgpack.unpackb(response.content)
77 77 except Exception:
78 78 log.exception('Failed to decode response %r', response.content)
79 79 raise
80 80
81 81 error = response.get('error')
82 82 if error:
83 83 type_ = error.get('type', 'Exception')
84 84 exc = exceptions_map.get(type_, Exception)
85 85 exc = exc(error.get('message'))
86 86 try:
87 87 exc._vcs_kind = error['_vcs_kind']
88 88 except KeyError:
89 89 pass
90 90
91 91 try:
92 92 exc._vcs_server_traceback = error['traceback']
93 93 exc._vcs_server_org_exc_name = error['org_exc']
94 94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 95 except KeyError:
96 96 pass
97 97
98 98 raise exc
99 99 return response.get('result')
100 100
101 101
102 102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 103 try:
104 104 response = session.post(url, data=msgpack.packb(payload))
105 105 except pycurl.error as e:
106 106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 107 raise exceptions.HttpVCSCommunicationError(msg)
108 108 except Exception as e:
109 109 message = getattr(e, 'message', '')
110 110 if 'Failed to connect' in message:
111 111 # gevent doesn't return proper pycurl errors
112 112 raise exceptions.HttpVCSCommunicationError(e)
113 113 else:
114 114 raise
115 115
116 116 if response.status_code >= 400:
117 117 log.error('Call to %s returned non 200 HTTP code: %s',
118 118 url, response.status_code)
119 119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 120
121 121 return response.iter_content(chunk_size=chunk_size)
122 122
123 123
124 124 class ServiceConnection(object):
125 125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 127 self._session_factory = session_factory
128 128
129 129 def __getattr__(self, name):
130 130 def f(*args, **kwargs):
131 131 return self._call(name, *args, **kwargs)
132 132 return f
133 133
134 134 @exceptions.map_vcs_exceptions
135 135 def _call(self, name, *args, **kwargs):
136 136 payload = {
137 137 'id': str(uuid.uuid4()),
138 138 'method': name,
139 139 'params': {'args': args, 'kwargs': kwargs}
140 140 }
141 141 return _remote_call(
142 142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 143
144 144
145 145 class RemoteVCSMaker(object):
146 146
147 147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 150
151 151 self._session_factory = session_factory
152 152 self.backend_type = backend_type
153 153
154 154 @classmethod
155 155 def init_cache_region(cls, repo_id):
156 156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 158 return region, cache_namespace_uid
159 159
160 160 def __call__(self, path, repo_id, config, with_wire=None):
161 161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 163
164 164 def __getattr__(self, name):
165 165 def remote_attr(*args, **kwargs):
166 166 return self._call(name, *args, **kwargs)
167 167 return remote_attr
168 168
169 169 @exceptions.map_vcs_exceptions
170 170 def _call(self, func_name, *args, **kwargs):
171 171 payload = {
172 172 'id': str(uuid.uuid4()),
173 173 'method': func_name,
174 174 'backend': self.backend_type,
175 175 'params': {'args': args, 'kwargs': kwargs}
176 176 }
177 177 url = self.url
178 178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 179
180 180
181 181 class RemoteRepo(object):
182 182 CHUNK_SIZE = 16384
183 183
184 184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 185 self.url = remote_maker.url
186 186 self.stream_url = remote_maker.stream_url
187 187 self._session = remote_maker._session_factory()
188
188 189 cache_repo_id = self._repo_id_sanitizer(repo_id)
189 190 self._cache_region, self._cache_namespace = \
190 remote_maker.init_cache_region(self._repo_id_sanitizer(cache_repo_id))
191 remote_maker.init_cache_region(cache_repo_id)
191 192
192 193 with_wire = with_wire or {}
193 194
194 195 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
195 196 self._wire = {
196 197 "path": path, # repo path
197 198 "repo_id": repo_id,
198 199 "cache_repo_id": cache_repo_id,
199 200 "config": config,
200 201 "repo_state_uid": repo_state_uid,
201 202 "context": self._create_vcs_cache_context(path, repo_state_uid)
202 203 }
203 204
204 205 if with_wire:
205 206 self._wire.update(with_wire)
206 207
207 208 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
208 209 # log.debug brings a few percent gain even if is is not active.
209 210 if log.isEnabledFor(logging.DEBUG):
210 211 self._call_with_logging = True
211 212
212 213 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
213 214
214 215 def _repo_id_sanitizer(self, repo_id):
215 return repo_id.replace('/', '__')
216 return repo_id.replace('/', '__').replace('-', '_')
216 217
217 218 def __getattr__(self, name):
218 219
219 220 if name.startswith('stream:'):
220 221 def repo_remote_attr(*args, **kwargs):
221 222 return self._call_stream(name, *args, **kwargs)
222 223 else:
223 224 def repo_remote_attr(*args, **kwargs):
224 225 return self._call(name, *args, **kwargs)
225 226
226 227 return repo_remote_attr
227 228
228 229 def _base_call(self, name, *args, **kwargs):
229 230 # TODO: oliver: This is currently necessary pre-call since the
230 231 # config object is being changed for hooking scenarios
231 232 wire = copy.deepcopy(self._wire)
232 233 wire["config"] = wire["config"].serialize()
233 234 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
234 235
235 236 payload = {
236 237 'id': str(uuid.uuid4()),
237 238 'method': name,
238 239 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
239 240 }
240 241
241 242 context_uid = wire.get('context')
242 243 return context_uid, payload
243 244
244 245 @exceptions.map_vcs_exceptions
245 246 def _call(self, name, *args, **kwargs):
246 247 context_uid, payload = self._base_call(name, *args, **kwargs)
247 248 url = self.url
248 249
249 250 start = time.time()
250 251
251 252 cache_on = False
252 253 cache_key = ''
253 254 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
254 255
255 256 cache_methods = [
256 257 'branches', 'tags', 'bookmarks',
257 258 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
258 259 'revision', 'tree_items',
259 260 'ctx_list',
260 261 'bulk_request',
261 262 ]
262 263
263 264 if local_cache_on and name in cache_methods:
264 265 cache_on = True
265 266 repo_state_uid = self._wire['repo_state_uid']
266 267 call_args = [a for a in args]
267 268 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
268 269
269 270 @self._cache_region.conditional_cache_on_arguments(
270 271 namespace=self._cache_namespace, condition=cache_on and cache_key)
271 272 def remote_call(_cache_key):
272 273 if self._call_with_logging:
273 274 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
274 275 url, name, args, context_uid, cache_on)
275 276 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
276 277
277 278 result = remote_call(cache_key)
278 279 if self._call_with_logging:
279 280 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
280 281 url, name, time.time()-start, context_uid)
281 282 return result
282 283
283 284 @exceptions.map_vcs_exceptions
284 285 def _call_stream(self, name, *args, **kwargs):
285 286 context_uid, payload = self._base_call(name, *args, **kwargs)
286 287 payload['chunk_size'] = self.CHUNK_SIZE
287 288 url = self.stream_url
288 289
289 290 start = time.time()
290 291 if self._call_with_logging:
291 292 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
292 293 url, name, args, context_uid)
293 294
294 295 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
295 296 self.CHUNK_SIZE)
296 297
297 298 if self._call_with_logging:
298 299 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
299 300 url, name, time.time()-start, context_uid)
300 301 return result
301 302
302 303 def __getitem__(self, key):
303 304 return self.revision(key)
304 305
305 306 def _create_vcs_cache_context(self, *args):
306 307 """
307 308 Creates a unique string which is passed to the VCSServer on every
308 309 remote call. It is used as cache key in the VCSServer.
309 310 """
310 311 hash_key = '-'.join(map(str, args))
311 312 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
312 313
313 314 def invalidate_vcs_cache(self):
314 315 """
315 316 This invalidates the context which is sent to the VCSServer on every
316 317 call to a remote method. It forces the VCSServer to create a fresh
317 318 repository instance on the next call to a remote method.
318 319 """
319 320 self._wire['context'] = str(uuid.uuid4())
320 321
321 322
322 323 class VcsHttpProxy(object):
323 324
324 325 CHUNK_SIZE = 16384
325 326
326 327 def __init__(self, server_and_port, backend_endpoint):
327 328 retries = Retry(total=5, connect=None, read=None, redirect=None)
328 329
329 330 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
330 331 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
331 332 self.session = requests.Session()
332 333 self.session.mount('http://', adapter)
333 334
334 335 def handle(self, environment, input_data, *args, **kwargs):
335 336 data = {
336 337 'environment': environment,
337 338 'input_data': input_data,
338 339 'args': args,
339 340 'kwargs': kwargs
340 341 }
341 342 result = self.session.post(
342 343 self.base_url, msgpack.packb(data), stream=True)
343 344 return self._get_result(result)
344 345
345 346 def _deserialize_and_raise(self, error):
346 347 exception = Exception(error['message'])
347 348 try:
348 349 exception._vcs_kind = error['_vcs_kind']
349 350 except KeyError:
350 351 pass
351 352 raise exception
352 353
353 354 def _iterate(self, result):
354 355 unpacker = msgpack.Unpacker()
355 356 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
356 357 unpacker.feed(line)
357 358 for chunk in unpacker:
358 359 yield chunk
359 360
360 361 def _get_result(self, result):
361 362 iterator = self._iterate(result)
362 363 error = iterator.next()
363 364 if error:
364 365 self._deserialize_and_raise(error)
365 366
366 367 status = iterator.next()
367 368 headers = iterator.next()
368 369
369 370 return iterator, status, headers
370 371
371 372
372 373 class ThreadlocalSessionFactory(object):
373 374 """
374 375 Creates one CurlSession per thread on demand.
375 376 """
376 377
377 378 def __init__(self):
378 379 self._thread_local = threading.local()
379 380
380 381 def __call__(self):
381 382 if not hasattr(self._thread_local, 'curl_session'):
382 383 self._thread_local.curl_session = CurlSession()
383 384 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now