##// END OF EJS Templates
vcs: added headers into stream call too
super-admin -
r4888:279ce63e default
parent child Browse files
Show More
@@ -1,408 +1,412 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
#
# Lookup table from the exception type name found in a VCSServer error
# response to the local exception class that gets raised for it.
EXCEPTIONS_MAP = dict(
    KeyError=KeyError,
    URLError=urllib2.URLError,
)
55 55
56 56
def _post_payload(url, payload, session):
    """
    POST the msgpack-encoded `payload` to `url` and return the raw response.

    Shared by `_remote_call` and `_streaming_remote_call` so that request
    headers and error translation cannot drift apart between the two.

    :param url: endpoint of the VCSServer to call.
    :param payload: dict describing the call; its `method` and `_repo_name`
        entries (when present) are also sent as `X-RC-*` request headers.
    :param session: object exposing `post(url, data=..., headers=...)`.
    :raises exceptions.HttpVCSCommunicationError: on a pycurl error, on a
        'Failed to connect' error, or when the HTTP status code is >= 400.
    """
    try:
        headers = {
            'X-RC-Method': payload.get('method'),
            'X-RC-Repo-Name': payload.get('_repo_name')
        }
        response = session.post(url, data=msgpack.packb(payload), headers=headers)
    except pycurl.error as e:
        msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
        raise exceptions.HttpVCSCommunicationError(msg)
    except Exception as e:
        message = getattr(e, 'message', '')
        if 'Failed to connect' in message:
            # gevent doesn't return proper pycurl errors
            raise exceptions.HttpVCSCommunicationError(e)
        else:
            raise

    if response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, response.status_code)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response


def _remote_call(url, payload, exceptions_map, session):
    """
    Execute a remote call and return the value stored under `result`.

    The response body is decoded with msgpack. If it carries an `error`
    entry, an exception is built from it and raised instead.

    :param url: endpoint of the VCSServer to call.
    :param payload: dict describing the call, sent msgpack-encoded.
    :param exceptions_map: maps the error `type` name from the response to
        the exception class to raise; unknown names fall back to `Exception`.
    :param session: object exposing `post(url, data=..., headers=...)`.
    :raises exceptions.HttpVCSCommunicationError: on transport problems or
        an HTTP status code >= 400.
    """
    response = _post_payload(url, payload, session)

    try:
        response = msgpack.unpackb(response.content)
    except Exception:
        log.exception('Failed to decode response %r', response.content)
        raise

    error = response.get('error')
    if error:
        type_ = error.get('type', 'Exception')
        exc = exceptions_map.get(type_, Exception)
        exc = exc(error.get('message'))
        try:
            exc._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass

        # Attributes are attached in order until the first missing key.
        try:
            exc._vcs_server_traceback = error['traceback']
            exc._vcs_server_org_exc_name = error['org_exc']
            exc._vcs_server_org_exc_tb = error['org_exc_tb']
        except KeyError:
            pass

        raise exc
    return response.get('result')


def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    Execute a remote call and return an iterator over the raw response body.

    The body is not decoded here; it is handed back in pieces through
    `response.iter_content(chunk_size=chunk_size)`.

    :param url: streaming endpoint of the VCSServer to call.
    :param payload: dict describing the call, sent msgpack-encoded.
    :param exceptions_map: not used by this function; kept so the signature
        stays parallel to `_remote_call`.
    :param session: object exposing `post(url, data=..., headers=...)`.
    :param chunk_size: chunk size passed on to `iter_content`.
    :raises exceptions.HttpVCSCommunicationError: on transport problems or
        an HTTP status code >= 400.
    """
    response = _post_payload(url, payload, session)
    return response.iter_content(chunk_size=chunk_size)
127 131
128 132
class ServiceConnection(object):
    """
    Thin proxy for a VCSServer service endpoint.

    Any attribute that is not found on the instance is turned into a
    callable which performs a remote call of the same name.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # Only reached for names not found by the normal attribute lookup.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Send the call `name` with the given arguments and return its result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
148 152
149 153
class RemoteVCSMaker(object):
    """
    Entry point for one VCS backend on the VCSServer.

    Calling an instance creates a `RemoteRepo`. Any other unknown attribute
    becomes a callable that performs a backend-level remote call.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        server_url = 'http://%s' % server_and_port
        stream_endpoint = backend_endpoint+'/stream'

        self.url = urlparse.urljoin(server_url, backend_endpoint)
        self.stream_url = urlparse.urljoin(server_url, stream_endpoint)

        self._session_factory = session_factory
        self.backend_type = backend_type

    @classmethod
    def init_cache_region(cls, repo_id):
        """
        Return the `(region, namespace)` pair used to cache calls for `repo_id`.
        """
        namespace = 'cache_repo.{}'.format(repo_id)
        return rc_cache.get_or_create_region('cache_repo', namespace), namespace

    def __call__(self, path, repo_id, config, with_wire=None):
        backend_label = self.backend_type.upper()
        log.debug('%s RepoMaker call on %s', backend_label, path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # Only reached for names not found by the normal attribute lookup.
        def backend_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return backend_method

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Send the backend-level call `func_name` and return its result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
184 188
185 189
class RemoteRepo(object):
    """
    Proxy for a single repository served by the VCSServer.

    Unknown attributes are turned into callables performing a remote call of
    the same name; names starting with ``stream:`` use the streaming
    endpoint. Every call sends a copy of the "wire" dict describing the
    repository (path, ids, config, cache context).
    """
    CHUNK_SIZE = 16384

    # Class-level default. Without it, reading the flag on an instance that
    # never set it falls through to `__getattr__`, which returns a (truthy)
    # remote-call wrapper and so enables debug logging unconditionally.
    _call_with_logging = False

    # Remote methods whose results may be served from the local cache.
    _CACHED_METHODS = frozenset([
        'branches', 'tags', 'bookmarks',
        'is_large_file', 'is_binary',
        'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
        'node_history',
        'revision', 'tree_items',
        'ctx_list', 'ctx_branch', 'ctx_description',
        'bulk_request',
        'assert_correct_path'
    ])

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path, sent to the server in the wire dict.
        :param repo_id: repository identifier; a sanitized form is used for
            the cache namespace.
        :param config: config object; must offer `get` and `serialize`.
        :param remote_maker: provides `url`, `stream_url`, the session
            factory and `init_cache_region`.
        :param with_wire: optional dict of extra/overriding wire entries.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        cache_repo_id = self._repo_id_sanitizer(repo_id)
        _repo_name = self._get_repo_name(config, path)
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(cache_repo_id)

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'

        self._wire = {
            "_repo_name": _repo_name,
            "path": path,  # repo path
            "repo_id": repo_id,
            "cache_repo_id": cache_repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        # The flag is always assigned so lookups never reach __getattr__.
        self._call_with_logging = log.isEnabledFor(logging.DEBUG)

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def _get_repo_name(self, config, path):
        """
        Return the part of `path` after the configured repo store, without
        leading slashes.
        """
        repo_store = config.get('paths', '/')
        return path.split(repo_store)[-1].lstrip('/')

    def _repo_id_sanitizer(self, repo_id):
        """
        Return `repo_id` with ``/`` replaced by ``__``, ``-`` by ``_`` and
        every non-ASCII character by ``_<ordinal>_``.
        """
        pathless = repo_id.replace('/', '__').replace('-', '_')
        return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)

    def __getattr__(self, name):
        # Only reached for names not found by the normal attribute lookup.
        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the payload for the remote call `name`.

        :returns: tuple `(context_uid, payload)`.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            "_repo_name": wire['_repo_name'],
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    def get_local_cache(self, name, args):
        """
        Decide whether the call `name` should use the local cache.

        :returns: tuple `(cache_on, cache_key)`; `cache_key` is an empty
            string when caching is off.
        """
        cache_on = False
        cache_key = ''
        local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))

        if local_cache_on and name in self._CACHED_METHODS:
            cache_on = True
            repo_state_uid = self._wire['repo_state_uid']
            cache_key = compute_key_from_params(repo_state_uid, name, *args)

        return cache_on, cache_key

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Perform the remote call `name`, going through the cache region when
        `get_local_cache` enables it, and return its result.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Perform the streaming remote call `name` and return an iterator over
        the response chunks. The result is never cached.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # Cache is a problem because this is a stream
        def streaming_remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)

        result = streaming_remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
345 349
346 350
class VcsHttpProxy(object):
    """
    Forwards a request to the VCSServer over HTTP and exposes the streamed,
    msgpack-encoded answer as `(body_iterator, status, headers)`.
    """

    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        retries = Retry(total=5, connect=None, read=None, redirect=None)

        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
        self.session = requests.Session()
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        POST the msgpack-encoded request data and return the decoded result.

        :returns: tuple `(iterator, status, headers)` from `_get_result`.
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """
        Raise an `Exception` built from the `error` dict.

        The `_vcs_kind` entry, when present, is copied onto the exception.
        """
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """
        Yield the msgpack objects decoded from the streamed response body.
        """
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Read the leading objects of the stream: error, status, headers.

        A truthy error object is raised via `_deserialize_and_raise`.

        :returns: tuple `(iterator, status, headers)`; the iterator yields
            the remaining objects of the stream.
        """
        iterator = self._iterate(result)
        # The `next()` builtin works on Python 2.6+ and Python 3 alike,
        # unlike the Python-2-only `.next()` method.
        error = next(iterator)
        if error:
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
395 399
396 400
class ThreadlocalSessionFactory(object):
    """
    Callable that hands out a CurlSession stored in thread-local storage,
    creating it on the first call made from each thread.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_store = self._thread_local
        try:
            return local_store.curl_session
        except AttributeError:
            # First call from this thread: create and remember the session.
            local_store.curl_session = CurlSession()
            return local_store.curl_session
General Comments 0
You need to be logged in to leave comments. Login now