##// END OF EJS Templates
caches: use also local cache branch as it used very much
super-admin -
r4797:2aad81ef default
parent child Browse files
Show More
@@ -1,394 +1,394 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session):
58 try:
58 try:
59 response = session.post(url, data=msgpack.packb(payload))
59 response = session.post(url, data=msgpack.packb(payload))
60 except pycurl.error as e:
60 except pycurl.error as e:
61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
62 raise exceptions.HttpVCSCommunicationError(msg)
62 raise exceptions.HttpVCSCommunicationError(msg)
63 except Exception as e:
63 except Exception as e:
64 message = getattr(e, 'message', '')
64 message = getattr(e, 'message', '')
65 if 'Failed to connect' in message:
65 if 'Failed to connect' in message:
66 # gevent doesn't return proper pycurl errors
66 # gevent doesn't return proper pycurl errors
67 raise exceptions.HttpVCSCommunicationError(e)
67 raise exceptions.HttpVCSCommunicationError(e)
68 else:
68 else:
69 raise
69 raise
70
70
71 if response.status_code >= 400:
71 if response.status_code >= 400:
72 log.error('Call to %s returned non 200 HTTP code: %s',
72 log.error('Call to %s returned non 200 HTTP code: %s',
73 url, response.status_code)
73 url, response.status_code)
74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
75
75
76 try:
76 try:
77 response = msgpack.unpackb(response.content)
77 response = msgpack.unpackb(response.content)
78 except Exception:
78 except Exception:
79 log.exception('Failed to decode response %r', response.content)
79 log.exception('Failed to decode response %r', response.content)
80 raise
80 raise
81
81
82 error = response.get('error')
82 error = response.get('error')
83 if error:
83 if error:
84 type_ = error.get('type', 'Exception')
84 type_ = error.get('type', 'Exception')
85 exc = exceptions_map.get(type_, Exception)
85 exc = exceptions_map.get(type_, Exception)
86 exc = exc(error.get('message'))
86 exc = exc(error.get('message'))
87 try:
87 try:
88 exc._vcs_kind = error['_vcs_kind']
88 exc._vcs_kind = error['_vcs_kind']
89 except KeyError:
89 except KeyError:
90 pass
90 pass
91
91
92 try:
92 try:
93 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_traceback = error['traceback']
94 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_name = error['org_exc']
95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
96 except KeyError:
96 except KeyError:
97 pass
97 pass
98
98
99 raise exc
99 raise exc
100 return response.get('result')
100 return response.get('result')
101
101
102
102
103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
104 try:
104 try:
105 response = session.post(url, data=msgpack.packb(payload))
105 response = session.post(url, data=msgpack.packb(payload))
106 except pycurl.error as e:
106 except pycurl.error as e:
107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
108 raise exceptions.HttpVCSCommunicationError(msg)
108 raise exceptions.HttpVCSCommunicationError(msg)
109 except Exception as e:
109 except Exception as e:
110 message = getattr(e, 'message', '')
110 message = getattr(e, 'message', '')
111 if 'Failed to connect' in message:
111 if 'Failed to connect' in message:
112 # gevent doesn't return proper pycurl errors
112 # gevent doesn't return proper pycurl errors
113 raise exceptions.HttpVCSCommunicationError(e)
113 raise exceptions.HttpVCSCommunicationError(e)
114 else:
114 else:
115 raise
115 raise
116
116
117 if response.status_code >= 400:
117 if response.status_code >= 400:
118 log.error('Call to %s returned non 200 HTTP code: %s',
118 log.error('Call to %s returned non 200 HTTP code: %s',
119 url, response.status_code)
119 url, response.status_code)
120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
121
121
122 return response.iter_content(chunk_size=chunk_size)
122 return response.iter_content(chunk_size=chunk_size)
123
123
124
124
125 class ServiceConnection(object):
125 class ServiceConnection(object):
126 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 def __init__(self, server_and_port, backend_endpoint, session_factory):
127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
128 self._session_factory = session_factory
128 self._session_factory = session_factory
129
129
130 def __getattr__(self, name):
130 def __getattr__(self, name):
131 def f(*args, **kwargs):
131 def f(*args, **kwargs):
132 return self._call(name, *args, **kwargs)
132 return self._call(name, *args, **kwargs)
133 return f
133 return f
134
134
135 @exceptions.map_vcs_exceptions
135 @exceptions.map_vcs_exceptions
136 def _call(self, name, *args, **kwargs):
136 def _call(self, name, *args, **kwargs):
137 payload = {
137 payload = {
138 'id': str(uuid.uuid4()),
138 'id': str(uuid.uuid4()),
139 'method': name,
139 'method': name,
140 'params': {'args': args, 'kwargs': kwargs}
140 'params': {'args': args, 'kwargs': kwargs}
141 }
141 }
142 return _remote_call(
142 return _remote_call(
143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
144
144
145
145
146 class RemoteVCSMaker(object):
146 class RemoteVCSMaker(object):
147
147
148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
151
151
152 self._session_factory = session_factory
152 self._session_factory = session_factory
153 self.backend_type = backend_type
153 self.backend_type = backend_type
154
154
155 @classmethod
155 @classmethod
156 def init_cache_region(cls, repo_id):
156 def init_cache_region(cls, repo_id):
157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
159 return region, cache_namespace_uid
159 return region, cache_namespace_uid
160
160
161 def __call__(self, path, repo_id, config, with_wire=None):
161 def __call__(self, path, repo_id, config, with_wire=None):
162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
164
164
165 def __getattr__(self, name):
165 def __getattr__(self, name):
166 def remote_attr(*args, **kwargs):
166 def remote_attr(*args, **kwargs):
167 return self._call(name, *args, **kwargs)
167 return self._call(name, *args, **kwargs)
168 return remote_attr
168 return remote_attr
169
169
170 @exceptions.map_vcs_exceptions
170 @exceptions.map_vcs_exceptions
171 def _call(self, func_name, *args, **kwargs):
171 def _call(self, func_name, *args, **kwargs):
172 payload = {
172 payload = {
173 'id': str(uuid.uuid4()),
173 'id': str(uuid.uuid4()),
174 'method': func_name,
174 'method': func_name,
175 'backend': self.backend_type,
175 'backend': self.backend_type,
176 'params': {'args': args, 'kwargs': kwargs}
176 'params': {'args': args, 'kwargs': kwargs}
177 }
177 }
178 url = self.url
178 url = self.url
179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
180
180
181
181
182 class RemoteRepo(object):
182 class RemoteRepo(object):
183 CHUNK_SIZE = 16384
183 CHUNK_SIZE = 16384
184
184
185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
186 self.url = remote_maker.url
186 self.url = remote_maker.url
187 self.stream_url = remote_maker.stream_url
187 self.stream_url = remote_maker.stream_url
188 self._session = remote_maker._session_factory()
188 self._session = remote_maker._session_factory()
189
189
190 cache_repo_id = self._repo_id_sanitizer(repo_id)
190 cache_repo_id = self._repo_id_sanitizer(repo_id)
191 self._cache_region, self._cache_namespace = \
191 self._cache_region, self._cache_namespace = \
192 remote_maker.init_cache_region(cache_repo_id)
192 remote_maker.init_cache_region(cache_repo_id)
193
193
194 with_wire = with_wire or {}
194 with_wire = with_wire or {}
195
195
196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
197 self._wire = {
197 self._wire = {
198 "path": path, # repo path
198 "path": path, # repo path
199 "repo_id": repo_id,
199 "repo_id": repo_id,
200 "cache_repo_id": cache_repo_id,
200 "cache_repo_id": cache_repo_id,
201 "config": config,
201 "config": config,
202 "repo_state_uid": repo_state_uid,
202 "repo_state_uid": repo_state_uid,
203 "context": self._create_vcs_cache_context(path, repo_state_uid)
203 "context": self._create_vcs_cache_context(path, repo_state_uid)
204 }
204 }
205
205
206 if with_wire:
206 if with_wire:
207 self._wire.update(with_wire)
207 self._wire.update(with_wire)
208
208
209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
210 # log.debug brings a few percent gain even if is is not active.
210 # log.debug brings a few percent gain even if is is not active.
211 if log.isEnabledFor(logging.DEBUG):
211 if log.isEnabledFor(logging.DEBUG):
212 self._call_with_logging = True
212 self._call_with_logging = True
213
213
214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
215
215
216 def _repo_id_sanitizer(self, repo_id):
216 def _repo_id_sanitizer(self, repo_id):
217 return repo_id.replace('/', '__').replace('-', '_')
217 return repo_id.replace('/', '__').replace('-', '_')
218
218
219 def __getattr__(self, name):
219 def __getattr__(self, name):
220
220
221 if name.startswith('stream:'):
221 if name.startswith('stream:'):
222 def repo_remote_attr(*args, **kwargs):
222 def repo_remote_attr(*args, **kwargs):
223 return self._call_stream(name, *args, **kwargs)
223 return self._call_stream(name, *args, **kwargs)
224 else:
224 else:
225 def repo_remote_attr(*args, **kwargs):
225 def repo_remote_attr(*args, **kwargs):
226 return self._call(name, *args, **kwargs)
226 return self._call(name, *args, **kwargs)
227
227
228 return repo_remote_attr
228 return repo_remote_attr
229
229
230 def _base_call(self, name, *args, **kwargs):
230 def _base_call(self, name, *args, **kwargs):
231 # TODO: oliver: This is currently necessary pre-call since the
231 # TODO: oliver: This is currently necessary pre-call since the
232 # config object is being changed for hooking scenarios
232 # config object is being changed for hooking scenarios
233 wire = copy.deepcopy(self._wire)
233 wire = copy.deepcopy(self._wire)
234 wire["config"] = wire["config"].serialize()
234 wire["config"] = wire["config"].serialize()
235 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
235 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
236
236
237 payload = {
237 payload = {
238 'id': str(uuid.uuid4()),
238 'id': str(uuid.uuid4()),
239 'method': name,
239 'method': name,
240 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
240 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
241 }
241 }
242
242
243 context_uid = wire.get('context')
243 context_uid = wire.get('context')
244 return context_uid, payload
244 return context_uid, payload
245
245
246 def get_local_cache(self, name, args):
246 def get_local_cache(self, name, args):
247 cache_on = False
247 cache_on = False
248 cache_key = ''
248 cache_key = ''
249 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
249 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
250
250
251 cache_methods = [
251 cache_methods = [
252 'branches', 'tags', 'bookmarks',
252 'branches', 'tags', 'bookmarks',
253 'is_large_file', 'is_binary',
253 'is_large_file', 'is_binary',
254 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
254 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
255 'node_history',
255 'node_history',
256 'revision', 'tree_items',
256 'revision', 'tree_items',
257 'ctx_list',
257 'ctx_list', 'ctx_branch',
258 'bulk_request',
258 'bulk_request',
259 ]
259 ]
260
260
261 if local_cache_on and name in cache_methods:
261 if local_cache_on and name in cache_methods:
262 cache_on = True
262 cache_on = True
263 repo_state_uid = self._wire['repo_state_uid']
263 repo_state_uid = self._wire['repo_state_uid']
264 call_args = [a for a in args]
264 call_args = [a for a in args]
265 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
265 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
266
266
267 return cache_on, cache_key
267 return cache_on, cache_key
268
268
269 @exceptions.map_vcs_exceptions
269 @exceptions.map_vcs_exceptions
270 def _call(self, name, *args, **kwargs):
270 def _call(self, name, *args, **kwargs):
271 context_uid, payload = self._base_call(name, *args, **kwargs)
271 context_uid, payload = self._base_call(name, *args, **kwargs)
272 url = self.url
272 url = self.url
273
273
274 start = time.time()
274 start = time.time()
275 cache_on, cache_key = self.get_local_cache(name, args)
275 cache_on, cache_key = self.get_local_cache(name, args)
276
276
277 @self._cache_region.conditional_cache_on_arguments(
277 @self._cache_region.conditional_cache_on_arguments(
278 namespace=self._cache_namespace, condition=cache_on and cache_key)
278 namespace=self._cache_namespace, condition=cache_on and cache_key)
279 def remote_call(_cache_key):
279 def remote_call(_cache_key):
280 if self._call_with_logging:
280 if self._call_with_logging:
281 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
281 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
282 url, name, args, context_uid, cache_on)
282 url, name, args, context_uid, cache_on)
283 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
283 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
284
284
285 result = remote_call(cache_key)
285 result = remote_call(cache_key)
286 if self._call_with_logging:
286 if self._call_with_logging:
287 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
287 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
288 url, name, time.time()-start, context_uid)
288 url, name, time.time()-start, context_uid)
289 return result
289 return result
290
290
291 @exceptions.map_vcs_exceptions
291 @exceptions.map_vcs_exceptions
292 def _call_stream(self, name, *args, **kwargs):
292 def _call_stream(self, name, *args, **kwargs):
293 context_uid, payload = self._base_call(name, *args, **kwargs)
293 context_uid, payload = self._base_call(name, *args, **kwargs)
294 payload['chunk_size'] = self.CHUNK_SIZE
294 payload['chunk_size'] = self.CHUNK_SIZE
295 url = self.stream_url
295 url = self.stream_url
296
296
297 start = time.time()
297 start = time.time()
298 cache_on, cache_key = self.get_local_cache(name, args)
298 cache_on, cache_key = self.get_local_cache(name, args)
299
299
300 # Cache is a problem because this is a stream
300 # Cache is a problem because this is a stream
301 def streaming_remote_call(_cache_key):
301 def streaming_remote_call(_cache_key):
302 if self._call_with_logging:
302 if self._call_with_logging:
303 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
303 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
304 url, name, args, context_uid, cache_on)
304 url, name, args, context_uid, cache_on)
305 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
305 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
306
306
307 result = streaming_remote_call(cache_key)
307 result = streaming_remote_call(cache_key)
308 if self._call_with_logging:
308 if self._call_with_logging:
309 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
309 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
310 url, name, time.time()-start, context_uid)
310 url, name, time.time()-start, context_uid)
311 return result
311 return result
312
312
313 def __getitem__(self, key):
313 def __getitem__(self, key):
314 return self.revision(key)
314 return self.revision(key)
315
315
316 def _create_vcs_cache_context(self, *args):
316 def _create_vcs_cache_context(self, *args):
317 """
317 """
318 Creates a unique string which is passed to the VCSServer on every
318 Creates a unique string which is passed to the VCSServer on every
319 remote call. It is used as cache key in the VCSServer.
319 remote call. It is used as cache key in the VCSServer.
320 """
320 """
321 hash_key = '-'.join(map(str, args))
321 hash_key = '-'.join(map(str, args))
322 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
322 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
323
323
324 def invalidate_vcs_cache(self):
324 def invalidate_vcs_cache(self):
325 """
325 """
326 This invalidates the context which is sent to the VCSServer on every
326 This invalidates the context which is sent to the VCSServer on every
327 call to a remote method. It forces the VCSServer to create a fresh
327 call to a remote method. It forces the VCSServer to create a fresh
328 repository instance on the next call to a remote method.
328 repository instance on the next call to a remote method.
329 """
329 """
330 self._wire['context'] = str(uuid.uuid4())
330 self._wire['context'] = str(uuid.uuid4())
331
331
332
332
333 class VcsHttpProxy(object):
333 class VcsHttpProxy(object):
334
334
335 CHUNK_SIZE = 16384
335 CHUNK_SIZE = 16384
336
336
337 def __init__(self, server_and_port, backend_endpoint):
337 def __init__(self, server_and_port, backend_endpoint):
338 retries = Retry(total=5, connect=None, read=None, redirect=None)
338 retries = Retry(total=5, connect=None, read=None, redirect=None)
339
339
340 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
340 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
341 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
341 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
342 self.session = requests.Session()
342 self.session = requests.Session()
343 self.session.mount('http://', adapter)
343 self.session.mount('http://', adapter)
344
344
345 def handle(self, environment, input_data, *args, **kwargs):
345 def handle(self, environment, input_data, *args, **kwargs):
346 data = {
346 data = {
347 'environment': environment,
347 'environment': environment,
348 'input_data': input_data,
348 'input_data': input_data,
349 'args': args,
349 'args': args,
350 'kwargs': kwargs
350 'kwargs': kwargs
351 }
351 }
352 result = self.session.post(
352 result = self.session.post(
353 self.base_url, msgpack.packb(data), stream=True)
353 self.base_url, msgpack.packb(data), stream=True)
354 return self._get_result(result)
354 return self._get_result(result)
355
355
356 def _deserialize_and_raise(self, error):
356 def _deserialize_and_raise(self, error):
357 exception = Exception(error['message'])
357 exception = Exception(error['message'])
358 try:
358 try:
359 exception._vcs_kind = error['_vcs_kind']
359 exception._vcs_kind = error['_vcs_kind']
360 except KeyError:
360 except KeyError:
361 pass
361 pass
362 raise exception
362 raise exception
363
363
364 def _iterate(self, result):
364 def _iterate(self, result):
365 unpacker = msgpack.Unpacker()
365 unpacker = msgpack.Unpacker()
366 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
366 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
367 unpacker.feed(line)
367 unpacker.feed(line)
368 for chunk in unpacker:
368 for chunk in unpacker:
369 yield chunk
369 yield chunk
370
370
371 def _get_result(self, result):
371 def _get_result(self, result):
372 iterator = self._iterate(result)
372 iterator = self._iterate(result)
373 error = iterator.next()
373 error = iterator.next()
374 if error:
374 if error:
375 self._deserialize_and_raise(error)
375 self._deserialize_and_raise(error)
376
376
377 status = iterator.next()
377 status = iterator.next()
378 headers = iterator.next()
378 headers = iterator.next()
379
379
380 return iterator, status, headers
380 return iterator, status, headers
381
381
382
382
383 class ThreadlocalSessionFactory(object):
383 class ThreadlocalSessionFactory(object):
384 """
384 """
385 Creates one CurlSession per thread on demand.
385 Creates one CurlSession per thread on demand.
386 """
386 """
387
387
388 def __init__(self):
388 def __init__(self):
389 self._thread_local = threading.local()
389 self._thread_local = threading.local()
390
390
391 def __call__(self):
391 def __call__(self):
392 if not hasattr(self._thread_local, 'curl_session'):
392 if not hasattr(self._thread_local, 'curl_session'):
393 self._thread_local.curl_session = CurlSession()
393 self._thread_local.curl_session = CurlSession()
394 return self._thread_local.curl_session
394 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now