##// END OF EJS Templates
http_client: some reformatting
super-admin -
r4790:eadea78d default
parent child Browse files
Show More
@@ -1,384 +1,394 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 45
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
#
# Maps the exception type name reported in a remote error to the local
# exception class that gets raised for it.
EXCEPTIONS_MAP = dict(
    KeyError=KeyError,
    URLError=urllib2.URLError,
)
54 55
55 56
def _remote_call(url, payload, exceptions_map, session):
    """
    POST the msgpack-encoded `payload` to `url` and return the `result`
    entry of the msgpack-decoded reply.

    :param url: address the payload is posted to.
    :param payload: msgpack-serializable object describing the call.
    :param exceptions_map: maps the `type` name of an error reported in the
        reply to the exception class to raise; unknown names fall back to
        `Exception`.
    :param session: object whose `post(url, data=...)` returns a response
        exposing `status_code` and `content`.
    :raises exceptions.HttpVCSCommunicationError: on a pycurl error, on a
        failure whose message contains "Failed to connect", or when the
        response status code is >= 400.
    :raises Exception: an instance of the mapped class when the reply
        carries an `error` entry.
    """
    try:
        http_response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as curl_error:
        details = '{}. \npycurl traceback: {}'.format(
            curl_error, traceback.format_exc())
        raise exceptions.HttpVCSCommunicationError(details)
    except Exception as generic_error:
        if 'Failed to connect' not in getattr(generic_error, 'message', ''):
            raise
        # gevent doesn't return proper pycurl errors
        raise exceptions.HttpVCSCommunicationError(generic_error)

    if http_response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, http_response.status_code)
        raise exceptions.HttpVCSCommunicationError(
            repr(http_response.content))

    try:
        decoded = msgpack.unpackb(http_response.content)
    except Exception:
        log.exception('Failed to decode response %r', http_response.content)
        raise

    error = decoded.get('error')
    if not error:
        return decoded.get('result')

    # Rebuild the reported error as a local exception instance.
    exc_class = exceptions_map.get(error.get('type', 'Exception'), Exception)
    exc = exc_class(error.get('message'))

    try:
        exc._vcs_kind = error['_vcs_kind']
    except KeyError:
        pass

    # The attributes are copied in order; copying stops at the first key
    # missing from the error, keeping whatever was set before it.
    traceback_fields = (
        ('_vcs_server_traceback', 'traceback'),
        ('_vcs_server_org_exc_name', 'org_exc'),
        ('_vcs_server_org_exc_tb', 'org_exc_tb'),
    )
    try:
        for attr_name, error_key in traceback_fields:
            setattr(exc, attr_name, error[error_key])
    except KeyError:
        pass

    raise exc
100 101
101 102
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    POST the msgpack-encoded `payload` to `url` and return an iterator over
    the response body.

    :param url: address the payload is posted to.
    :param payload: msgpack-serializable object describing the call.
    :param exceptions_map: accepted for signature parity with
        `_remote_call`; it is not consulted here.
    :param session: object whose `post(url, data=...)` returns a response
        exposing `status_code`, `content` and `iter_content`.
    :param chunk_size: forwarded to `response.iter_content`.
    :raises exceptions.HttpVCSCommunicationError: on a pycurl error, on a
        failure whose message contains "Failed to connect", or when the
        response status code is >= 400.
    """
    try:
        http_response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as curl_error:
        details = '{}. \npycurl traceback: {}'.format(
            curl_error, traceback.format_exc())
        raise exceptions.HttpVCSCommunicationError(details)
    except Exception as generic_error:
        if 'Failed to connect' not in getattr(generic_error, 'message', ''):
            raise
        # gevent doesn't return proper pycurl errors
        raise exceptions.HttpVCSCommunicationError(generic_error)

    if http_response.status_code >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, http_response.status_code)
        raise exceptions.HttpVCSCommunicationError(
            repr(http_response.content))

    return http_response.iter_content(chunk_size=chunk_size)
122 123
123 124
class ServiceConnection(object):
    """
    Proxy which turns attribute access into remote calls: `conn.foo(1, x=2)`
    posts a payload with method `foo` and those arguments to `self.url`.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        """
        :param server_and_port: `host:port` part of the target address.
        :param backend_endpoint: path joined onto `http://<server_and_port>`.
        :param session_factory: callable returning the session used to post.
        """
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # Only reached for names regular attribute lookup did not find; each
        # of them is handed out as a callable performing the remote call.
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Post a call of the remote method `name` with the given arguments and
        return its result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
143 144
144 145
class RemoteVCSMaker(object):
    """
    Factory for `RemoteRepo` objects of one backend type. Attributes which
    are not found on the instance are exposed as remote calls against the
    backend endpoint.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        """
        :param server_and_port: `host:port` part of the target address.
        :param backend_endpoint: path joined onto `http://<server_and_port>`;
            the streaming url uses the same path with `/stream` appended.
        :param backend_type: sent as `backend` with every call made by
            this maker.
        :param session_factory: callable returning the session used to post.
        """
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)
        self.stream_url = urlparse.urljoin(base_url, backend_endpoint+'/stream')

        self._session_factory = session_factory
        self.backend_type = backend_type

    @classmethod
    def init_cache_region(cls, repo_id):
        """
        Return `(region, namespace)` for the `cache_repo` cache of `repo_id`.
        """
        namespace = 'cache_repo.{}'.format(repo_id)
        return rc_cache.get_or_create_region('cache_repo', namespace), namespace

    def __call__(self, path, repo_id, config, with_wire=None):
        """
        Create a `RemoteRepo` for the repository at `path`.
        """
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # Only reached for names regular attribute lookup did not find.
        def remote_attr(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_attr

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Post a call of the remote function `func_name` for this backend and
        return its result.
        """
        call_params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': call_params,
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
179 180
180 181
class RemoteRepo(object):
    """
    Proxy for a single repository. Attributes not found on the instance are
    exposed as remote calls which carry the repository "wire" description;
    names starting with `stream:` are sent to the streaming endpoint.
    """

    CHUNK_SIZE = 16384

    # This default has to live on the class. Without it, reading the flag on
    # an instance created while DEBUG logging is disabled would fall through
    # to `__getattr__`, which returns a (truthy) remote-call function, and
    # the debug logging below would be executed unconditionally.
    _call_with_logging = False

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: repository path, sent as `path` in the wire.
        :param repo_id: repository id, sent as `repo_id` in the wire.
        :param config: config object; it has to offer `serialize()`, which
            is called before every remote call.
        :param remote_maker: provides `url`, `stream_url`,
            `_session_factory` and `init_cache_region`.
        :param with_wire: optional dict of entries overriding the wire.
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        cache_repo_id = self._repo_id_sanitizer(repo_id)
        self._cache_region, self._cache_namespace = \
            remote_maker.init_cache_region(cache_repo_id)

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'
        self._wire = {
            "path": path,  # repo path
            "repo_id": repo_id,
            "cache_repo_id": cache_repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def _repo_id_sanitizer(self, repo_id):
        """
        Return `repo_id` with `/` replaced by `__` and `-` replaced by `_`.
        """
        return repo_id.replace('/', '__').replace('-', '_')

    def __getattr__(self, name):
        # Only reached for names regular attribute lookup did not find.
        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Build the payload for calling the remote method `name`.

        :returns: tuple `(context_uid, payload)`, where `context_uid` is the
            `context` entry of the wire.
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    def get_local_cache(self, name, args):
        """
        Decide if the call of `name` with `args` takes part in local caching.

        Caching is on when the `vcs.methods.cache` setting is true and
        `name` is one of the cacheable methods listed below.

        :returns: tuple `(cache_on, cache_key)`; `(False, '')` when caching
            does not apply, otherwise `True` and a key computed from the
            repo state uid, the method name and the positional arguments.
        """
        local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))

        cache_methods = [
            'branches', 'tags', 'bookmarks',
            'is_large_file', 'is_binary',
            'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
            'node_history',
            'revision', 'tree_items',
            'ctx_list',
            'bulk_request',
        ]

        if not (local_cache_on and name in cache_methods):
            return False, ''

        repo_state_uid = self._wire['repo_state_uid']
        cache_key = compute_key_from_params(repo_state_uid, name, *args)
        return True, cache_key

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Call the remote method `name` and return its result. The call is
        wrapped with the cache region's conditional cache, enabled according
        to `get_local_cache`.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        @self._cache_region.conditional_cache_on_arguments(
            namespace=self._cache_namespace, condition=cache_on and cache_key)
        def remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)

        result = remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Call the remote method `name` on the streaming endpoint and return
        an iterator over the response body, read in `CHUNK_SIZE` chunks.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        cache_on, cache_key = self.get_local_cache(name, args)

        # Cache is a problem because this is a stream
        def streaming_remote_call(_cache_key):
            if self._call_with_logging:
                log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
                          url, name, args, context_uid, cache_on)
            return _streaming_remote_call(
                url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)

        result = streaming_remote_call(cache_key)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
321 331
322 332
class VcsHttpProxy(object):
    """
    Forwards a request (environment, input data and extra arguments) to the
    backend endpoint and hands back the streamed, msgpack-encoded reply.
    """

    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        """
        :param server_and_port: `host:port` part of the target address.
        :param backend_endpoint: path joined onto `http://<server_and_port>`.
        """
        retries = Retry(total=5, connect=None, read=None, redirect=None)

        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
        self.session = requests.Session()
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        Post the request data to the backend.

        :returns: tuple `(iterator, status, headers)`, see `_get_result`.
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """
        Raise an `Exception` carrying `error['message']`; `_vcs_kind` is
        copied onto the exception when the error provides it.
        """
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """
        Yield the msgpack objects decoded from the streamed body of `result`.
        """
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Read the leading objects of the reply: error, status and headers.

        :returns: tuple `(iterator, status, headers)`; `iterator` yields the
            objects remaining in the stream.
        :raises Exception: when the first object of the stream is a
            non-empty error.
        """
        iterator = self._iterate(result)
        error = next(iterator)
        if error:
            # The response was requested with stream=True and nobody is going
            # to read the rest of it. Close it so the connection is released
            # instead of being left open with an unread body.
            iterator.close()
            result.close()
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
371 381
372 382
class ThreadlocalSessionFactory(object):
    """
    Hands out a CurlSession bound to the calling thread; the session is
    created the first time a thread asks for it.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_state = self._thread_local
        try:
            return local_state.curl_session
        except AttributeError:
            # first call from this thread, no session stored yet
            local_state.curl_session = CurlSession()
            return local_state.curl_session
General Comments 0
You need to be logged in to leave comments. Login now