##// END OF EJS Templates
http_client: some reformatting
super-admin -
r4790:eadea78d default
parent child Browse files
Show More
@@ -1,384 +1,394 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44 from rhodecode.lib.utils2 import str2bool
44
45
45 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
46
47
47
48
48 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
51 'KeyError': KeyError,
52 'KeyError': KeyError,
52 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
53 }
54 }
54
55
55
56
56 def _remote_call(url, payload, exceptions_map, session):
57 def _remote_call(url, payload, exceptions_map, session):
57 try:
58 try:
58 response = session.post(url, data=msgpack.packb(payload))
59 response = session.post(url, data=msgpack.packb(payload))
59 except pycurl.error as e:
60 except pycurl.error as e:
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 raise exceptions.HttpVCSCommunicationError(msg)
62 raise exceptions.HttpVCSCommunicationError(msg)
62 except Exception as e:
63 except Exception as e:
63 message = getattr(e, 'message', '')
64 message = getattr(e, 'message', '')
64 if 'Failed to connect' in message:
65 if 'Failed to connect' in message:
65 # gevent doesn't return proper pycurl errors
66 # gevent doesn't return proper pycurl errors
66 raise exceptions.HttpVCSCommunicationError(e)
67 raise exceptions.HttpVCSCommunicationError(e)
67 else:
68 else:
68 raise
69 raise
69
70
70 if response.status_code >= 400:
71 if response.status_code >= 400:
71 log.error('Call to %s returned non 200 HTTP code: %s',
72 log.error('Call to %s returned non 200 HTTP code: %s',
72 url, response.status_code)
73 url, response.status_code)
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74
75
75 try:
76 try:
76 response = msgpack.unpackb(response.content)
77 response = msgpack.unpackb(response.content)
77 except Exception:
78 except Exception:
78 log.exception('Failed to decode response %r', response.content)
79 log.exception('Failed to decode response %r', response.content)
79 raise
80 raise
80
81
81 error = response.get('error')
82 error = response.get('error')
82 if error:
83 if error:
83 type_ = error.get('type', 'Exception')
84 type_ = error.get('type', 'Exception')
84 exc = exceptions_map.get(type_, Exception)
85 exc = exceptions_map.get(type_, Exception)
85 exc = exc(error.get('message'))
86 exc = exc(error.get('message'))
86 try:
87 try:
87 exc._vcs_kind = error['_vcs_kind']
88 exc._vcs_kind = error['_vcs_kind']
88 except KeyError:
89 except KeyError:
89 pass
90 pass
90
91
91 try:
92 try:
92 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 except KeyError:
96 except KeyError:
96 pass
97 pass
97
98
98 raise exc
99 raise exc
99 return response.get('result')
100 return response.get('result')
100
101
101
102
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 try:
104 try:
104 response = session.post(url, data=msgpack.packb(payload))
105 response = session.post(url, data=msgpack.packb(payload))
105 except pycurl.error as e:
106 except pycurl.error as e:
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 raise exceptions.HttpVCSCommunicationError(msg)
108 raise exceptions.HttpVCSCommunicationError(msg)
108 except Exception as e:
109 except Exception as e:
109 message = getattr(e, 'message', '')
110 message = getattr(e, 'message', '')
110 if 'Failed to connect' in message:
111 if 'Failed to connect' in message:
111 # gevent doesn't return proper pycurl errors
112 # gevent doesn't return proper pycurl errors
112 raise exceptions.HttpVCSCommunicationError(e)
113 raise exceptions.HttpVCSCommunicationError(e)
113 else:
114 else:
114 raise
115 raise
115
116
116 if response.status_code >= 400:
117 if response.status_code >= 400:
117 log.error('Call to %s returned non 200 HTTP code: %s',
118 log.error('Call to %s returned non 200 HTTP code: %s',
118 url, response.status_code)
119 url, response.status_code)
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120
121
121 return response.iter_content(chunk_size=chunk_size)
122 return response.iter_content(chunk_size=chunk_size)
122
123
123
124
124 class ServiceConnection(object):
125 class ServiceConnection(object):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self._session_factory = session_factory
128 self._session_factory = session_factory
128
129
129 def __getattr__(self, name):
130 def __getattr__(self, name):
130 def f(*args, **kwargs):
131 def f(*args, **kwargs):
131 return self._call(name, *args, **kwargs)
132 return self._call(name, *args, **kwargs)
132 return f
133 return f
133
134
134 @exceptions.map_vcs_exceptions
135 @exceptions.map_vcs_exceptions
135 def _call(self, name, *args, **kwargs):
136 def _call(self, name, *args, **kwargs):
136 payload = {
137 payload = {
137 'id': str(uuid.uuid4()),
138 'id': str(uuid.uuid4()),
138 'method': name,
139 'method': name,
139 'params': {'args': args, 'kwargs': kwargs}
140 'params': {'args': args, 'kwargs': kwargs}
140 }
141 }
141 return _remote_call(
142 return _remote_call(
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143
144
144
145
145 class RemoteVCSMaker(object):
146 class RemoteVCSMaker(object):
146
147
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150
151
151 self._session_factory = session_factory
152 self._session_factory = session_factory
152 self.backend_type = backend_type
153 self.backend_type = backend_type
153
154
154 @classmethod
155 @classmethod
155 def init_cache_region(cls, repo_id):
156 def init_cache_region(cls, repo_id):
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 return region, cache_namespace_uid
159 return region, cache_namespace_uid
159
160
160 def __call__(self, path, repo_id, config, with_wire=None):
161 def __call__(self, path, repo_id, config, with_wire=None):
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163
164
164 def __getattr__(self, name):
165 def __getattr__(self, name):
165 def remote_attr(*args, **kwargs):
166 def remote_attr(*args, **kwargs):
166 return self._call(name, *args, **kwargs)
167 return self._call(name, *args, **kwargs)
167 return remote_attr
168 return remote_attr
168
169
169 @exceptions.map_vcs_exceptions
170 @exceptions.map_vcs_exceptions
170 def _call(self, func_name, *args, **kwargs):
171 def _call(self, func_name, *args, **kwargs):
171 payload = {
172 payload = {
172 'id': str(uuid.uuid4()),
173 'id': str(uuid.uuid4()),
173 'method': func_name,
174 'method': func_name,
174 'backend': self.backend_type,
175 'backend': self.backend_type,
175 'params': {'args': args, 'kwargs': kwargs}
176 'params': {'args': args, 'kwargs': kwargs}
176 }
177 }
177 url = self.url
178 url = self.url
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179
180
180
181
181 class RemoteRepo(object):
182 class RemoteRepo(object):
182 CHUNK_SIZE = 16384
183 CHUNK_SIZE = 16384
183
184
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 self.url = remote_maker.url
186 self.url = remote_maker.url
186 self.stream_url = remote_maker.stream_url
187 self.stream_url = remote_maker.stream_url
187 self._session = remote_maker._session_factory()
188 self._session = remote_maker._session_factory()
188
189
189 cache_repo_id = self._repo_id_sanitizer(repo_id)
190 cache_repo_id = self._repo_id_sanitizer(repo_id)
190 self._cache_region, self._cache_namespace = \
191 self._cache_region, self._cache_namespace = \
191 remote_maker.init_cache_region(cache_repo_id)
192 remote_maker.init_cache_region(cache_repo_id)
192
193
193 with_wire = with_wire or {}
194 with_wire = with_wire or {}
194
195
195 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
196 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
196 self._wire = {
197 self._wire = {
197 "path": path, # repo path
198 "path": path, # repo path
198 "repo_id": repo_id,
199 "repo_id": repo_id,
199 "cache_repo_id": cache_repo_id,
200 "cache_repo_id": cache_repo_id,
200 "config": config,
201 "config": config,
201 "repo_state_uid": repo_state_uid,
202 "repo_state_uid": repo_state_uid,
202 "context": self._create_vcs_cache_context(path, repo_state_uid)
203 "context": self._create_vcs_cache_context(path, repo_state_uid)
203 }
204 }
204
205
205 if with_wire:
206 if with_wire:
206 self._wire.update(with_wire)
207 self._wire.update(with_wire)
207
208
208 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
209 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
209 # log.debug brings a few percent gain even if is is not active.
210 # log.debug brings a few percent gain even if is is not active.
210 if log.isEnabledFor(logging.DEBUG):
211 if log.isEnabledFor(logging.DEBUG):
211 self._call_with_logging = True
212 self._call_with_logging = True
212
213
213 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
214 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
214
215
215 def _repo_id_sanitizer(self, repo_id):
216 def _repo_id_sanitizer(self, repo_id):
216 return repo_id.replace('/', '__').replace('-', '_')
217 return repo_id.replace('/', '__').replace('-', '_')
217
218
218 def __getattr__(self, name):
219 def __getattr__(self, name):
219
220
220 if name.startswith('stream:'):
221 if name.startswith('stream:'):
221 def repo_remote_attr(*args, **kwargs):
222 def repo_remote_attr(*args, **kwargs):
222 return self._call_stream(name, *args, **kwargs)
223 return self._call_stream(name, *args, **kwargs)
223 else:
224 else:
224 def repo_remote_attr(*args, **kwargs):
225 def repo_remote_attr(*args, **kwargs):
225 return self._call(name, *args, **kwargs)
226 return self._call(name, *args, **kwargs)
226
227
227 return repo_remote_attr
228 return repo_remote_attr
228
229
229 def _base_call(self, name, *args, **kwargs):
230 def _base_call(self, name, *args, **kwargs):
230 # TODO: oliver: This is currently necessary pre-call since the
231 # TODO: oliver: This is currently necessary pre-call since the
231 # config object is being changed for hooking scenarios
232 # config object is being changed for hooking scenarios
232 wire = copy.deepcopy(self._wire)
233 wire = copy.deepcopy(self._wire)
233 wire["config"] = wire["config"].serialize()
234 wire["config"] = wire["config"].serialize()
234 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
235 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
235
236
236 payload = {
237 payload = {
237 'id': str(uuid.uuid4()),
238 'id': str(uuid.uuid4()),
238 'method': name,
239 'method': name,
239 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
240 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
240 }
241 }
241
242
242 context_uid = wire.get('context')
243 context_uid = wire.get('context')
243 return context_uid, payload
244 return context_uid, payload
244
245
245 @exceptions.map_vcs_exceptions
246 def get_local_cache(self, name, args):
246 def _call(self, name, *args, **kwargs):
247 context_uid, payload = self._base_call(name, *args, **kwargs)
248 url = self.url
249
250 start = time.time()
251
252 cache_on = False
247 cache_on = False
253 cache_key = ''
248 cache_key = ''
254 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
249 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
255
250
256 cache_methods = [
251 cache_methods = [
257 'branches', 'tags', 'bookmarks',
252 'branches', 'tags', 'bookmarks',
258 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
253 'is_large_file', 'is_binary',
254 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
255 'node_history',
259 'revision', 'tree_items',
256 'revision', 'tree_items',
260 'ctx_list',
257 'ctx_list',
261 'bulk_request',
258 'bulk_request',
262 ]
259 ]
263
260
264 if local_cache_on and name in cache_methods:
261 if local_cache_on and name in cache_methods:
265 cache_on = True
262 cache_on = True
266 repo_state_uid = self._wire['repo_state_uid']
263 repo_state_uid = self._wire['repo_state_uid']
267 call_args = [a for a in args]
264 call_args = [a for a in args]
268 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
265 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
269
266
267 return cache_on, cache_key
268
269 @exceptions.map_vcs_exceptions
270 def _call(self, name, *args, **kwargs):
271 context_uid, payload = self._base_call(name, *args, **kwargs)
272 url = self.url
273
274 start = time.time()
275 cache_on, cache_key = self.get_local_cache(name, args)
276
270 @self._cache_region.conditional_cache_on_arguments(
277 @self._cache_region.conditional_cache_on_arguments(
271 namespace=self._cache_namespace, condition=cache_on and cache_key)
278 namespace=self._cache_namespace, condition=cache_on and cache_key)
272 def remote_call(_cache_key):
279 def remote_call(_cache_key):
273 if self._call_with_logging:
280 if self._call_with_logging:
274 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
281 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
275 url, name, args, context_uid, cache_on)
282 url, name, args, context_uid, cache_on)
276 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
283 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
277
284
278 result = remote_call(cache_key)
285 result = remote_call(cache_key)
279 if self._call_with_logging:
286 if self._call_with_logging:
280 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
287 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
281 url, name, time.time()-start, context_uid)
288 url, name, time.time()-start, context_uid)
282 return result
289 return result
283
290
284 @exceptions.map_vcs_exceptions
291 @exceptions.map_vcs_exceptions
285 def _call_stream(self, name, *args, **kwargs):
292 def _call_stream(self, name, *args, **kwargs):
286 context_uid, payload = self._base_call(name, *args, **kwargs)
293 context_uid, payload = self._base_call(name, *args, **kwargs)
287 payload['chunk_size'] = self.CHUNK_SIZE
294 payload['chunk_size'] = self.CHUNK_SIZE
288 url = self.stream_url
295 url = self.stream_url
289
296
290 start = time.time()
297 start = time.time()
291 if self._call_with_logging:
298 cache_on, cache_key = self.get_local_cache(name, args)
292 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
293 url, name, args, context_uid)
294
299
295 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
300 # Cache is a problem because this is a stream
296 self.CHUNK_SIZE)
301 def streaming_remote_call(_cache_key):
302 if self._call_with_logging:
303 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
304 url, name, args, context_uid, cache_on)
305 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
297
306
307 result = streaming_remote_call(cache_key)
298 if self._call_with_logging:
308 if self._call_with_logging:
299 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
309 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
300 url, name, time.time()-start, context_uid)
310 url, name, time.time()-start, context_uid)
301 return result
311 return result
302
312
303 def __getitem__(self, key):
313 def __getitem__(self, key):
304 return self.revision(key)
314 return self.revision(key)
305
315
306 def _create_vcs_cache_context(self, *args):
316 def _create_vcs_cache_context(self, *args):
307 """
317 """
308 Creates a unique string which is passed to the VCSServer on every
318 Creates a unique string which is passed to the VCSServer on every
309 remote call. It is used as cache key in the VCSServer.
319 remote call. It is used as cache key in the VCSServer.
310 """
320 """
311 hash_key = '-'.join(map(str, args))
321 hash_key = '-'.join(map(str, args))
312 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
322 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
313
323
314 def invalidate_vcs_cache(self):
324 def invalidate_vcs_cache(self):
315 """
325 """
316 This invalidates the context which is sent to the VCSServer on every
326 This invalidates the context which is sent to the VCSServer on every
317 call to a remote method. It forces the VCSServer to create a fresh
327 call to a remote method. It forces the VCSServer to create a fresh
318 repository instance on the next call to a remote method.
328 repository instance on the next call to a remote method.
319 """
329 """
320 self._wire['context'] = str(uuid.uuid4())
330 self._wire['context'] = str(uuid.uuid4())
321
331
322
332
323 class VcsHttpProxy(object):
333 class VcsHttpProxy(object):
324
334
325 CHUNK_SIZE = 16384
335 CHUNK_SIZE = 16384
326
336
327 def __init__(self, server_and_port, backend_endpoint):
337 def __init__(self, server_and_port, backend_endpoint):
328 retries = Retry(total=5, connect=None, read=None, redirect=None)
338 retries = Retry(total=5, connect=None, read=None, redirect=None)
329
339
330 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
340 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
331 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
341 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
332 self.session = requests.Session()
342 self.session = requests.Session()
333 self.session.mount('http://', adapter)
343 self.session.mount('http://', adapter)
334
344
335 def handle(self, environment, input_data, *args, **kwargs):
345 def handle(self, environment, input_data, *args, **kwargs):
336 data = {
346 data = {
337 'environment': environment,
347 'environment': environment,
338 'input_data': input_data,
348 'input_data': input_data,
339 'args': args,
349 'args': args,
340 'kwargs': kwargs
350 'kwargs': kwargs
341 }
351 }
342 result = self.session.post(
352 result = self.session.post(
343 self.base_url, msgpack.packb(data), stream=True)
353 self.base_url, msgpack.packb(data), stream=True)
344 return self._get_result(result)
354 return self._get_result(result)
345
355
346 def _deserialize_and_raise(self, error):
356 def _deserialize_and_raise(self, error):
347 exception = Exception(error['message'])
357 exception = Exception(error['message'])
348 try:
358 try:
349 exception._vcs_kind = error['_vcs_kind']
359 exception._vcs_kind = error['_vcs_kind']
350 except KeyError:
360 except KeyError:
351 pass
361 pass
352 raise exception
362 raise exception
353
363
354 def _iterate(self, result):
364 def _iterate(self, result):
355 unpacker = msgpack.Unpacker()
365 unpacker = msgpack.Unpacker()
356 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
366 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
357 unpacker.feed(line)
367 unpacker.feed(line)
358 for chunk in unpacker:
368 for chunk in unpacker:
359 yield chunk
369 yield chunk
360
370
361 def _get_result(self, result):
371 def _get_result(self, result):
362 iterator = self._iterate(result)
372 iterator = self._iterate(result)
363 error = iterator.next()
373 error = iterator.next()
364 if error:
374 if error:
365 self._deserialize_and_raise(error)
375 self._deserialize_and_raise(error)
366
376
367 status = iterator.next()
377 status = iterator.next()
368 headers = iterator.next()
378 headers = iterator.next()
369
379
370 return iterator, status, headers
380 return iterator, status, headers
371
381
372
382
373 class ThreadlocalSessionFactory(object):
383 class ThreadlocalSessionFactory(object):
374 """
384 """
375 Creates one CurlSession per thread on demand.
385 Creates one CurlSession per thread on demand.
376 """
386 """
377
387
378 def __init__(self):
388 def __init__(self):
379 self._thread_local = threading.local()
389 self._thread_local = threading.local()
380
390
381 def __call__(self):
391 def __call__(self):
382 if not hasattr(self._thread_local, 'curl_session'):
392 if not hasattr(self._thread_local, 'curl_session'):
383 self._thread_local.curl_session = CurlSession()
393 self._thread_local.curl_session = CurlSession()
384 return self._thread_local.curl_session
394 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now