##// END OF EJS Templates
chore(http-client): small refactor and use fstrings
super-admin -
r5260:e241ac2a default
parent child Browse files
Show More
@@ -1,429 +1,429 b''
1 # Copyright (C) 2016-2023 RhodeCode GmbH
1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 """
19 """
20 Client for the VCSServer implemented based on HTTP.
20 Client for the VCSServer implemented based on HTTP.
21 """
21 """
22
22
23 import copy
23 import copy
24 import logging
24 import logging
25 import threading
25 import threading
26 import time
26 import time
27 import urllib.request
27 import urllib.request
28 import urllib.error
28 import urllib.error
29 import urllib.parse
29 import urllib.parse
30 import urllib.parse
30 import urllib.parse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib.error.URLError,
53 'URLError': urllib.error.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session, retries=3):
57 def _remote_call(url, payload, exceptions_map, session, retries=3):
58
58
59 for attempt in range(retries):
59 for attempt in range(retries):
60 try:
60 try:
61 response = session.post(url, data=msgpack.packb(payload))
61 response = session.post(url, data=msgpack.packb(payload))
62 break
62 break
63 except pycurl.error as e:
63 except pycurl.error as e:
64 error_code, error_message = e.args
64 error_code, error_message = e.args
65 if error_code == pycurl.E_RECV_ERROR:
65 if error_code == pycurl.E_RECV_ERROR:
66 log.warning(f'Received a "Connection reset by peer" error. '
66 log.warning(f'Received a "Connection reset by peer" error. '
67 f'Retrying... ({attempt + 1}/{retries})')
67 f'Retrying... ({attempt + 1}/{retries})')
68 continue # Retry if connection reset error.
68 continue # Retry if connection reset error.
69 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
69 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
70 raise exceptions.HttpVCSCommunicationError(msg)
70 raise exceptions.HttpVCSCommunicationError(msg)
71 except Exception as e:
71 except Exception as e:
72 message = getattr(e, 'message', '')
72 message = getattr(e, 'message', '')
73 if 'Failed to connect' in message:
73 if 'Failed to connect' in message:
74 # gevent doesn't return proper pycurl errors
74 # gevent doesn't return proper pycurl errors
75 raise exceptions.HttpVCSCommunicationError(e)
75 raise exceptions.HttpVCSCommunicationError(e)
76 else:
76 else:
77 raise
77 raise
78
78
79 if response.status_code >= 400:
79 if response.status_code >= 400:
80 content_type = response.content_type
80 content_type = response.content_type
81 log.error('Call to %s returned non 200 HTTP code: %s [%s]',
81 log.error('Call to %s returned non 200 HTTP code: %s [%s]',
82 url, response.status_code, content_type)
82 url, response.status_code, content_type)
83 raise exceptions.HttpVCSCommunicationError(repr(response.content))
83 raise exceptions.HttpVCSCommunicationError(repr(response.content))
84
84
85 try:
85 try:
86 response = msgpack.unpackb(response.content)
86 response = msgpack.unpackb(response.content)
87 except Exception:
87 except Exception:
88 log.exception('Failed to decode response from msgpack')
88 log.exception('Failed to decode response from msgpack')
89 raise
89 raise
90
90
91 error = response.get('error')
91 error = response.get('error')
92 if error:
92 if error:
93 type_ = error.get('type', 'Exception')
93 type_ = error.get('type', 'Exception')
94 exc = exceptions_map.get(type_, Exception)
94 exc = exceptions_map.get(type_, Exception)
95 exc = exc(error.get('message'))
95 exc = exc(error.get('message'))
96 try:
96 try:
97 exc._vcs_kind = error['_vcs_kind']
97 exc._vcs_kind = error['_vcs_kind']
98 except KeyError:
98 except KeyError:
99 pass
99 pass
100
100
101 try:
101 try:
102 exc._vcs_server_traceback = error['traceback']
102 exc._vcs_server_traceback = error['traceback']
103 exc._vcs_server_org_exc_name = error['org_exc']
103 exc._vcs_server_org_exc_name = error['org_exc']
104 exc._vcs_server_org_exc_tb = error['org_exc_tb']
104 exc._vcs_server_org_exc_tb = error['org_exc_tb']
105 except KeyError:
105 except KeyError:
106 pass
106 pass
107
107
108 exc.add_note(attach_exc_details(error))
108 exc.add_note(attach_exc_details(error))
109 raise exc # raising the org exception from vcsserver
109 raise exc # raising the org exception from vcsserver
110 return response.get('result')
110 return response.get('result')
111
111
112
112
113 def attach_exc_details(error):
113 def attach_exc_details(error):
114 note = '-- EXC NOTE -- :\n'
114 note = '-- EXC NOTE -- :\n'
115 note += f'vcs_kind: {error.get("_vcs_kind")}\n'
115 note += f'vcs_kind: {error.get("_vcs_kind")}\n'
116 note += f'org_exc: {error.get("_vcs_kind")}\n'
116 note += f'org_exc: {error.get("_vcs_kind")}\n'
117 note += f'tb: {error.get("traceback")}\n'
117 note += f'tb: {error.get("traceback")}\n'
118 note += '-- END EXC NOTE --'
118 note += '-- END EXC NOTE --'
119 return note
119 return note
120
120
121
121
122 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
122 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
123 try:
123 try:
124 headers = {
124 headers = {
125 'X-RC-Method': payload.get('method'),
125 'X-RC-Method': payload.get('method'),
126 'X-RC-Repo-Name': payload.get('_repo_name')
126 'X-RC-Repo-Name': payload.get('_repo_name')
127 }
127 }
128 response = session.post(url, data=msgpack.packb(payload), headers=headers)
128 response = session.post(url, data=msgpack.packb(payload), headers=headers)
129 except pycurl.error as e:
129 except pycurl.error as e:
130 error_code, error_message = e.args
130 error_code, error_message = e.args
131 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
131 msg = f'{e}. \npycurl traceback: {traceback.format_exc()}'
132 raise exceptions.HttpVCSCommunicationError(msg)
132 raise exceptions.HttpVCSCommunicationError(msg)
133 except Exception as e:
133 except Exception as e:
134 message = getattr(e, 'message', '')
134 message = getattr(e, 'message', '')
135 if 'Failed to connect' in message:
135 if 'Failed to connect' in message:
136 # gevent doesn't return proper pycurl errors
136 # gevent doesn't return proper pycurl errors
137 raise exceptions.HttpVCSCommunicationError(e)
137 raise exceptions.HttpVCSCommunicationError(e)
138 else:
138 else:
139 raise
139 raise
140
140
141 if response.status_code >= 400:
141 if response.status_code >= 400:
142 log.error('Call to %s returned non 200 HTTP code: %s',
142 log.error('Call to %s returned non 200 HTTP code: %s',
143 url, response.status_code)
143 url, response.status_code)
144 raise exceptions.HttpVCSCommunicationError(repr(response.content))
144 raise exceptions.HttpVCSCommunicationError(repr(response.content))
145
145
146 return response.iter_content(chunk_size=chunk_size)
146 return response.iter_content(chunk_size=chunk_size)
147
147
148
148
149 class ServiceConnection(object):
149 class ServiceConnection(object):
150 def __init__(self, server_and_port, backend_endpoint, session_factory):
150 def __init__(self, server_and_port, backend_endpoint, session_factory):
151 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
151 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
152 self._session_factory = session_factory
152 self._session_factory = session_factory
153
153
154 def __getattr__(self, name):
154 def __getattr__(self, name):
155 def f(*args, **kwargs):
155 def f(*args, **kwargs):
156 return self._call(name, *args, **kwargs)
156 return self._call(name, *args, **kwargs)
157 return f
157 return f
158
158
159 @exceptions.map_vcs_exceptions
159 @exceptions.map_vcs_exceptions
160 def _call(self, name, *args, **kwargs):
160 def _call(self, name, *args, **kwargs):
161 payload = {
161 payload = {
162 'id': str(uuid.uuid4()),
162 'id': str(uuid.uuid4()),
163 'method': name,
163 'method': name,
164 'params': {'args': args, 'kwargs': kwargs}
164 'params': {'args': args, 'kwargs': kwargs}
165 }
165 }
166 return _remote_call(
166 return _remote_call(
167 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
167 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
168
168
169
169
170 class RemoteVCSMaker(object):
170 class RemoteVCSMaker(object):
171
171
172 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
172 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
173 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
173 self.url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint)
174 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
174 self.stream_url = urllib.parse.urljoin(f'http://{server_and_port}', backend_endpoint+'/stream')
175
175
176 self._session_factory = session_factory
176 self._session_factory = session_factory
177 self.backend_type = backend_type
177 self.backend_type = backend_type
178
178
179 @classmethod
179 @classmethod
180 def init_cache_region(cls, repo_id):
180 def init_cache_region(cls, repo_id):
181 cache_namespace_uid = f'repo.{repo_id}'
181 cache_namespace_uid = f'repo.{repo_id}'
182 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
182 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
183 return region, cache_namespace_uid
183 return region, cache_namespace_uid
184
184
185 def __call__(self, path, repo_id, config, with_wire=None):
185 def __call__(self, path, repo_id, config, with_wire=None):
186 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
186 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
187 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
187 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
188
188
189 def __getattr__(self, name):
189 def __getattr__(self, name):
190 def remote_attr(*args, **kwargs):
190 def remote_attr(*args, **kwargs):
191 return self._call(name, *args, **kwargs)
191 return self._call(name, *args, **kwargs)
192 return remote_attr
192 return remote_attr
193
193
194 @exceptions.map_vcs_exceptions
194 @exceptions.map_vcs_exceptions
195 def _call(self, func_name, *args, **kwargs):
195 def _call(self, func_name, *args, **kwargs):
196 payload = {
196 payload = {
197 'id': str(uuid.uuid4()),
197 'id': str(uuid.uuid4()),
198 'method': func_name,
198 'method': func_name,
199 'backend': self.backend_type,
199 'backend': self.backend_type,
200 'params': {'args': args, 'kwargs': kwargs}
200 'params': {'args': args, 'kwargs': kwargs}
201 }
201 }
202 url = self.url
202 url = self.url
203 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
203 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
204
204
205
205
206 class RemoteRepo(object):
206 class RemoteRepo(object):
207 CHUNK_SIZE = 16384
207 CHUNK_SIZE = 16384
208
208
209 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
209 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
210 self.url = remote_maker.url
210 self.url = remote_maker.url
211 self.stream_url = remote_maker.stream_url
211 self.stream_url = remote_maker.stream_url
212 self._session = remote_maker._session_factory()
212 self._session = remote_maker._session_factory()
213
213
214 cache_repo_id = self._repo_id_sanitizer(repo_id)
214 cache_repo_id = self._repo_id_sanitizer(repo_id)
215 _repo_name = self._get_repo_name(config, path)
215 _repo_name = self._get_repo_name(config, path)
216 self._cache_region, self._cache_namespace = \
216 self._cache_region, self._cache_namespace = \
217 remote_maker.init_cache_region(cache_repo_id)
217 remote_maker.init_cache_region(cache_repo_id)
218
218
219 with_wire = with_wire or {}
219 with_wire = with_wire or {}
220
220
221 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
221 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
222
222
223 self._wire = {
223 self._wire = {
224 "_repo_name": _repo_name,
224 "_repo_name": _repo_name,
225 "path": path, # repo path
225 "path": path, # repo path
226 "repo_id": repo_id,
226 "repo_id": repo_id,
227 "cache_repo_id": cache_repo_id,
227 "cache_repo_id": cache_repo_id,
228 "config": config,
228 "config": config,
229 "repo_state_uid": repo_state_uid,
229 "repo_state_uid": repo_state_uid,
230 "context": self._create_vcs_cache_context(path, repo_state_uid)
230 "context": self._create_vcs_cache_context(path, repo_state_uid)
231 }
231 }
232
232
233 if with_wire:
233 if with_wire:
234 self._wire.update(with_wire)
234 self._wire.update(with_wire)
235
235
236 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
236 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
237 # log.debug brings a few percent gain even if is is not active.
237 # log.debug brings a few percent gain even if is is not active.
238 if log.isEnabledFor(logging.DEBUG):
238 if log.isEnabledFor(logging.DEBUG):
239 self._call_with_logging = True
239 self._call_with_logging = True
240
240
241 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
241 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
242
242
243 def _get_repo_name(self, config, path):
243 def _get_repo_name(self, config, path):
244 repo_store = config.get('paths', '/')
244 repo_store = config.get('paths', '/')
245 return path.split(repo_store)[-1].lstrip('/')
245 return path.split(repo_store)[-1].lstrip('/')
246
246
247 def _repo_id_sanitizer(self, repo_id):
247 def _repo_id_sanitizer(self, repo_id):
248 pathless = repo_id.replace('/', '__').replace('-', '_')
248 pathless = repo_id.replace('/', '__').replace('-', '_')
249 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
249 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
250
250
251 def __getattr__(self, name):
251 def __getattr__(self, name):
252
252
253 if name.startswith('stream:'):
253 if name.startswith('stream:'):
254 def repo_remote_attr(*args, **kwargs):
254 def repo_remote_attr(*args, **kwargs):
255 return self._call_stream(name, *args, **kwargs)
255 return self._call_stream(name, *args, **kwargs)
256 else:
256 else:
257 def repo_remote_attr(*args, **kwargs):
257 def repo_remote_attr(*args, **kwargs):
258 return self._call(name, *args, **kwargs)
258 return self._call(name, *args, **kwargs)
259
259
260 return repo_remote_attr
260 return repo_remote_attr
261
261
262 def _base_call(self, name, *args, **kwargs):
262 def _base_call(self, name, *args, **kwargs):
263 # TODO: oliver: This is currently necessary pre-call since the
263 # TODO: oliver: This is currently necessary pre-call since the
264 # config object is being changed for hooking scenarios
264 # config object is being changed for hooking scenarios
265 wire = copy.deepcopy(self._wire)
265 wire = copy.deepcopy(self._wire)
266 wire["config"] = wire["config"].serialize()
266 wire["config"] = wire["config"].serialize()
267 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
267 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
268
268
269 payload = {
269 payload = {
270 'id': str(uuid.uuid4()),
270 'id': str(uuid.uuid4()),
271 'method': name,
271 'method': name,
272 "_repo_name": wire['_repo_name'],
272 "_repo_name": wire['_repo_name'],
273 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
273 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
274 }
274 }
275
275
276 context_uid = wire.get('context')
276 context_uid = wire.get('context')
277 return context_uid, payload
277 return context_uid, payload
278
278
279 def get_local_cache(self, name, args):
279 def get_local_cache(self, name, args):
280 cache_on = False
280 cache_on = False
281 cache_key = ''
281 cache_key = ''
282 local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')
282 local_cache_on = rhodecode.ConfigGet().get_bool('vcs.methods.cache')
283
283
284 cache_methods = [
284 cache_methods = [
285 'branches', 'tags', 'bookmarks',
285 'branches', 'tags', 'bookmarks',
286 'is_large_file', 'is_binary',
286 'is_large_file', 'is_binary',
287 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
287 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
288 'node_history',
288 'node_history',
289 'revision', 'tree_items',
289 'revision', 'tree_items',
290 'ctx_list', 'ctx_branch', 'ctx_description',
290 'ctx_list', 'ctx_branch', 'ctx_description',
291 'bulk_request',
291 'bulk_request',
292 'assert_correct_path'
292 'assert_correct_path'
293 ]
293 ]
294
294
295 if local_cache_on and name in cache_methods:
295 if local_cache_on and name in cache_methods:
296 cache_on = True
296 cache_on = True
297 repo_state_uid = self._wire['repo_state_uid']
297 repo_state_uid = self._wire['repo_state_uid']
298 call_args = [a for a in args]
298 call_args = [a for a in args]
299 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
299 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
300
300
301 return cache_on, cache_key
301 return cache_on, cache_key
302
302
303 @exceptions.map_vcs_exceptions
303 @exceptions.map_vcs_exceptions
304 def _call(self, name, *args, **kwargs):
304 def _call(self, name, *args, **kwargs):
305 context_uid, payload = self._base_call(name, *args, **kwargs)
305 context_uid, payload = self._base_call(name, *args, **kwargs)
306 url = self.url
306 url = self.url
307
307
308 start = time.time()
308 start = time.time()
309 cache_on, cache_key = self.get_local_cache(name, args)
309 cache_on, cache_key = self.get_local_cache(name, args)
310
310
311 @self._cache_region.conditional_cache_on_arguments(
311 @self._cache_region.conditional_cache_on_arguments(
312 namespace=self._cache_namespace, condition=cache_on and cache_key)
312 namespace=self._cache_namespace, condition=cache_on and cache_key)
313 def remote_call(_cache_key):
313 def remote_call(_cache_key):
314 if self._call_with_logging:
314 if self._call_with_logging:
315 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
315 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
316 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
316 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
317 url, name, args_repr, context_uid, cache_on)
317 url, name, args_repr, context_uid, cache_on)
318 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
318 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
319
319
320 result = remote_call(cache_key)
320 result = remote_call(cache_key)
321 if self._call_with_logging:
321 if self._call_with_logging:
322 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
322 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
323 url, name, time.time()-start, context_uid)
323 url, name, time.time()-start, context_uid)
324 return result
324 return result
325
325
326 @exceptions.map_vcs_exceptions
326 @exceptions.map_vcs_exceptions
327 def _call_stream(self, name, *args, **kwargs):
327 def _call_stream(self, name, *args, **kwargs):
328 context_uid, payload = self._base_call(name, *args, **kwargs)
328 context_uid, payload = self._base_call(name, *args, **kwargs)
329 payload['chunk_size'] = self.CHUNK_SIZE
329 payload['chunk_size'] = self.CHUNK_SIZE
330 url = self.stream_url
330 url = self.stream_url
331
331
332 start = time.time()
332 start = time.time()
333 cache_on, cache_key = self.get_local_cache(name, args)
333 cache_on, cache_key = self.get_local_cache(name, args)
334
334
335 # Cache is a problem because this is a stream
335 # Cache is a problem because this is a stream
336 def streaming_remote_call(_cache_key):
336 def streaming_remote_call(_cache_key):
337 if self._call_with_logging:
337 if self._call_with_logging:
338 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
338 args_repr = f'ARG: {str(args):.512}|KW: {str(kwargs):.512}'
339 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
339 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
340 url, name, args_repr, context_uid, cache_on)
340 url, name, args_repr, context_uid, cache_on)
341 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
341 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
342
342
343 result = streaming_remote_call(cache_key)
343 result = streaming_remote_call(cache_key)
344 if self._call_with_logging:
344 if self._call_with_logging:
345 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
345 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
346 url, name, time.time()-start, context_uid)
346 url, name, time.time()-start, context_uid)
347 return result
347 return result
348
348
349 def __getitem__(self, key):
349 def __getitem__(self, key):
350 return self.revision(key)
350 return self.revision(key)
351
351
352 def _create_vcs_cache_context(self, *args):
352 def _create_vcs_cache_context(self, *args):
353 """
353 """
354 Creates a unique string which is passed to the VCSServer on every
354 Creates a unique string which is passed to the VCSServer on every
355 remote call. It is used as cache key in the VCSServer.
355 remote call. It is used as cache key in the VCSServer.
356 """
356 """
357 hash_key = '-'.join(map(str, args))
357 hash_key = '-'.join(map(str, args))
358 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
358 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
359
359
360 def invalidate_vcs_cache(self):
360 def invalidate_vcs_cache(self):
361 """
361 """
362 This invalidates the context which is sent to the VCSServer on every
362 This invalidates the context which is sent to the VCSServer on every
363 call to a remote method. It forces the VCSServer to create a fresh
363 call to a remote method. It forces the VCSServer to create a fresh
364 repository instance on the next call to a remote method.
364 repository instance on the next call to a remote method.
365 """
365 """
366 self._wire['context'] = str(uuid.uuid4())
366 self._wire['context'] = str(uuid.uuid4())
367
367
368
368
369 class VcsHttpProxy(object):
369 class VcsHttpProxy(object):
370
370
371 CHUNK_SIZE = 16384
371 CHUNK_SIZE = 16384
372
372
373 def __init__(self, server_and_port, backend_endpoint):
373 def __init__(self, server_and_port, backend_endpoint):
374 retries = Retry(total=5, connect=None, read=None, redirect=None)
374 retries = Retry(total=5, connect=None, read=None, redirect=None)
375
375
376 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
376 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
377 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
377 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
378 self.session = requests.Session()
378 self.session = requests.Session()
379 self.session.mount('http://', adapter)
379 self.session.mount('http://', adapter)
380
380
381 def handle(self, environment, input_data, *args, **kwargs):
381 def handle(self, environment, input_data, *args, **kwargs):
382 data = {
382 data = {
383 'environment': environment,
383 'environment': environment,
384 'input_data': input_data,
384 'input_data': input_data,
385 'args': args,
385 'args': args,
386 'kwargs': kwargs
386 'kwargs': kwargs
387 }
387 }
388 result = self.session.post(
388 result = self.session.post(
389 self.base_url, msgpack.packb(data), stream=True)
389 self.base_url, msgpack.packb(data), stream=True)
390 return self._get_result(result)
390 return self._get_result(result)
391
391
392 def _deserialize_and_raise(self, error):
392 def _deserialize_and_raise(self, error):
393 exception = Exception(error['message'])
393 exception = Exception(error['message'])
394 try:
394 try:
395 exception._vcs_kind = error['_vcs_kind']
395 exception._vcs_kind = error['_vcs_kind']
396 except KeyError:
396 except KeyError:
397 pass
397 pass
398 raise exception
398 raise exception
399
399
400 def _iterate(self, result):
400 def _iterate(self, result):
401 unpacker = msgpack.Unpacker()
401 unpacker = msgpack.Unpacker()
402 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
402 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
403 unpacker.feed(line)
403 unpacker.feed(line)
404 yield from unpacker
404 yield from unpacker
405
405
406 def _get_result(self, result):
406 def _get_result(self, result):
407 iterator = self._iterate(result)
407 iterator = self._iterate(result)
408 error = next(iterator)
408 error = next(iterator)
409 if error:
409 if error:
410 self._deserialize_and_raise(error)
410 self._deserialize_and_raise(error)
411
411
412 status = next(iterator)
412 status = next(iterator)
413 headers = next(iterator)
413 headers = next(iterator)
414
414
415 return iterator, status, headers
415 return iterator, status, headers
416
416
417
417
418 class ThreadlocalSessionFactory(object):
418 class ThreadlocalSessionFactory(object):
419 """
419 """
420 Creates one CurlSession per thread on demand.
420 Creates one CurlSession per thread on demand.
421 """
421 """
422
422
423 def __init__(self):
423 def __init__(self):
424 self._thread_local = threading.local()
424 self._thread_local = threading.local()
425
425
426 def __call__(self):
426 def __call__(self):
427 if not hasattr(self._thread_local, 'curl_session'):
427 if not hasattr(self._thread_local, 'curl_session'):
428 self._thread_local.curl_session = CurlSession()
428 self._thread_local.curl_session = CurlSession()
429 return self._thread_local.curl_session
429 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now