##// END OF EJS Templates
communication: use unpack without bytes
super-admin -
r4966:a76e3737 default
parent child Browse files
Show More
@@ -1,412 +1,412 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib.request, urllib.error, urllib.parse
29 import urllib.request, urllib.error, urllib.parse
30 import urllib.parse
30 import urllib.parse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib.error.URLError,
53 'URLError': urllib.error.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session):
58 try:
58 try:
59 headers = {
59 headers = {
60 'X-RC-Method': payload.get('method'),
60 'X-RC-Method': payload.get('method'),
61 'X-RC-Repo-Name': payload.get('_repo_name')
61 'X-RC-Repo-Name': payload.get('_repo_name')
62 }
62 }
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
64 except pycurl.error as e:
64 except pycurl.error as e:
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
66 raise exceptions.HttpVCSCommunicationError(msg)
66 raise exceptions.HttpVCSCommunicationError(msg)
67 except Exception as e:
67 except Exception as e:
68 message = getattr(e, 'message', '')
68 message = getattr(e, 'message', '')
69 if 'Failed to connect' in message:
69 if 'Failed to connect' in message:
70 # gevent doesn't return proper pycurl errors
70 # gevent doesn't return proper pycurl errors
71 raise exceptions.HttpVCSCommunicationError(e)
71 raise exceptions.HttpVCSCommunicationError(e)
72 else:
72 else:
73 raise
73 raise
74
74
75 if response.status_code >= 400:
75 if response.status_code >= 400:
76 log.error('Call to %s returned non 200 HTTP code: %s',
76 log.error('Call to %s returned non 200 HTTP code: %s',
77 url, response.status_code)
77 url, response.status_code)
78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
79
79
80 try:
80 try:
81 response = msgpack.unpackb(response.content)
81 response = msgpack.unpackb(response.content, raw=False)
82 except Exception:
82 except Exception:
83 log.exception('Failed to decode response %r', response.content)
83 log.exception('Failed to decode response %r', response.content)
84 raise
84 raise
85
85
86 error = response.get('error')
86 error = response.get('error')
87 if error:
87 if error:
88 type_ = error.get('type', 'Exception')
88 type_ = error.get('type', 'Exception')
89 exc = exceptions_map.get(type_, Exception)
89 exc = exceptions_map.get(type_, Exception)
90 exc = exc(error.get('message'))
90 exc = exc(error.get('message'))
91 try:
91 try:
92 exc._vcs_kind = error['_vcs_kind']
92 exc._vcs_kind = error['_vcs_kind']
93 except KeyError:
93 except KeyError:
94 pass
94 pass
95
95
96 try:
96 try:
97 exc._vcs_server_traceback = error['traceback']
97 exc._vcs_server_traceback = error['traceback']
98 exc._vcs_server_org_exc_name = error['org_exc']
98 exc._vcs_server_org_exc_name = error['org_exc']
99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
100 except KeyError:
100 except KeyError:
101 pass
101 pass
102
102
103 raise exc
103 raise exc
104 return response.get('result')
104 return response.get('result')
105
105
106
106
107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
108 try:
108 try:
109 headers = {
109 headers = {
110 'X-RC-Method': payload.get('method'),
110 'X-RC-Method': payload.get('method'),
111 'X-RC-Repo-Name': payload.get('_repo_name')
111 'X-RC-Repo-Name': payload.get('_repo_name')
112 }
112 }
113 response = session.post(url, data=msgpack.packb(payload), headers=headers)
113 response = session.post(url, data=msgpack.packb(payload), headers=headers)
114 except pycurl.error as e:
114 except pycurl.error as e:
115 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
115 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
116 raise exceptions.HttpVCSCommunicationError(msg)
116 raise exceptions.HttpVCSCommunicationError(msg)
117 except Exception as e:
117 except Exception as e:
118 message = getattr(e, 'message', '')
118 message = getattr(e, 'message', '')
119 if 'Failed to connect' in message:
119 if 'Failed to connect' in message:
120 # gevent doesn't return proper pycurl errors
120 # gevent doesn't return proper pycurl errors
121 raise exceptions.HttpVCSCommunicationError(e)
121 raise exceptions.HttpVCSCommunicationError(e)
122 else:
122 else:
123 raise
123 raise
124
124
125 if response.status_code >= 400:
125 if response.status_code >= 400:
126 log.error('Call to %s returned non 200 HTTP code: %s',
126 log.error('Call to %s returned non 200 HTTP code: %s',
127 url, response.status_code)
127 url, response.status_code)
128 raise exceptions.HttpVCSCommunicationError(repr(response.content))
128 raise exceptions.HttpVCSCommunicationError(repr(response.content))
129
129
130 return response.iter_content(chunk_size=chunk_size)
130 return response.iter_content(chunk_size=chunk_size)
131
131
132
132
133 class ServiceConnection(object):
133 class ServiceConnection(object):
134 def __init__(self, server_and_port, backend_endpoint, session_factory):
134 def __init__(self, server_and_port, backend_endpoint, session_factory):
135 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
135 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
136 self._session_factory = session_factory
136 self._session_factory = session_factory
137
137
138 def __getattr__(self, name):
138 def __getattr__(self, name):
139 def f(*args, **kwargs):
139 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
140 return self._call(name, *args, **kwargs)
141 return f
141 return f
142
142
143 @exceptions.map_vcs_exceptions
143 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
144 def _call(self, name, *args, **kwargs):
145 payload = {
145 payload = {
146 'id': str(uuid.uuid4()),
146 'id': str(uuid.uuid4()),
147 'method': name,
147 'method': name,
148 'params': {'args': args, 'kwargs': kwargs}
148 'params': {'args': args, 'kwargs': kwargs}
149 }
149 }
150 return _remote_call(
150 return _remote_call(
151 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
151 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
152
152
153
153
154 class RemoteVCSMaker(object):
154 class RemoteVCSMaker(object):
155
155
156 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
156 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
157 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
157 self.url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
158 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
158 self.stream_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
159
159
160 self._session_factory = session_factory
160 self._session_factory = session_factory
161 self.backend_type = backend_type
161 self.backend_type = backend_type
162
162
163 @classmethod
163 @classmethod
164 def init_cache_region(cls, repo_id):
164 def init_cache_region(cls, repo_id):
165 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
165 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
166 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
166 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
167 return region, cache_namespace_uid
167 return region, cache_namespace_uid
168
168
169 def __call__(self, path, repo_id, config, with_wire=None):
169 def __call__(self, path, repo_id, config, with_wire=None):
170 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
170 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
171 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
171 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
172
172
173 def __getattr__(self, name):
173 def __getattr__(self, name):
174 def remote_attr(*args, **kwargs):
174 def remote_attr(*args, **kwargs):
175 return self._call(name, *args, **kwargs)
175 return self._call(name, *args, **kwargs)
176 return remote_attr
176 return remote_attr
177
177
178 @exceptions.map_vcs_exceptions
178 @exceptions.map_vcs_exceptions
179 def _call(self, func_name, *args, **kwargs):
179 def _call(self, func_name, *args, **kwargs):
180 payload = {
180 payload = {
181 'id': str(uuid.uuid4()),
181 'id': str(uuid.uuid4()),
182 'method': func_name,
182 'method': func_name,
183 'backend': self.backend_type,
183 'backend': self.backend_type,
184 'params': {'args': args, 'kwargs': kwargs}
184 'params': {'args': args, 'kwargs': kwargs}
185 }
185 }
186 url = self.url
186 url = self.url
187 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
187 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
188
188
189
189
190 class RemoteRepo(object):
190 class RemoteRepo(object):
191 CHUNK_SIZE = 16384
191 CHUNK_SIZE = 16384
192
192
193 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
193 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
194 self.url = remote_maker.url
194 self.url = remote_maker.url
195 self.stream_url = remote_maker.stream_url
195 self.stream_url = remote_maker.stream_url
196 self._session = remote_maker._session_factory()
196 self._session = remote_maker._session_factory()
197
197
198 cache_repo_id = self._repo_id_sanitizer(repo_id)
198 cache_repo_id = self._repo_id_sanitizer(repo_id)
199 _repo_name = self._get_repo_name(config, path)
199 _repo_name = self._get_repo_name(config, path)
200 self._cache_region, self._cache_namespace = \
200 self._cache_region, self._cache_namespace = \
201 remote_maker.init_cache_region(cache_repo_id)
201 remote_maker.init_cache_region(cache_repo_id)
202
202
203 with_wire = with_wire or {}
203 with_wire = with_wire or {}
204
204
205 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
205 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
206
206
207 self._wire = {
207 self._wire = {
208 "_repo_name": _repo_name,
208 "_repo_name": _repo_name,
209 "path": path, # repo path
209 "path": path, # repo path
210 "repo_id": repo_id,
210 "repo_id": repo_id,
211 "cache_repo_id": cache_repo_id,
211 "cache_repo_id": cache_repo_id,
212 "config": config,
212 "config": config,
213 "repo_state_uid": repo_state_uid,
213 "repo_state_uid": repo_state_uid,
214 "context": self._create_vcs_cache_context(path, repo_state_uid)
214 "context": self._create_vcs_cache_context(path, repo_state_uid)
215 }
215 }
216
216
217 if with_wire:
217 if with_wire:
218 self._wire.update(with_wire)
218 self._wire.update(with_wire)
219
219
220 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
220 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
221 # log.debug brings a few percent gain even if is is not active.
221 # log.debug brings a few percent gain even if is is not active.
222 if log.isEnabledFor(logging.DEBUG):
222 if log.isEnabledFor(logging.DEBUG):
223 self._call_with_logging = True
223 self._call_with_logging = True
224
224
225 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
225 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
226
226
227 def _get_repo_name(self, config, path):
227 def _get_repo_name(self, config, path):
228 repo_store = config.get('paths', '/')
228 repo_store = config.get('paths', '/')
229 return path.split(repo_store)[-1].lstrip('/')
229 return path.split(repo_store)[-1].lstrip('/')
230
230
231 def _repo_id_sanitizer(self, repo_id):
231 def _repo_id_sanitizer(self, repo_id):
232 pathless = repo_id.replace('/', '__').replace('-', '_')
232 pathless = repo_id.replace('/', '__').replace('-', '_')
233 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
233 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
234
234
235 def __getattr__(self, name):
235 def __getattr__(self, name):
236
236
237 if name.startswith('stream:'):
237 if name.startswith('stream:'):
238 def repo_remote_attr(*args, **kwargs):
238 def repo_remote_attr(*args, **kwargs):
239 return self._call_stream(name, *args, **kwargs)
239 return self._call_stream(name, *args, **kwargs)
240 else:
240 else:
241 def repo_remote_attr(*args, **kwargs):
241 def repo_remote_attr(*args, **kwargs):
242 return self._call(name, *args, **kwargs)
242 return self._call(name, *args, **kwargs)
243
243
244 return repo_remote_attr
244 return repo_remote_attr
245
245
246 def _base_call(self, name, *args, **kwargs):
246 def _base_call(self, name, *args, **kwargs):
247 # TODO: oliver: This is currently necessary pre-call since the
247 # TODO: oliver: This is currently necessary pre-call since the
248 # config object is being changed for hooking scenarios
248 # config object is being changed for hooking scenarios
249 wire = copy.deepcopy(self._wire)
249 wire = copy.deepcopy(self._wire)
250 wire["config"] = wire["config"].serialize()
250 wire["config"] = wire["config"].serialize()
251 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
251 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
252
252
253 payload = {
253 payload = {
254 'id': str(uuid.uuid4()),
254 'id': str(uuid.uuid4()),
255 'method': name,
255 'method': name,
256 "_repo_name": wire['_repo_name'],
256 "_repo_name": wire['_repo_name'],
257 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
257 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
258 }
258 }
259
259
260 context_uid = wire.get('context')
260 context_uid = wire.get('context')
261 return context_uid, payload
261 return context_uid, payload
262
262
263 def get_local_cache(self, name, args):
263 def get_local_cache(self, name, args):
264 cache_on = False
264 cache_on = False
265 cache_key = ''
265 cache_key = ''
266 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
266 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
267
267
268 cache_methods = [
268 cache_methods = [
269 'branches', 'tags', 'bookmarks',
269 'branches', 'tags', 'bookmarks',
270 'is_large_file', 'is_binary',
270 'is_large_file', 'is_binary',
271 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
271 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
272 'node_history',
272 'node_history',
273 'revision', 'tree_items',
273 'revision', 'tree_items',
274 'ctx_list', 'ctx_branch', 'ctx_description',
274 'ctx_list', 'ctx_branch', 'ctx_description',
275 'bulk_request',
275 'bulk_request',
276 'assert_correct_path'
276 'assert_correct_path'
277 ]
277 ]
278
278
279 if local_cache_on and name in cache_methods:
279 if local_cache_on and name in cache_methods:
280 cache_on = True
280 cache_on = True
281 repo_state_uid = self._wire['repo_state_uid']
281 repo_state_uid = self._wire['repo_state_uid']
282 call_args = [a for a in args]
282 call_args = [a for a in args]
283 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
283 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
284
284
285 return cache_on, cache_key
285 return cache_on, cache_key
286
286
287 @exceptions.map_vcs_exceptions
287 @exceptions.map_vcs_exceptions
288 def _call(self, name, *args, **kwargs):
288 def _call(self, name, *args, **kwargs):
289 context_uid, payload = self._base_call(name, *args, **kwargs)
289 context_uid, payload = self._base_call(name, *args, **kwargs)
290 url = self.url
290 url = self.url
291
291
292 start = time.time()
292 start = time.time()
293 cache_on, cache_key = self.get_local_cache(name, args)
293 cache_on, cache_key = self.get_local_cache(name, args)
294
294
295 @self._cache_region.conditional_cache_on_arguments(
295 @self._cache_region.conditional_cache_on_arguments(
296 namespace=self._cache_namespace, condition=cache_on and cache_key)
296 namespace=self._cache_namespace, condition=cache_on and cache_key)
297 def remote_call(_cache_key):
297 def remote_call(_cache_key):
298 if self._call_with_logging:
298 if self._call_with_logging:
299 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
299 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
300 url, name, args, context_uid, cache_on)
300 url, name, args, context_uid, cache_on)
301 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
301 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
302
302
303 result = remote_call(cache_key)
303 result = remote_call(cache_key)
304 if self._call_with_logging:
304 if self._call_with_logging:
305 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
305 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
306 url, name, time.time()-start, context_uid)
306 url, name, time.time()-start, context_uid)
307 return result
307 return result
308
308
309 @exceptions.map_vcs_exceptions
309 @exceptions.map_vcs_exceptions
310 def _call_stream(self, name, *args, **kwargs):
310 def _call_stream(self, name, *args, **kwargs):
311 context_uid, payload = self._base_call(name, *args, **kwargs)
311 context_uid, payload = self._base_call(name, *args, **kwargs)
312 payload['chunk_size'] = self.CHUNK_SIZE
312 payload['chunk_size'] = self.CHUNK_SIZE
313 url = self.stream_url
313 url = self.stream_url
314
314
315 start = time.time()
315 start = time.time()
316 cache_on, cache_key = self.get_local_cache(name, args)
316 cache_on, cache_key = self.get_local_cache(name, args)
317
317
318 # Cache is a problem because this is a stream
318 # Cache is a problem because this is a stream
319 def streaming_remote_call(_cache_key):
319 def streaming_remote_call(_cache_key):
320 if self._call_with_logging:
320 if self._call_with_logging:
321 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
321 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
322 url, name, args, context_uid, cache_on)
322 url, name, args, context_uid, cache_on)
323 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
323 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
324
324
325 result = streaming_remote_call(cache_key)
325 result = streaming_remote_call(cache_key)
326 if self._call_with_logging:
326 if self._call_with_logging:
327 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
327 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
328 url, name, time.time()-start, context_uid)
328 url, name, time.time()-start, context_uid)
329 return result
329 return result
330
330
331 def __getitem__(self, key):
331 def __getitem__(self, key):
332 return self.revision(key)
332 return self.revision(key)
333
333
334 def _create_vcs_cache_context(self, *args):
334 def _create_vcs_cache_context(self, *args):
335 """
335 """
336 Creates a unique string which is passed to the VCSServer on every
336 Creates a unique string which is passed to the VCSServer on every
337 remote call. It is used as cache key in the VCSServer.
337 remote call. It is used as cache key in the VCSServer.
338 """
338 """
339 hash_key = '-'.join(map(str, args))
339 hash_key = '-'.join(map(str, args))
340 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
340 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
341
341
342 def invalidate_vcs_cache(self):
342 def invalidate_vcs_cache(self):
343 """
343 """
344 This invalidates the context which is sent to the VCSServer on every
344 This invalidates the context which is sent to the VCSServer on every
345 call to a remote method. It forces the VCSServer to create a fresh
345 call to a remote method. It forces the VCSServer to create a fresh
346 repository instance on the next call to a remote method.
346 repository instance on the next call to a remote method.
347 """
347 """
348 self._wire['context'] = str(uuid.uuid4())
348 self._wire['context'] = str(uuid.uuid4())
349
349
350
350
351 class VcsHttpProxy(object):
351 class VcsHttpProxy(object):
352
352
353 CHUNK_SIZE = 16384
353 CHUNK_SIZE = 16384
354
354
355 def __init__(self, server_and_port, backend_endpoint):
355 def __init__(self, server_and_port, backend_endpoint):
356 retries = Retry(total=5, connect=None, read=None, redirect=None)
356 retries = Retry(total=5, connect=None, read=None, redirect=None)
357
357
358 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
358 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
359 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
359 self.base_url = urllib.parse.urljoin('http://%s' % server_and_port, backend_endpoint)
360 self.session = requests.Session()
360 self.session = requests.Session()
361 self.session.mount('http://', adapter)
361 self.session.mount('http://', adapter)
362
362
363 def handle(self, environment, input_data, *args, **kwargs):
363 def handle(self, environment, input_data, *args, **kwargs):
364 data = {
364 data = {
365 'environment': environment,
365 'environment': environment,
366 'input_data': input_data,
366 'input_data': input_data,
367 'args': args,
367 'args': args,
368 'kwargs': kwargs
368 'kwargs': kwargs
369 }
369 }
370 result = self.session.post(
370 result = self.session.post(
371 self.base_url, msgpack.packb(data), stream=True)
371 self.base_url, msgpack.packb(data), stream=True)
372 return self._get_result(result)
372 return self._get_result(result)
373
373
374 def _deserialize_and_raise(self, error):
374 def _deserialize_and_raise(self, error):
375 exception = Exception(error['message'])
375 exception = Exception(error['message'])
376 try:
376 try:
377 exception._vcs_kind = error['_vcs_kind']
377 exception._vcs_kind = error['_vcs_kind']
378 except KeyError:
378 except KeyError:
379 pass
379 pass
380 raise exception
380 raise exception
381
381
382 def _iterate(self, result):
382 def _iterate(self, result):
383 unpacker = msgpack.Unpacker()
383 unpacker = msgpack.Unpacker()
384 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
384 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
385 unpacker.feed(line)
385 unpacker.feed(line)
386 for chunk in unpacker:
386 for chunk in unpacker:
387 yield chunk
387 yield chunk
388
388
389 def _get_result(self, result):
389 def _get_result(self, result):
390 iterator = self._iterate(result)
390 iterator = self._iterate(result)
391 error = next(iterator)
391 error = next(iterator)
392 if error:
392 if error:
393 self._deserialize_and_raise(error)
393 self._deserialize_and_raise(error)
394
394
395 status = next(iterator)
395 status = next(iterator)
396 headers = next(iterator)
396 headers = next(iterator)
397
397
398 return iterator, status, headers
398 return iterator, status, headers
399
399
400
400
401 class ThreadlocalSessionFactory(object):
401 class ThreadlocalSessionFactory(object):
402 """
402 """
403 Creates one CurlSession per thread on demand.
403 Creates one CurlSession per thread on demand.
404 """
404 """
405
405
406 def __init__(self):
406 def __init__(self):
407 self._thread_local = threading.local()
407 self._thread_local = threading.local()
408
408
409 def __call__(self):
409 def __call__(self):
410 if not hasattr(self._thread_local, 'curl_session'):
410 if not hasattr(self._thread_local, 'curl_session'):
411 self._thread_local.curl_session = CurlSession()
411 self._thread_local.curl_session = CurlSession()
412 return self._thread_local.curl_session
412 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now