##// END OF EJS Templates
vcs: added headers into stream call too
super-admin -
r4888:279ce63e default
parent child Browse files
Show More
@@ -1,408 +1,412 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session):
58 try:
58 try:
59 headers = {
59 headers = {
60 'X-RC-Method': payload.get('method'),
60 'X-RC-Method': payload.get('method'),
61 'X-RC-Repo-Name': payload.get('_repo_name')
61 'X-RC-Repo-Name': payload.get('_repo_name')
62 }
62 }
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
64 except pycurl.error as e:
64 except pycurl.error as e:
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
66 raise exceptions.HttpVCSCommunicationError(msg)
66 raise exceptions.HttpVCSCommunicationError(msg)
67 except Exception as e:
67 except Exception as e:
68 message = getattr(e, 'message', '')
68 message = getattr(e, 'message', '')
69 if 'Failed to connect' in message:
69 if 'Failed to connect' in message:
70 # gevent doesn't return proper pycurl errors
70 # gevent doesn't return proper pycurl errors
71 raise exceptions.HttpVCSCommunicationError(e)
71 raise exceptions.HttpVCSCommunicationError(e)
72 else:
72 else:
73 raise
73 raise
74
74
75 if response.status_code >= 400:
75 if response.status_code >= 400:
76 log.error('Call to %s returned non 200 HTTP code: %s',
76 log.error('Call to %s returned non 200 HTTP code: %s',
77 url, response.status_code)
77 url, response.status_code)
78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
79
79
80 try:
80 try:
81 response = msgpack.unpackb(response.content)
81 response = msgpack.unpackb(response.content)
82 except Exception:
82 except Exception:
83 log.exception('Failed to decode response %r', response.content)
83 log.exception('Failed to decode response %r', response.content)
84 raise
84 raise
85
85
86 error = response.get('error')
86 error = response.get('error')
87 if error:
87 if error:
88 type_ = error.get('type', 'Exception')
88 type_ = error.get('type', 'Exception')
89 exc = exceptions_map.get(type_, Exception)
89 exc = exceptions_map.get(type_, Exception)
90 exc = exc(error.get('message'))
90 exc = exc(error.get('message'))
91 try:
91 try:
92 exc._vcs_kind = error['_vcs_kind']
92 exc._vcs_kind = error['_vcs_kind']
93 except KeyError:
93 except KeyError:
94 pass
94 pass
95
95
96 try:
96 try:
97 exc._vcs_server_traceback = error['traceback']
97 exc._vcs_server_traceback = error['traceback']
98 exc._vcs_server_org_exc_name = error['org_exc']
98 exc._vcs_server_org_exc_name = error['org_exc']
99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
100 except KeyError:
100 except KeyError:
101 pass
101 pass
102
102
103 raise exc
103 raise exc
104 return response.get('result')
104 return response.get('result')
105
105
106
106
107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
108 try:
108 try:
109 response = session.post(url, data=msgpack.packb(payload))
109 headers = {
110 'X-RC-Method': payload.get('method'),
111 'X-RC-Repo-Name': payload.get('_repo_name')
112 }
113 response = session.post(url, data=msgpack.packb(payload), headers=headers)
110 except pycurl.error as e:
114 except pycurl.error as e:
111 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
115 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
112 raise exceptions.HttpVCSCommunicationError(msg)
116 raise exceptions.HttpVCSCommunicationError(msg)
113 except Exception as e:
117 except Exception as e:
114 message = getattr(e, 'message', '')
118 message = getattr(e, 'message', '')
115 if 'Failed to connect' in message:
119 if 'Failed to connect' in message:
116 # gevent doesn't return proper pycurl errors
120 # gevent doesn't return proper pycurl errors
117 raise exceptions.HttpVCSCommunicationError(e)
121 raise exceptions.HttpVCSCommunicationError(e)
118 else:
122 else:
119 raise
123 raise
120
124
121 if response.status_code >= 400:
125 if response.status_code >= 400:
122 log.error('Call to %s returned non 200 HTTP code: %s',
126 log.error('Call to %s returned non 200 HTTP code: %s',
123 url, response.status_code)
127 url, response.status_code)
124 raise exceptions.HttpVCSCommunicationError(repr(response.content))
128 raise exceptions.HttpVCSCommunicationError(repr(response.content))
125
129
126 return response.iter_content(chunk_size=chunk_size)
130 return response.iter_content(chunk_size=chunk_size)
127
131
128
132
129 class ServiceConnection(object):
133 class ServiceConnection(object):
130 def __init__(self, server_and_port, backend_endpoint, session_factory):
134 def __init__(self, server_and_port, backend_endpoint, session_factory):
131 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
135 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
132 self._session_factory = session_factory
136 self._session_factory = session_factory
133
137
134 def __getattr__(self, name):
138 def __getattr__(self, name):
135 def f(*args, **kwargs):
139 def f(*args, **kwargs):
136 return self._call(name, *args, **kwargs)
140 return self._call(name, *args, **kwargs)
137 return f
141 return f
138
142
139 @exceptions.map_vcs_exceptions
143 @exceptions.map_vcs_exceptions
140 def _call(self, name, *args, **kwargs):
144 def _call(self, name, *args, **kwargs):
141 payload = {
145 payload = {
142 'id': str(uuid.uuid4()),
146 'id': str(uuid.uuid4()),
143 'method': name,
147 'method': name,
144 'params': {'args': args, 'kwargs': kwargs}
148 'params': {'args': args, 'kwargs': kwargs}
145 }
149 }
146 return _remote_call(
150 return _remote_call(
147 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
151 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
148
152
149
153
150 class RemoteVCSMaker(object):
154 class RemoteVCSMaker(object):
151
155
152 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
156 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
153 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
157 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
154 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
158 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
155
159
156 self._session_factory = session_factory
160 self._session_factory = session_factory
157 self.backend_type = backend_type
161 self.backend_type = backend_type
158
162
159 @classmethod
163 @classmethod
160 def init_cache_region(cls, repo_id):
164 def init_cache_region(cls, repo_id):
161 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
165 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
162 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
166 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
163 return region, cache_namespace_uid
167 return region, cache_namespace_uid
164
168
165 def __call__(self, path, repo_id, config, with_wire=None):
169 def __call__(self, path, repo_id, config, with_wire=None):
166 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
170 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
167 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
171 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
168
172
169 def __getattr__(self, name):
173 def __getattr__(self, name):
170 def remote_attr(*args, **kwargs):
174 def remote_attr(*args, **kwargs):
171 return self._call(name, *args, **kwargs)
175 return self._call(name, *args, **kwargs)
172 return remote_attr
176 return remote_attr
173
177
174 @exceptions.map_vcs_exceptions
178 @exceptions.map_vcs_exceptions
175 def _call(self, func_name, *args, **kwargs):
179 def _call(self, func_name, *args, **kwargs):
176 payload = {
180 payload = {
177 'id': str(uuid.uuid4()),
181 'id': str(uuid.uuid4()),
178 'method': func_name,
182 'method': func_name,
179 'backend': self.backend_type,
183 'backend': self.backend_type,
180 'params': {'args': args, 'kwargs': kwargs}
184 'params': {'args': args, 'kwargs': kwargs}
181 }
185 }
182 url = self.url
186 url = self.url
183 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
187 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
184
188
185
189
186 class RemoteRepo(object):
190 class RemoteRepo(object):
187 CHUNK_SIZE = 16384
191 CHUNK_SIZE = 16384
188
192
189 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
193 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
190 self.url = remote_maker.url
194 self.url = remote_maker.url
191 self.stream_url = remote_maker.stream_url
195 self.stream_url = remote_maker.stream_url
192 self._session = remote_maker._session_factory()
196 self._session = remote_maker._session_factory()
193
197
194 cache_repo_id = self._repo_id_sanitizer(repo_id)
198 cache_repo_id = self._repo_id_sanitizer(repo_id)
195 _repo_name = self._get_repo_name(config, path)
199 _repo_name = self._get_repo_name(config, path)
196 self._cache_region, self._cache_namespace = \
200 self._cache_region, self._cache_namespace = \
197 remote_maker.init_cache_region(cache_repo_id)
201 remote_maker.init_cache_region(cache_repo_id)
198
202
199 with_wire = with_wire or {}
203 with_wire = with_wire or {}
200
204
201 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
205 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
202
206
203 self._wire = {
207 self._wire = {
204 "_repo_name": _repo_name,
208 "_repo_name": _repo_name,
205 "path": path, # repo path
209 "path": path, # repo path
206 "repo_id": repo_id,
210 "repo_id": repo_id,
207 "cache_repo_id": cache_repo_id,
211 "cache_repo_id": cache_repo_id,
208 "config": config,
212 "config": config,
209 "repo_state_uid": repo_state_uid,
213 "repo_state_uid": repo_state_uid,
210 "context": self._create_vcs_cache_context(path, repo_state_uid)
214 "context": self._create_vcs_cache_context(path, repo_state_uid)
211 }
215 }
212
216
213 if with_wire:
217 if with_wire:
214 self._wire.update(with_wire)
218 self._wire.update(with_wire)
215
219
216 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
220 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
217 # log.debug brings a few percent gain even if is is not active.
221 # log.debug brings a few percent gain even if is is not active.
218 if log.isEnabledFor(logging.DEBUG):
222 if log.isEnabledFor(logging.DEBUG):
219 self._call_with_logging = True
223 self._call_with_logging = True
220
224
221 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
225 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
222
226
223 def _get_repo_name(self, config, path):
227 def _get_repo_name(self, config, path):
224 repo_store = config.get('paths', '/')
228 repo_store = config.get('paths', '/')
225 return path.split(repo_store)[-1].lstrip('/')
229 return path.split(repo_store)[-1].lstrip('/')
226
230
227 def _repo_id_sanitizer(self, repo_id):
231 def _repo_id_sanitizer(self, repo_id):
228 pathless = repo_id.replace('/', '__').replace('-', '_')
232 pathless = repo_id.replace('/', '__').replace('-', '_')
229 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
233 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
230
234
231 def __getattr__(self, name):
235 def __getattr__(self, name):
232
236
233 if name.startswith('stream:'):
237 if name.startswith('stream:'):
234 def repo_remote_attr(*args, **kwargs):
238 def repo_remote_attr(*args, **kwargs):
235 return self._call_stream(name, *args, **kwargs)
239 return self._call_stream(name, *args, **kwargs)
236 else:
240 else:
237 def repo_remote_attr(*args, **kwargs):
241 def repo_remote_attr(*args, **kwargs):
238 return self._call(name, *args, **kwargs)
242 return self._call(name, *args, **kwargs)
239
243
240 return repo_remote_attr
244 return repo_remote_attr
241
245
242 def _base_call(self, name, *args, **kwargs):
246 def _base_call(self, name, *args, **kwargs):
243 # TODO: oliver: This is currently necessary pre-call since the
247 # TODO: oliver: This is currently necessary pre-call since the
244 # config object is being changed for hooking scenarios
248 # config object is being changed for hooking scenarios
245 wire = copy.deepcopy(self._wire)
249 wire = copy.deepcopy(self._wire)
246 wire["config"] = wire["config"].serialize()
250 wire["config"] = wire["config"].serialize()
247 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
251 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
248
252
249 payload = {
253 payload = {
250 'id': str(uuid.uuid4()),
254 'id': str(uuid.uuid4()),
251 'method': name,
255 'method': name,
252 "_repo_name": wire['_repo_name'],
256 "_repo_name": wire['_repo_name'],
253 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
257 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
254 }
258 }
255
259
256 context_uid = wire.get('context')
260 context_uid = wire.get('context')
257 return context_uid, payload
261 return context_uid, payload
258
262
259 def get_local_cache(self, name, args):
263 def get_local_cache(self, name, args):
260 cache_on = False
264 cache_on = False
261 cache_key = ''
265 cache_key = ''
262 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
266 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
263
267
264 cache_methods = [
268 cache_methods = [
265 'branches', 'tags', 'bookmarks',
269 'branches', 'tags', 'bookmarks',
266 'is_large_file', 'is_binary',
270 'is_large_file', 'is_binary',
267 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
271 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
268 'node_history',
272 'node_history',
269 'revision', 'tree_items',
273 'revision', 'tree_items',
270 'ctx_list', 'ctx_branch', 'ctx_description',
274 'ctx_list', 'ctx_branch', 'ctx_description',
271 'bulk_request',
275 'bulk_request',
272 'assert_correct_path'
276 'assert_correct_path'
273 ]
277 ]
274
278
275 if local_cache_on and name in cache_methods:
279 if local_cache_on and name in cache_methods:
276 cache_on = True
280 cache_on = True
277 repo_state_uid = self._wire['repo_state_uid']
281 repo_state_uid = self._wire['repo_state_uid']
278 call_args = [a for a in args]
282 call_args = [a for a in args]
279 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
283 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
280
284
281 return cache_on, cache_key
285 return cache_on, cache_key
282
286
283 @exceptions.map_vcs_exceptions
287 @exceptions.map_vcs_exceptions
284 def _call(self, name, *args, **kwargs):
288 def _call(self, name, *args, **kwargs):
285 context_uid, payload = self._base_call(name, *args, **kwargs)
289 context_uid, payload = self._base_call(name, *args, **kwargs)
286 url = self.url
290 url = self.url
287
291
288 start = time.time()
292 start = time.time()
289 cache_on, cache_key = self.get_local_cache(name, args)
293 cache_on, cache_key = self.get_local_cache(name, args)
290
294
291 @self._cache_region.conditional_cache_on_arguments(
295 @self._cache_region.conditional_cache_on_arguments(
292 namespace=self._cache_namespace, condition=cache_on and cache_key)
296 namespace=self._cache_namespace, condition=cache_on and cache_key)
293 def remote_call(_cache_key):
297 def remote_call(_cache_key):
294 if self._call_with_logging:
298 if self._call_with_logging:
295 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
299 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
296 url, name, args, context_uid, cache_on)
300 url, name, args, context_uid, cache_on)
297 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
301 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
298
302
299 result = remote_call(cache_key)
303 result = remote_call(cache_key)
300 if self._call_with_logging:
304 if self._call_with_logging:
301 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
305 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
302 url, name, time.time()-start, context_uid)
306 url, name, time.time()-start, context_uid)
303 return result
307 return result
304
308
305 @exceptions.map_vcs_exceptions
309 @exceptions.map_vcs_exceptions
306 def _call_stream(self, name, *args, **kwargs):
310 def _call_stream(self, name, *args, **kwargs):
307 context_uid, payload = self._base_call(name, *args, **kwargs)
311 context_uid, payload = self._base_call(name, *args, **kwargs)
308 payload['chunk_size'] = self.CHUNK_SIZE
312 payload['chunk_size'] = self.CHUNK_SIZE
309 url = self.stream_url
313 url = self.stream_url
310
314
311 start = time.time()
315 start = time.time()
312 cache_on, cache_key = self.get_local_cache(name, args)
316 cache_on, cache_key = self.get_local_cache(name, args)
313
317
314 # Cache is a problem because this is a stream
318 # Cache is a problem because this is a stream
315 def streaming_remote_call(_cache_key):
319 def streaming_remote_call(_cache_key):
316 if self._call_with_logging:
320 if self._call_with_logging:
317 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
321 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
318 url, name, args, context_uid, cache_on)
322 url, name, args, context_uid, cache_on)
319 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
323 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
320
324
321 result = streaming_remote_call(cache_key)
325 result = streaming_remote_call(cache_key)
322 if self._call_with_logging:
326 if self._call_with_logging:
323 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
327 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
324 url, name, time.time()-start, context_uid)
328 url, name, time.time()-start, context_uid)
325 return result
329 return result
326
330
327 def __getitem__(self, key):
331 def __getitem__(self, key):
328 return self.revision(key)
332 return self.revision(key)
329
333
330 def _create_vcs_cache_context(self, *args):
334 def _create_vcs_cache_context(self, *args):
331 """
335 """
332 Creates a unique string which is passed to the VCSServer on every
336 Creates a unique string which is passed to the VCSServer on every
333 remote call. It is used as cache key in the VCSServer.
337 remote call. It is used as cache key in the VCSServer.
334 """
338 """
335 hash_key = '-'.join(map(str, args))
339 hash_key = '-'.join(map(str, args))
336 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
340 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
337
341
338 def invalidate_vcs_cache(self):
342 def invalidate_vcs_cache(self):
339 """
343 """
340 This invalidates the context which is sent to the VCSServer on every
344 This invalidates the context which is sent to the VCSServer on every
341 call to a remote method. It forces the VCSServer to create a fresh
345 call to a remote method. It forces the VCSServer to create a fresh
342 repository instance on the next call to a remote method.
346 repository instance on the next call to a remote method.
343 """
347 """
344 self._wire['context'] = str(uuid.uuid4())
348 self._wire['context'] = str(uuid.uuid4())
345
349
346
350
347 class VcsHttpProxy(object):
351 class VcsHttpProxy(object):
348
352
349 CHUNK_SIZE = 16384
353 CHUNK_SIZE = 16384
350
354
351 def __init__(self, server_and_port, backend_endpoint):
355 def __init__(self, server_and_port, backend_endpoint):
352 retries = Retry(total=5, connect=None, read=None, redirect=None)
356 retries = Retry(total=5, connect=None, read=None, redirect=None)
353
357
354 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
358 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
355 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
359 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
356 self.session = requests.Session()
360 self.session = requests.Session()
357 self.session.mount('http://', adapter)
361 self.session.mount('http://', adapter)
358
362
359 def handle(self, environment, input_data, *args, **kwargs):
363 def handle(self, environment, input_data, *args, **kwargs):
360 data = {
364 data = {
361 'environment': environment,
365 'environment': environment,
362 'input_data': input_data,
366 'input_data': input_data,
363 'args': args,
367 'args': args,
364 'kwargs': kwargs
368 'kwargs': kwargs
365 }
369 }
366 result = self.session.post(
370 result = self.session.post(
367 self.base_url, msgpack.packb(data), stream=True)
371 self.base_url, msgpack.packb(data), stream=True)
368 return self._get_result(result)
372 return self._get_result(result)
369
373
370 def _deserialize_and_raise(self, error):
374 def _deserialize_and_raise(self, error):
371 exception = Exception(error['message'])
375 exception = Exception(error['message'])
372 try:
376 try:
373 exception._vcs_kind = error['_vcs_kind']
377 exception._vcs_kind = error['_vcs_kind']
374 except KeyError:
378 except KeyError:
375 pass
379 pass
376 raise exception
380 raise exception
377
381
378 def _iterate(self, result):
382 def _iterate(self, result):
379 unpacker = msgpack.Unpacker()
383 unpacker = msgpack.Unpacker()
380 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
384 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
381 unpacker.feed(line)
385 unpacker.feed(line)
382 for chunk in unpacker:
386 for chunk in unpacker:
383 yield chunk
387 yield chunk
384
388
385 def _get_result(self, result):
389 def _get_result(self, result):
386 iterator = self._iterate(result)
390 iterator = self._iterate(result)
387 error = iterator.next()
391 error = iterator.next()
388 if error:
392 if error:
389 self._deserialize_and_raise(error)
393 self._deserialize_and_raise(error)
390
394
391 status = iterator.next()
395 status = iterator.next()
392 headers = iterator.next()
396 headers = iterator.next()
393
397
394 return iterator, status, headers
398 return iterator, status, headers
395
399
396
400
397 class ThreadlocalSessionFactory(object):
401 class ThreadlocalSessionFactory(object):
398 """
402 """
399 Creates one CurlSession per thread on demand.
403 Creates one CurlSession per thread on demand.
400 """
404 """
401
405
402 def __init__(self):
406 def __init__(self):
403 self._thread_local = threading.local()
407 self._thread_local = threading.local()
404
408
405 def __call__(self):
409 def __call__(self):
406 if not hasattr(self._thread_local, 'curl_session'):
410 if not hasattr(self._thread_local, 'curl_session'):
407 self._thread_local.curl_session = CurlSession()
411 self._thread_local.curl_session = CurlSession()
408 return self._thread_local.curl_session
412 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now