##// END OF EJS Templates
caches: added path check to cached methods for faster git repo checks
super-admin -
r4828:252be6a7 default
parent child Browse files
Show More
@@ -1,395 +1,396 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44 from rhodecode.lib.utils2 import str2bool
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
54 }
54 }
55
55
56
56
57 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session):
58 try:
58 try:
59 response = session.post(url, data=msgpack.packb(payload))
59 response = session.post(url, data=msgpack.packb(payload))
60 except pycurl.error as e:
60 except pycurl.error as e:
61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
62 raise exceptions.HttpVCSCommunicationError(msg)
62 raise exceptions.HttpVCSCommunicationError(msg)
63 except Exception as e:
63 except Exception as e:
64 message = getattr(e, 'message', '')
64 message = getattr(e, 'message', '')
65 if 'Failed to connect' in message:
65 if 'Failed to connect' in message:
66 # gevent doesn't return proper pycurl errors
66 # gevent doesn't return proper pycurl errors
67 raise exceptions.HttpVCSCommunicationError(e)
67 raise exceptions.HttpVCSCommunicationError(e)
68 else:
68 else:
69 raise
69 raise
70
70
71 if response.status_code >= 400:
71 if response.status_code >= 400:
72 log.error('Call to %s returned non 200 HTTP code: %s',
72 log.error('Call to %s returned non 200 HTTP code: %s',
73 url, response.status_code)
73 url, response.status_code)
74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
75
75
76 try:
76 try:
77 response = msgpack.unpackb(response.content)
77 response = msgpack.unpackb(response.content)
78 except Exception:
78 except Exception:
79 log.exception('Failed to decode response %r', response.content)
79 log.exception('Failed to decode response %r', response.content)
80 raise
80 raise
81
81
82 error = response.get('error')
82 error = response.get('error')
83 if error:
83 if error:
84 type_ = error.get('type', 'Exception')
84 type_ = error.get('type', 'Exception')
85 exc = exceptions_map.get(type_, Exception)
85 exc = exceptions_map.get(type_, Exception)
86 exc = exc(error.get('message'))
86 exc = exc(error.get('message'))
87 try:
87 try:
88 exc._vcs_kind = error['_vcs_kind']
88 exc._vcs_kind = error['_vcs_kind']
89 except KeyError:
89 except KeyError:
90 pass
90 pass
91
91
92 try:
92 try:
93 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_traceback = error['traceback']
94 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_name = error['org_exc']
95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
96 except KeyError:
96 except KeyError:
97 pass
97 pass
98
98
99 raise exc
99 raise exc
100 return response.get('result')
100 return response.get('result')
101
101
102
102
103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
104 try:
104 try:
105 response = session.post(url, data=msgpack.packb(payload))
105 response = session.post(url, data=msgpack.packb(payload))
106 except pycurl.error as e:
106 except pycurl.error as e:
107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
108 raise exceptions.HttpVCSCommunicationError(msg)
108 raise exceptions.HttpVCSCommunicationError(msg)
109 except Exception as e:
109 except Exception as e:
110 message = getattr(e, 'message', '')
110 message = getattr(e, 'message', '')
111 if 'Failed to connect' in message:
111 if 'Failed to connect' in message:
112 # gevent doesn't return proper pycurl errors
112 # gevent doesn't return proper pycurl errors
113 raise exceptions.HttpVCSCommunicationError(e)
113 raise exceptions.HttpVCSCommunicationError(e)
114 else:
114 else:
115 raise
115 raise
116
116
117 if response.status_code >= 400:
117 if response.status_code >= 400:
118 log.error('Call to %s returned non 200 HTTP code: %s',
118 log.error('Call to %s returned non 200 HTTP code: %s',
119 url, response.status_code)
119 url, response.status_code)
120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
121
121
122 return response.iter_content(chunk_size=chunk_size)
122 return response.iter_content(chunk_size=chunk_size)
123
123
124
124
125 class ServiceConnection(object):
125 class ServiceConnection(object):
126 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 def __init__(self, server_and_port, backend_endpoint, session_factory):
127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
128 self._session_factory = session_factory
128 self._session_factory = session_factory
129
129
130 def __getattr__(self, name):
130 def __getattr__(self, name):
131 def f(*args, **kwargs):
131 def f(*args, **kwargs):
132 return self._call(name, *args, **kwargs)
132 return self._call(name, *args, **kwargs)
133 return f
133 return f
134
134
135 @exceptions.map_vcs_exceptions
135 @exceptions.map_vcs_exceptions
136 def _call(self, name, *args, **kwargs):
136 def _call(self, name, *args, **kwargs):
137 payload = {
137 payload = {
138 'id': str(uuid.uuid4()),
138 'id': str(uuid.uuid4()),
139 'method': name,
139 'method': name,
140 'params': {'args': args, 'kwargs': kwargs}
140 'params': {'args': args, 'kwargs': kwargs}
141 }
141 }
142 return _remote_call(
142 return _remote_call(
143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
144
144
145
145
146 class RemoteVCSMaker(object):
146 class RemoteVCSMaker(object):
147
147
148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
151
151
152 self._session_factory = session_factory
152 self._session_factory = session_factory
153 self.backend_type = backend_type
153 self.backend_type = backend_type
154
154
155 @classmethod
155 @classmethod
156 def init_cache_region(cls, repo_id):
156 def init_cache_region(cls, repo_id):
157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
159 return region, cache_namespace_uid
159 return region, cache_namespace_uid
160
160
161 def __call__(self, path, repo_id, config, with_wire=None):
161 def __call__(self, path, repo_id, config, with_wire=None):
162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
164
164
165 def __getattr__(self, name):
165 def __getattr__(self, name):
166 def remote_attr(*args, **kwargs):
166 def remote_attr(*args, **kwargs):
167 return self._call(name, *args, **kwargs)
167 return self._call(name, *args, **kwargs)
168 return remote_attr
168 return remote_attr
169
169
170 @exceptions.map_vcs_exceptions
170 @exceptions.map_vcs_exceptions
171 def _call(self, func_name, *args, **kwargs):
171 def _call(self, func_name, *args, **kwargs):
172 payload = {
172 payload = {
173 'id': str(uuid.uuid4()),
173 'id': str(uuid.uuid4()),
174 'method': func_name,
174 'method': func_name,
175 'backend': self.backend_type,
175 'backend': self.backend_type,
176 'params': {'args': args, 'kwargs': kwargs}
176 'params': {'args': args, 'kwargs': kwargs}
177 }
177 }
178 url = self.url
178 url = self.url
179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
180
180
181
181
182 class RemoteRepo(object):
182 class RemoteRepo(object):
183 CHUNK_SIZE = 16384
183 CHUNK_SIZE = 16384
184
184
185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
186 self.url = remote_maker.url
186 self.url = remote_maker.url
187 self.stream_url = remote_maker.stream_url
187 self.stream_url = remote_maker.stream_url
188 self._session = remote_maker._session_factory()
188 self._session = remote_maker._session_factory()
189
189
190 cache_repo_id = self._repo_id_sanitizer(repo_id)
190 cache_repo_id = self._repo_id_sanitizer(repo_id)
191 self._cache_region, self._cache_namespace = \
191 self._cache_region, self._cache_namespace = \
192 remote_maker.init_cache_region(cache_repo_id)
192 remote_maker.init_cache_region(cache_repo_id)
193
193
194 with_wire = with_wire or {}
194 with_wire = with_wire or {}
195
195
196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
197 self._wire = {
197 self._wire = {
198 "path": path, # repo path
198 "path": path, # repo path
199 "repo_id": repo_id,
199 "repo_id": repo_id,
200 "cache_repo_id": cache_repo_id,
200 "cache_repo_id": cache_repo_id,
201 "config": config,
201 "config": config,
202 "repo_state_uid": repo_state_uid,
202 "repo_state_uid": repo_state_uid,
203 "context": self._create_vcs_cache_context(path, repo_state_uid)
203 "context": self._create_vcs_cache_context(path, repo_state_uid)
204 }
204 }
205
205
206 if with_wire:
206 if with_wire:
207 self._wire.update(with_wire)
207 self._wire.update(with_wire)
208
208
209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
210 # log.debug brings a few percent gain even if is is not active.
210 # log.debug brings a few percent gain even if is is not active.
211 if log.isEnabledFor(logging.DEBUG):
211 if log.isEnabledFor(logging.DEBUG):
212 self._call_with_logging = True
212 self._call_with_logging = True
213
213
214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
215
215
216 def _repo_id_sanitizer(self, repo_id):
216 def _repo_id_sanitizer(self, repo_id):
217 pathless = repo_id.replace('/', '__').replace('-', '_')
217 pathless = repo_id.replace('/', '__').replace('-', '_')
218 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
218 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
219
219
220 def __getattr__(self, name):
220 def __getattr__(self, name):
221
221
222 if name.startswith('stream:'):
222 if name.startswith('stream:'):
223 def repo_remote_attr(*args, **kwargs):
223 def repo_remote_attr(*args, **kwargs):
224 return self._call_stream(name, *args, **kwargs)
224 return self._call_stream(name, *args, **kwargs)
225 else:
225 else:
226 def repo_remote_attr(*args, **kwargs):
226 def repo_remote_attr(*args, **kwargs):
227 return self._call(name, *args, **kwargs)
227 return self._call(name, *args, **kwargs)
228
228
229 return repo_remote_attr
229 return repo_remote_attr
230
230
231 def _base_call(self, name, *args, **kwargs):
231 def _base_call(self, name, *args, **kwargs):
232 # TODO: oliver: This is currently necessary pre-call since the
232 # TODO: oliver: This is currently necessary pre-call since the
233 # config object is being changed for hooking scenarios
233 # config object is being changed for hooking scenarios
234 wire = copy.deepcopy(self._wire)
234 wire = copy.deepcopy(self._wire)
235 wire["config"] = wire["config"].serialize()
235 wire["config"] = wire["config"].serialize()
236 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
236 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
237
237
238 payload = {
238 payload = {
239 'id': str(uuid.uuid4()),
239 'id': str(uuid.uuid4()),
240 'method': name,
240 'method': name,
241 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
241 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
242 }
242 }
243
243
244 context_uid = wire.get('context')
244 context_uid = wire.get('context')
245 return context_uid, payload
245 return context_uid, payload
246
246
247 def get_local_cache(self, name, args):
247 def get_local_cache(self, name, args):
248 cache_on = False
248 cache_on = False
249 cache_key = ''
249 cache_key = ''
250 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
250 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
251
251
252 cache_methods = [
252 cache_methods = [
253 'branches', 'tags', 'bookmarks',
253 'branches', 'tags', 'bookmarks',
254 'is_large_file', 'is_binary',
254 'is_large_file', 'is_binary',
255 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
255 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
256 'node_history',
256 'node_history',
257 'revision', 'tree_items',
257 'revision', 'tree_items',
258 'ctx_list', 'ctx_branch', 'ctx_description',
258 'ctx_list', 'ctx_branch', 'ctx_description',
259 'bulk_request',
259 'bulk_request',
260 'assert_correct_path'
260 ]
261 ]
261
262
262 if local_cache_on and name in cache_methods:
263 if local_cache_on and name in cache_methods:
263 cache_on = True
264 cache_on = True
264 repo_state_uid = self._wire['repo_state_uid']
265 repo_state_uid = self._wire['repo_state_uid']
265 call_args = [a for a in args]
266 call_args = [a for a in args]
266 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
267 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
267
268
268 return cache_on, cache_key
269 return cache_on, cache_key
269
270
270 @exceptions.map_vcs_exceptions
271 @exceptions.map_vcs_exceptions
271 def _call(self, name, *args, **kwargs):
272 def _call(self, name, *args, **kwargs):
272 context_uid, payload = self._base_call(name, *args, **kwargs)
273 context_uid, payload = self._base_call(name, *args, **kwargs)
273 url = self.url
274 url = self.url
274
275
275 start = time.time()
276 start = time.time()
276 cache_on, cache_key = self.get_local_cache(name, args)
277 cache_on, cache_key = self.get_local_cache(name, args)
277
278
278 @self._cache_region.conditional_cache_on_arguments(
279 @self._cache_region.conditional_cache_on_arguments(
279 namespace=self._cache_namespace, condition=cache_on and cache_key)
280 namespace=self._cache_namespace, condition=cache_on and cache_key)
280 def remote_call(_cache_key):
281 def remote_call(_cache_key):
281 if self._call_with_logging:
282 if self._call_with_logging:
282 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
283 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
283 url, name, args, context_uid, cache_on)
284 url, name, args, context_uid, cache_on)
284 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
285 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
285
286
286 result = remote_call(cache_key)
287 result = remote_call(cache_key)
287 if self._call_with_logging:
288 if self._call_with_logging:
288 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
289 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
289 url, name, time.time()-start, context_uid)
290 url, name, time.time()-start, context_uid)
290 return result
291 return result
291
292
292 @exceptions.map_vcs_exceptions
293 @exceptions.map_vcs_exceptions
293 def _call_stream(self, name, *args, **kwargs):
294 def _call_stream(self, name, *args, **kwargs):
294 context_uid, payload = self._base_call(name, *args, **kwargs)
295 context_uid, payload = self._base_call(name, *args, **kwargs)
295 payload['chunk_size'] = self.CHUNK_SIZE
296 payload['chunk_size'] = self.CHUNK_SIZE
296 url = self.stream_url
297 url = self.stream_url
297
298
298 start = time.time()
299 start = time.time()
299 cache_on, cache_key = self.get_local_cache(name, args)
300 cache_on, cache_key = self.get_local_cache(name, args)
300
301
301 # Cache is a problem because this is a stream
302 # Cache is a problem because this is a stream
302 def streaming_remote_call(_cache_key):
303 def streaming_remote_call(_cache_key):
303 if self._call_with_logging:
304 if self._call_with_logging:
304 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
305 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
305 url, name, args, context_uid, cache_on)
306 url, name, args, context_uid, cache_on)
306 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
307 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
307
308
308 result = streaming_remote_call(cache_key)
309 result = streaming_remote_call(cache_key)
309 if self._call_with_logging:
310 if self._call_with_logging:
310 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
311 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
311 url, name, time.time()-start, context_uid)
312 url, name, time.time()-start, context_uid)
312 return result
313 return result
313
314
314 def __getitem__(self, key):
315 def __getitem__(self, key):
315 return self.revision(key)
316 return self.revision(key)
316
317
317 def _create_vcs_cache_context(self, *args):
318 def _create_vcs_cache_context(self, *args):
318 """
319 """
319 Creates a unique string which is passed to the VCSServer on every
320 Creates a unique string which is passed to the VCSServer on every
320 remote call. It is used as cache key in the VCSServer.
321 remote call. It is used as cache key in the VCSServer.
321 """
322 """
322 hash_key = '-'.join(map(str, args))
323 hash_key = '-'.join(map(str, args))
323 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
324 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
324
325
325 def invalidate_vcs_cache(self):
326 def invalidate_vcs_cache(self):
326 """
327 """
327 This invalidates the context which is sent to the VCSServer on every
328 This invalidates the context which is sent to the VCSServer on every
328 call to a remote method. It forces the VCSServer to create a fresh
329 call to a remote method. It forces the VCSServer to create a fresh
329 repository instance on the next call to a remote method.
330 repository instance on the next call to a remote method.
330 """
331 """
331 self._wire['context'] = str(uuid.uuid4())
332 self._wire['context'] = str(uuid.uuid4())
332
333
333
334
334 class VcsHttpProxy(object):
335 class VcsHttpProxy(object):
335
336
336 CHUNK_SIZE = 16384
337 CHUNK_SIZE = 16384
337
338
338 def __init__(self, server_and_port, backend_endpoint):
339 def __init__(self, server_and_port, backend_endpoint):
339 retries = Retry(total=5, connect=None, read=None, redirect=None)
340 retries = Retry(total=5, connect=None, read=None, redirect=None)
340
341
341 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
342 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
342 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
343 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
343 self.session = requests.Session()
344 self.session = requests.Session()
344 self.session.mount('http://', adapter)
345 self.session.mount('http://', adapter)
345
346
346 def handle(self, environment, input_data, *args, **kwargs):
347 def handle(self, environment, input_data, *args, **kwargs):
347 data = {
348 data = {
348 'environment': environment,
349 'environment': environment,
349 'input_data': input_data,
350 'input_data': input_data,
350 'args': args,
351 'args': args,
351 'kwargs': kwargs
352 'kwargs': kwargs
352 }
353 }
353 result = self.session.post(
354 result = self.session.post(
354 self.base_url, msgpack.packb(data), stream=True)
355 self.base_url, msgpack.packb(data), stream=True)
355 return self._get_result(result)
356 return self._get_result(result)
356
357
357 def _deserialize_and_raise(self, error):
358 def _deserialize_and_raise(self, error):
358 exception = Exception(error['message'])
359 exception = Exception(error['message'])
359 try:
360 try:
360 exception._vcs_kind = error['_vcs_kind']
361 exception._vcs_kind = error['_vcs_kind']
361 except KeyError:
362 except KeyError:
362 pass
363 pass
363 raise exception
364 raise exception
364
365
365 def _iterate(self, result):
366 def _iterate(self, result):
366 unpacker = msgpack.Unpacker()
367 unpacker = msgpack.Unpacker()
367 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
368 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
368 unpacker.feed(line)
369 unpacker.feed(line)
369 for chunk in unpacker:
370 for chunk in unpacker:
370 yield chunk
371 yield chunk
371
372
372 def _get_result(self, result):
373 def _get_result(self, result):
373 iterator = self._iterate(result)
374 iterator = self._iterate(result)
374 error = iterator.next()
375 error = iterator.next()
375 if error:
376 if error:
376 self._deserialize_and_raise(error)
377 self._deserialize_and_raise(error)
377
378
378 status = iterator.next()
379 status = iterator.next()
379 headers = iterator.next()
380 headers = iterator.next()
380
381
381 return iterator, status, headers
382 return iterator, status, headers
382
383
383
384
384 class ThreadlocalSessionFactory(object):
385 class ThreadlocalSessionFactory(object):
385 """
386 """
386 Creates one CurlSession per thread on demand.
387 Creates one CurlSession per thread on demand.
387 """
388 """
388
389
389 def __init__(self):
390 def __init__(self):
390 self._thread_local = threading.local()
391 self._thread_local = threading.local()
391
392
392 def __call__(self):
393 def __call__(self):
393 if not hasattr(self._thread_local, 'curl_session'):
394 if not hasattr(self._thread_local, 'curl_session'):
394 self._thread_local.curl_session = CurlSession()
395 self._thread_local.curl_session = CurlSession()
395 return self._thread_local.curl_session
396 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now