##// END OF EJS Templates
remote-call: added retries for connection-reset-by-peer and optimize logging leaking too much info
super-admin -
r5033:65a0999f default
parent child Browse files
Show More
@@ -1,412 +1,418 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib.request, urllib.error, urllib.parse
29 import urllib.request, urllib.error, urllib.parse
30 import urllib.parse
30 import urllib.parse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib.error.URLError,
53 'URLError': urllib.error.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session, retries=3):
58
59 for attempt in range(retries):
58 try:
60 try:
59 headers = {
61 response = session.post(url, data=msgpack.packb(payload))
60 'X-RC-Method': payload.get('method'),
61 'X-RC-Repo-Name': payload.get('_repo_name')
62 }
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
64 except pycurl.error as e:
62 except pycurl.error as e:
63 error_code, error_message = e.args
64 if error_code == pycurl.E_RECV_ERROR:
65 log.warning(f'Received a "Connection reset by peer" error. '
66 f'Retrying... ({attempt + 1}/{retries})')
67 continue # Retry if connection reset error.
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
68 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
66 raise exceptions.HttpVCSCommunicationError(msg)
69 raise exceptions.HttpVCSCommunicationError(msg)
67 except Exception as e:
70 except Exception as e:
68 message = getattr(e, 'message', '')
71 message = getattr(e, 'message', '')
69 if 'Failed to connect' in message:
72 if 'Failed to connect' in message:
70 # gevent doesn't return proper pycurl errors
73 # gevent doesn't return proper pycurl errors
71 raise exceptions.HttpVCSCommunicationError(e)
74 raise exceptions.HttpVCSCommunicationError(e)
72 else:
75 else:
73 raise
76 raise
74
77
75 if response.status_code >= 400:
78 if response.status_code >= 400:
76 log.error('Call to %s returned non 200 HTTP code: %s',
79 log.error('Call to %s returned non 200 HTTP code: %s',
77 url, response.status_code)
80 url, response.status_code)
78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
81 raise exceptions.HttpVCSCommunicationError(repr(response.content))
79
82
80 try:
83 try:
81 response = msgpack.unpackb(response.content, raw=False)
84 response = msgpack.unpackb(response.content, raw=False)
82 except Exception:
85 except Exception:
83 log.exception('Failed to decode response %r', response.content)
86 log.exception('Failed to decode response from msgpack')
84 raise
87 raise
85
88
86 error = response.get('error')
89 error = response.get('error')
87 if error:
90 if error:
88 type_ = error.get('type', 'Exception')
91 type_ = error.get('type', 'Exception')
89 exc = exceptions_map.get(type_, Exception)
92 exc = exceptions_map.get(type_, Exception)
90 exc = exc(error.get('message'))
93 exc = exc(error.get('message'))
91 try:
94 try:
92 exc._vcs_kind = error['_vcs_kind']
95 exc._vcs_kind = error['_vcs_kind']
93 except KeyError:
96 except KeyError:
94 pass
97 pass
95
98
96 try:
99 try:
97 exc._vcs_server_traceback = error['traceback']
100 exc._vcs_server_traceback = error['traceback']
98 exc._vcs_server_org_exc_name = error['org_exc']
101 exc._vcs_server_org_exc_name = error['org_exc']
99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
102 exc._vcs_server_org_exc_tb = error['org_exc_tb']
100 except KeyError:
103 except KeyError:
101 pass
104 pass
102
105
103 raise exc
106 raise exc
104 return response.get('result')
107 return response.get('result')
105
108
106
109
107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
110 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
108 try:
111 try:
109 headers = {
112 headers = {
110 'X-RC-Method': payload.get('method'),
113 'X-RC-Method': payload.get('method'),
111 'X-RC-Repo-Name': payload.get('_repo_name')
114 'X-RC-Repo-Name': payload.get('_repo_name')
112 }
115 }
113 response = session.post(url, data=msgpack.packb(payload), headers=headers)
116 response = session.post(url, data=msgpack.packb(payload), headers=headers)
114 except pycurl.error as e:
117 except pycurl.error as e:
118 error_code, error_message = e.args
115 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
119 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
116 raise exceptions.HttpVCSCommunicationError(msg)
120 raise exceptions.HttpVCSCommunicationError(msg)
117 except Exception as e:
121 except Exception as e:
118 message = getattr(e, 'message', '')
122 message = getattr(e, 'message', '')
119 if 'Failed to connect' in message:
123 if 'Failed to connect' in message:
120 # gevent doesn't return proper pycurl errors
124 # gevent doesn't return proper pycurl errors
121 raise exceptions.HttpVCSCommunicationError(e)
125 raise exceptions.HttpVCSCommunicationError(e)
122 else:
126 else:
123 raise
127 raise
124
128
125 if response.status_code >= 400:
129 if response.status_code >= 400:
126 log.error('Call to %s returned non 200 HTTP code: %s',
130 log.error('Call to %s returned non 200 HTTP code: %s',
127 url, response.status_code)
131 url, response.status_code)
128 raise exceptions.HttpVCSCommunicationError(repr(response.content))
132 raise exceptions.HttpVCSCommunicationError(repr(response.content))
129
133
130 return response.iter_content(chunk_size=chunk_size)
134 return response.iter_content(chunk_size=chunk_size)
131
135
132
136
133 class ServiceConnection(object):
137 class ServiceConnection(object):
134 def __init__(self, server_and_port, backend_endpoint, session_factory):
138 def __init__(self, server_and_port, backend_endpoint, session_factory):
135 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
139 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
136 self._session_factory = session_factory
140 self._session_factory = session_factory
137
141
138 def __getattr__(self, name):
142 def __getattr__(self, name):
139 def f(*args, **kwargs):
143 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
144 return self._call(name, *args, **kwargs)
141 return f
145 return f
142
146
143 @exceptions.map_vcs_exceptions
147 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
148 def _call(self, name, *args, **kwargs):
145 payload = {
149 payload = {
146 'id': str(uuid.uuid4()),
150 'id': str(uuid.uuid4()),
147 'method': name,
151 'method': name,
148 'params': {'args': args, 'kwargs': kwargs}
152 'params': {'args': args, 'kwargs': kwargs}
149 }
153 }
150 return _remote_call(
154 return _remote_call(
151 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
155 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
152
156
153
157
154 class RemoteVCSMaker(object):
158 class RemoteVCSMaker(object):
155
159
156 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
160 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
157 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
161 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
158 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
162 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
159
163
160 self._session_factory = session_factory
164 self._session_factory = session_factory
161 self.backend_type = backend_type
165 self.backend_type = backend_type
162
166
163 @classmethod
167 @classmethod
164 def init_cache_region(cls, repo_id):
168 def init_cache_region(cls, repo_id):
165 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
169 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
166 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
170 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
167 return region, cache_namespace_uid
171 return region, cache_namespace_uid
168
172
169 def __call__(self, path, repo_id, config, with_wire=None):
173 def __call__(self, path, repo_id, config, with_wire=None):
170 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
174 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
171 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
175 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
172
176
173 def __getattr__(self, name):
177 def __getattr__(self, name):
174 def remote_attr(*args, **kwargs):
178 def remote_attr(*args, **kwargs):
175 return self._call(name, *args, **kwargs)
179 return self._call(name, *args, **kwargs)
176 return remote_attr
180 return remote_attr
177
181
178 @exceptions.map_vcs_exceptions
182 @exceptions.map_vcs_exceptions
179 def _call(self, func_name, *args, **kwargs):
183 def _call(self, func_name, *args, **kwargs):
180 payload = {
184 payload = {
181 'id': str(uuid.uuid4()),
185 'id': str(uuid.uuid4()),
182 'method': func_name,
186 'method': func_name,
183 'backend': self.backend_type,
187 'backend': self.backend_type,
184 'params': {'args': args, 'kwargs': kwargs}
188 'params': {'args': args, 'kwargs': kwargs}
185 }
189 }
186 url = self.url
190 url = self.url
187 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
191 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
188
192
189
193
190 class RemoteRepo(object):
194 class RemoteRepo(object):
191 CHUNK_SIZE = 16384
195 CHUNK_SIZE = 16384
192
196
193 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
197 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
194 self.url = remote_maker.url
198 self.url = remote_maker.url
195 self.stream_url = remote_maker.stream_url
199 self.stream_url = remote_maker.stream_url
196 self._session = remote_maker._session_factory()
200 self._session = remote_maker._session_factory()
197
201
198 cache_repo_id = self._repo_id_sanitizer(repo_id)
202 cache_repo_id = self._repo_id_sanitizer(repo_id)
199 _repo_name = self._get_repo_name(config, path)
203 _repo_name = self._get_repo_name(config, path)
200 self._cache_region, self._cache_namespace = \
204 self._cache_region, self._cache_namespace = \
201 remote_maker.init_cache_region(cache_repo_id)
205 remote_maker.init_cache_region(cache_repo_id)
202
206
203 with_wire = with_wire or {}
207 with_wire = with_wire or {}
204
208
205 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
209 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
206
210
207 self._wire = {
211 self._wire = {
208 "_repo_name": _repo_name,
212 "_repo_name": _repo_name,
209 "path": path, # repo path
213 "path": path, # repo path
210 "repo_id": repo_id,
214 "repo_id": repo_id,
211 "cache_repo_id": cache_repo_id,
215 "cache_repo_id": cache_repo_id,
212 "config": config,
216 "config": config,
213 "repo_state_uid": repo_state_uid,
217 "repo_state_uid": repo_state_uid,
214 "context": self._create_vcs_cache_context(path, repo_state_uid)
218 "context": self._create_vcs_cache_context(path, repo_state_uid)
215 }
219 }
216
220
217 if with_wire:
221 if with_wire:
218 self._wire.update(with_wire)
222 self._wire.update(with_wire)
219
223
220 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
224 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
221 # log.debug brings a few percent gain even if is is not active.
225 # log.debug brings a few percent gain even if is is not active.
222 if log.isEnabledFor(logging.DEBUG):
226 if log.isEnabledFor(logging.DEBUG):
223 self._call_with_logging = True
227 self._call_with_logging = True
224
228
225 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
229 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
226
230
227 def _get_repo_name(self, config, path):
231 def _get_repo_name(self, config, path):
228 repo_store = config.get('paths', '/')
232 repo_store = config.get('paths', '/')
229 return path.split(repo_store)[-1].lstrip('/')
233 return path.split(repo_store)[-1].lstrip('/')
230
234
231 def _repo_id_sanitizer(self, repo_id):
235 def _repo_id_sanitizer(self, repo_id):
232 pathless = repo_id.replace('/', '__').replace('-', '_')
236 pathless = repo_id.replace('/', '__').replace('-', '_')
233 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
237 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
234
238
235 def __getattr__(self, name):
239 def __getattr__(self, name):
236
240
237 if name.startswith('stream:'):
241 if name.startswith('stream:'):
238 def repo_remote_attr(*args, **kwargs):
242 def repo_remote_attr(*args, **kwargs):
239 return self._call_stream(name, *args, **kwargs)
243 return self._call_stream(name, *args, **kwargs)
240 else:
244 else:
241 def repo_remote_attr(*args, **kwargs):
245 def repo_remote_attr(*args, **kwargs):
242 return self._call(name, *args, **kwargs)
246 return self._call(name, *args, **kwargs)
243
247
244 return repo_remote_attr
248 return repo_remote_attr
245
249
246 def _base_call(self, name, *args, **kwargs):
250 def _base_call(self, name, *args, **kwargs):
247 # TODO: oliver: This is currently necessary pre-call since the
251 # TODO: oliver: This is currently necessary pre-call since the
248 # config object is being changed for hooking scenarios
252 # config object is being changed for hooking scenarios
249 wire = copy.deepcopy(self._wire)
253 wire = copy.deepcopy(self._wire)
250 wire["config"] = wire["config"].serialize()
254 wire["config"] = wire["config"].serialize()
251 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
255 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
252
256
253 payload = {
257 payload = {
254 'id': str(uuid.uuid4()),
258 'id': str(uuid.uuid4()),
255 'method': name,
259 'method': name,
256 "_repo_name": wire['_repo_name'],
260 "_repo_name": wire['_repo_name'],
257 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
261 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
258 }
262 }
259
263
260 context_uid = wire.get('context')
264 context_uid = wire.get('context')
261 return context_uid, payload
265 return context_uid, payload
262
266
263 def get_local_cache(self, name, args):
267 def get_local_cache(self, name, args):
264 cache_on = False
268 cache_on = False
265 cache_key = ''
269 cache_key = ''
266 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
270 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
267
271
268 cache_methods = [
272 cache_methods = [
269 'branches', 'tags', 'bookmarks',
273 'branches', 'tags', 'bookmarks',
270 'is_large_file', 'is_binary',
274 'is_large_file', 'is_binary',
271 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
275 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
272 'node_history',
276 'node_history',
273 'revision', 'tree_items',
277 'revision', 'tree_items',
274 'ctx_list', 'ctx_branch', 'ctx_description',
278 'ctx_list', 'ctx_branch', 'ctx_description',
275 'bulk_request',
279 'bulk_request',
276 'assert_correct_path'
280 'assert_correct_path'
277 ]
281 ]
278
282
279 if local_cache_on and name in cache_methods:
283 if local_cache_on and name in cache_methods:
280 cache_on = True
284 cache_on = True
281 repo_state_uid = self._wire['repo_state_uid']
285 repo_state_uid = self._wire['repo_state_uid']
282 call_args = [a for a in args]
286 call_args = [a for a in args]
283 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
287 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
284
288
285 return cache_on, cache_key
289 return cache_on, cache_key
286
290
287 @exceptions.map_vcs_exceptions
291 @exceptions.map_vcs_exceptions
288 def _call(self, name, *args, **kwargs):
292 def _call(self, name, *args, **kwargs):
289 context_uid, payload = self._base_call(name, *args, **kwargs)
293 context_uid, payload = self._base_call(name, *args, **kwargs)
290 url = self.url
294 url = self.url
291
295
292 start = time.time()
296 start = time.time()
293 cache_on, cache_key = self.get_local_cache(name, args)
297 cache_on, cache_key = self.get_local_cache(name, args)
294
298
295 @self._cache_region.conditional_cache_on_arguments(
299 @self._cache_region.conditional_cache_on_arguments(
296 namespace=self._cache_namespace, condition=cache_on and cache_key)
300 namespace=self._cache_namespace, condition=cache_on and cache_key)
297 def remote_call(_cache_key):
301 def remote_call(_cache_key):
298 if self._call_with_logging:
302 if self._call_with_logging:
299 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
303 args_repr = f'ARG: {str(args):.256}|KW: {str(kwargs):.256}'
300 url, name, args, context_uid, cache_on)
304 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
305 url, name, args_repr, context_uid, cache_on)
301 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
306 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
302
307
303 result = remote_call(cache_key)
308 result = remote_call(cache_key)
304 if self._call_with_logging:
309 if self._call_with_logging:
305 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
310 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
306 url, name, time.time()-start, context_uid)
311 url, name, time.time()-start, context_uid)
307 return result
312 return result
308
313
309 @exceptions.map_vcs_exceptions
314 @exceptions.map_vcs_exceptions
310 def _call_stream(self, name, *args, **kwargs):
315 def _call_stream(self, name, *args, **kwargs):
311 context_uid, payload = self._base_call(name, *args, **kwargs)
316 context_uid, payload = self._base_call(name, *args, **kwargs)
312 payload['chunk_size'] = self.CHUNK_SIZE
317 payload['chunk_size'] = self.CHUNK_SIZE
313 url = self.stream_url
318 url = self.stream_url
314
319
315 start = time.time()
320 start = time.time()
316 cache_on, cache_key = self.get_local_cache(name, args)
321 cache_on, cache_key = self.get_local_cache(name, args)
317
322
318 # Cache is a problem because this is a stream
323 # Cache is a problem because this is a stream
319 def streaming_remote_call(_cache_key):
324 def streaming_remote_call(_cache_key):
320 if self._call_with_logging:
325 if self._call_with_logging:
321 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
326 args_repr = f'ARG: {str(args):.256}|KW: {str(kwargs):.256}'
322 url, name, args, context_uid, cache_on)
327 log.debug('Calling %s@%s with args:%r. wire_context: %s cache_on: %s',
328 url, name, args_repr, context_uid, cache_on)
323 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
329 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
324
330
325 result = streaming_remote_call(cache_key)
331 result = streaming_remote_call(cache_key)
326 if self._call_with_logging:
332 if self._call_with_logging:
327 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
333 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
328 url, name, time.time()-start, context_uid)
334 url, name, time.time()-start, context_uid)
329 return result
335 return result
330
336
331 def __getitem__(self, key):
337 def __getitem__(self, key):
332 return self.revision(key)
338 return self.revision(key)
333
339
334 def _create_vcs_cache_context(self, *args):
340 def _create_vcs_cache_context(self, *args):
335 """
341 """
336 Creates a unique string which is passed to the VCSServer on every
342 Creates a unique string which is passed to the VCSServer on every
337 remote call. It is used as cache key in the VCSServer.
343 remote call. It is used as cache key in the VCSServer.
338 """
344 """
339 hash_key = '-'.join(map(str, args))
345 hash_key = '-'.join(map(str, args))
340 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
346 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
341
347
342 def invalidate_vcs_cache(self):
348 def invalidate_vcs_cache(self):
343 """
349 """
344 This invalidates the context which is sent to the VCSServer on every
350 This invalidates the context which is sent to the VCSServer on every
345 call to a remote method. It forces the VCSServer to create a fresh
351 call to a remote method. It forces the VCSServer to create a fresh
346 repository instance on the next call to a remote method.
352 repository instance on the next call to a remote method.
347 """
353 """
348 self._wire['context'] = str(uuid.uuid4())
354 self._wire['context'] = str(uuid.uuid4())
349
355
350
356
351 class VcsHttpProxy(object):
357 class VcsHttpProxy(object):
352
358
353 CHUNK_SIZE = 16384
359 CHUNK_SIZE = 16384
354
360
355 def __init__(self, server_and_port, backend_endpoint):
361 def __init__(self, server_and_port, backend_endpoint):
356 retries = Retry(total=5, connect=None, read=None, redirect=None)
362 retries = Retry(total=5, connect=None, read=None, redirect=None)
357
363
358 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
364 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
359 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
365 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
360 self.session = requests.Session()
366 self.session = requests.Session()
361 self.session.mount('http://', adapter)
367 self.session.mount('http://', adapter)
362
368
363 def handle(self, environment, input_data, *args, **kwargs):
369 def handle(self, environment, input_data, *args, **kwargs):
364 data = {
370 data = {
365 'environment': environment,
371 'environment': environment,
366 'input_data': input_data,
372 'input_data': input_data,
367 'args': args,
373 'args': args,
368 'kwargs': kwargs
374 'kwargs': kwargs
369 }
375 }
370 result = self.session.post(
376 result = self.session.post(
371 self.base_url, msgpack.packb(data), stream=True)
377 self.base_url, msgpack.packb(data), stream=True)
372 return self._get_result(result)
378 return self._get_result(result)
373
379
374 def _deserialize_and_raise(self, error):
380 def _deserialize_and_raise(self, error):
375 exception = Exception(error['message'])
381 exception = Exception(error['message'])
376 try:
382 try:
377 exception._vcs_kind = error['_vcs_kind']
383 exception._vcs_kind = error['_vcs_kind']
378 except KeyError:
384 except KeyError:
379 pass
385 pass
380 raise exception
386 raise exception
381
387
382 def _iterate(self, result):
388 def _iterate(self, result):
383 unpacker = msgpack.Unpacker()
389 unpacker = msgpack.Unpacker()
384 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
390 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
385 unpacker.feed(line)
391 unpacker.feed(line)
386 for chunk in unpacker:
392 for chunk in unpacker:
387 yield chunk
393 yield chunk
388
394
389 def _get_result(self, result):
395 def _get_result(self, result):
390 iterator = self._iterate(result)
396 iterator = self._iterate(result)
391 error = next(iterator)
397 error = next(iterator)
392 if error:
398 if error:
393 self._deserialize_and_raise(error)
399 self._deserialize_and_raise(error)
394
400
395 status = next(iterator)
401 status = next(iterator)
396 headers = next(iterator)
402 headers = next(iterator)
397
403
398 return iterator, status, headers
404 return iterator, status, headers
399
405
400
406
401 class ThreadlocalSessionFactory(object):
407 class ThreadlocalSessionFactory(object):
402 """
408 """
403 Creates one CurlSession per thread on demand.
409 Creates one CurlSession per thread on demand.
404 """
410 """
405
411
406 def __init__(self):
412 def __init__(self):
407 self._thread_local = threading.local()
413 self._thread_local = threading.local()
408
414
409 def __call__(self):
415 def __call__(self):
410 if not hasattr(self._thread_local, 'curl_session'):
416 if not hasattr(self._thread_local, 'curl_session'):
411 self._thread_local.curl_session = CurlSession()
417 self._thread_local.curl_session = CurlSession()
412 return self._thread_local.curl_session
418 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now