##// END OF EJS Templates
vcs-cache: enable more methods, and fix arguments calls
super-admin -
r4747:4ec212aa default
parent child Browse files
Show More
@@ -1,367 +1,378 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
51 'KeyError': KeyError,
51 'KeyError': KeyError,
52 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
53 }
53 }
54
54
55
55
56 def _remote_call(url, payload, exceptions_map, session):
56 def _remote_call(url, payload, exceptions_map, session):
57 try:
57 try:
58 response = session.post(url, data=msgpack.packb(payload))
58 response = session.post(url, data=msgpack.packb(payload))
59 except pycurl.error as e:
59 except pycurl.error as e:
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 raise exceptions.HttpVCSCommunicationError(msg)
61 raise exceptions.HttpVCSCommunicationError(msg)
62 except Exception as e:
62 except Exception as e:
63 message = getattr(e, 'message', '')
63 message = getattr(e, 'message', '')
64 if 'Failed to connect' in message:
64 if 'Failed to connect' in message:
65 # gevent doesn't return proper pycurl errors
65 # gevent doesn't return proper pycurl errors
66 raise exceptions.HttpVCSCommunicationError(e)
66 raise exceptions.HttpVCSCommunicationError(e)
67 else:
67 else:
68 raise
68 raise
69
69
70 if response.status_code >= 400:
70 if response.status_code >= 400:
71 log.error('Call to %s returned non 200 HTTP code: %s',
71 log.error('Call to %s returned non 200 HTTP code: %s',
72 url, response.status_code)
72 url, response.status_code)
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74
74
75 try:
75 try:
76 response = msgpack.unpackb(response.content)
76 response = msgpack.unpackb(response.content)
77 except Exception:
77 except Exception:
78 log.exception('Failed to decode response %r', response.content)
78 log.exception('Failed to decode response %r', response.content)
79 raise
79 raise
80
80
81 error = response.get('error')
81 error = response.get('error')
82 if error:
82 if error:
83 type_ = error.get('type', 'Exception')
83 type_ = error.get('type', 'Exception')
84 exc = exceptions_map.get(type_, Exception)
84 exc = exceptions_map.get(type_, Exception)
85 exc = exc(error.get('message'))
85 exc = exc(error.get('message'))
86 try:
86 try:
87 exc._vcs_kind = error['_vcs_kind']
87 exc._vcs_kind = error['_vcs_kind']
88 except KeyError:
88 except KeyError:
89 pass
89 pass
90
90
91 try:
91 try:
92 exc._vcs_server_traceback = error['traceback']
92 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_org_exc_name = error['org_exc']
93 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 except KeyError:
95 except KeyError:
96 pass
96 pass
97
97
98 raise exc
98 raise exc
99 return response.get('result')
99 return response.get('result')
100
100
101
101
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 try:
103 try:
104 response = session.post(url, data=msgpack.packb(payload))
104 response = session.post(url, data=msgpack.packb(payload))
105 except pycurl.error as e:
105 except pycurl.error as e:
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 raise exceptions.HttpVCSCommunicationError(msg)
107 raise exceptions.HttpVCSCommunicationError(msg)
108 except Exception as e:
108 except Exception as e:
109 message = getattr(e, 'message', '')
109 message = getattr(e, 'message', '')
110 if 'Failed to connect' in message:
110 if 'Failed to connect' in message:
111 # gevent doesn't return proper pycurl errors
111 # gevent doesn't return proper pycurl errors
112 raise exceptions.HttpVCSCommunicationError(e)
112 raise exceptions.HttpVCSCommunicationError(e)
113 else:
113 else:
114 raise
114 raise
115
115
116 if response.status_code >= 400:
116 if response.status_code >= 400:
117 log.error('Call to %s returned non 200 HTTP code: %s',
117 log.error('Call to %s returned non 200 HTTP code: %s',
118 url, response.status_code)
118 url, response.status_code)
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120
120
121 return response.iter_content(chunk_size=chunk_size)
121 return response.iter_content(chunk_size=chunk_size)
122
122
123
123
124 class ServiceConnection(object):
124 class ServiceConnection(object):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self._session_factory = session_factory
127 self._session_factory = session_factory
128
128
129 def __getattr__(self, name):
129 def __getattr__(self, name):
130 def f(*args, **kwargs):
130 def f(*args, **kwargs):
131 return self._call(name, *args, **kwargs)
131 return self._call(name, *args, **kwargs)
132 return f
132 return f
133
133
134 @exceptions.map_vcs_exceptions
134 @exceptions.map_vcs_exceptions
135 def _call(self, name, *args, **kwargs):
135 def _call(self, name, *args, **kwargs):
136 payload = {
136 payload = {
137 'id': str(uuid.uuid4()),
137 'id': str(uuid.uuid4()),
138 'method': name,
138 'method': name,
139 'params': {'args': args, 'kwargs': kwargs}
139 'params': {'args': args, 'kwargs': kwargs}
140 }
140 }
141 return _remote_call(
141 return _remote_call(
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143
143
144
144
145 class RemoteVCSMaker(object):
145 class RemoteVCSMaker(object):
146
146
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150
150
151 self._session_factory = session_factory
151 self._session_factory = session_factory
152 self.backend_type = backend_type
152 self.backend_type = backend_type
153
153
154 @classmethod
154 @classmethod
155 def init_cache_region(cls, repo_id):
155 def init_cache_region(cls, repo_id):
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 return region, cache_namespace_uid
158 return region, cache_namespace_uid
159
159
160 def __call__(self, path, repo_id, config, with_wire=None):
160 def __call__(self, path, repo_id, config, with_wire=None):
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163
163
164 def __getattr__(self, name):
164 def __getattr__(self, name):
165 def remote_attr(*args, **kwargs):
165 def remote_attr(*args, **kwargs):
166 return self._call(name, *args, **kwargs)
166 return self._call(name, *args, **kwargs)
167 return remote_attr
167 return remote_attr
168
168
169 @exceptions.map_vcs_exceptions
169 @exceptions.map_vcs_exceptions
170 def _call(self, func_name, *args, **kwargs):
170 def _call(self, func_name, *args, **kwargs):
171 payload = {
171 payload = {
172 'id': str(uuid.uuid4()),
172 'id': str(uuid.uuid4()),
173 'method': func_name,
173 'method': func_name,
174 'backend': self.backend_type,
174 'backend': self.backend_type,
175 'params': {'args': args, 'kwargs': kwargs}
175 'params': {'args': args, 'kwargs': kwargs}
176 }
176 }
177 url = self.url
177 url = self.url
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179
179
180
180
181 class RemoteRepo(object):
181 class RemoteRepo(object):
182 CHUNK_SIZE = 16384
182 CHUNK_SIZE = 16384
183
183
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 self.url = remote_maker.url
185 self.url = remote_maker.url
186 self.stream_url = remote_maker.stream_url
186 self.stream_url = remote_maker.stream_url
187 self._session = remote_maker._session_factory()
187 self._session = remote_maker._session_factory()
188 self._cache_region, self._cache_namespace = \
188 self._cache_region, self._cache_namespace = \
189 remote_maker.init_cache_region(repo_id)
189 remote_maker.init_cache_region(repo_id)
190
190
191 with_wire = with_wire or {}
191 with_wire = with_wire or {}
192
192
193 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
193 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
194 self._wire = {
194 self._wire = {
195 "path": path, # repo path
195 "path": path, # repo path
196 "repo_id": repo_id,
196 "repo_id": repo_id,
197 "config": config,
197 "config": config,
198 "repo_state_uid": repo_state_uid,
198 "repo_state_uid": repo_state_uid,
199 "context": self._create_vcs_cache_context(path, repo_state_uid)
199 "context": self._create_vcs_cache_context(path, repo_state_uid)
200 }
200 }
201
201
202 if with_wire:
202 if with_wire:
203 self._wire.update(with_wire)
203 self._wire.update(with_wire)
204
204
205 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
205 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
206 # log.debug brings a few percent gain even if is is not active.
206 # log.debug brings a few percent gain even if is is not active.
207 if log.isEnabledFor(logging.DEBUG):
207 if log.isEnabledFor(logging.DEBUG):
208 self._call_with_logging = True
208 self._call_with_logging = True
209
209
210 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
210 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
211
211
212 def __getattr__(self, name):
212 def __getattr__(self, name):
213
213
214 if name.startswith('stream:'):
214 if name.startswith('stream:'):
215 def repo_remote_attr(*args, **kwargs):
215 def repo_remote_attr(*args, **kwargs):
216 return self._call_stream(name, *args, **kwargs)
216 return self._call_stream(name, *args, **kwargs)
217 else:
217 else:
218 def repo_remote_attr(*args, **kwargs):
218 def repo_remote_attr(*args, **kwargs):
219 return self._call(name, *args, **kwargs)
219 return self._call(name, *args, **kwargs)
220
220
221 return repo_remote_attr
221 return repo_remote_attr
222
222
223 def _base_call(self, name, *args, **kwargs):
223 def _base_call(self, name, *args, **kwargs):
224 # TODO: oliver: This is currently necessary pre-call since the
224 # TODO: oliver: This is currently necessary pre-call since the
225 # config object is being changed for hooking scenarios
225 # config object is being changed for hooking scenarios
226 wire = copy.deepcopy(self._wire)
226 wire = copy.deepcopy(self._wire)
227 wire["config"] = wire["config"].serialize()
227 wire["config"] = wire["config"].serialize()
228 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
228 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
229
229
230 payload = {
230 payload = {
231 'id': str(uuid.uuid4()),
231 'id': str(uuid.uuid4()),
232 'method': name,
232 'method': name,
233 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
233 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
234 }
234 }
235
235
236 context_uid = wire.get('context')
236 context_uid = wire.get('context')
237 return context_uid, payload
237 return context_uid, payload
238
238
239 @exceptions.map_vcs_exceptions
239 @exceptions.map_vcs_exceptions
240 def _call(self, name, *args, **kwargs):
240 def _call(self, name, *args, **kwargs):
241 context_uid, payload = self._base_call(name, *args, **kwargs)
241 context_uid, payload = self._base_call(name, *args, **kwargs)
242 url = self.url
242 url = self.url
243
243
244 start = time.time()
244 start = time.time()
245
245
246 cache_on = False
246 cache_on = False
247 cache_key = ''
247 cache_key = ''
248 local_cache = rhodecode.CONFIG.get('vcs.methods.cache')
248 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
249 if local_cache and name in ['is_large_file', 'is_binary', 'fctx_size', 'bulk_request']:
249
250 cache_methods = [
251 'branches', 'tags', 'bookmarks',
252 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
253 'revision', 'tree_items',
254 'ctx_list',
255 'bulk_request',
256 ]
257
258 if local_cache_on and name in cache_methods:
250 cache_on = True
259 cache_on = True
251 cache_key = compute_key_from_params(name, args[0], args[1])
260 repo_state_uid = self._wire['repo_state_uid']
261 call_args = [a for a in args]
262 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
252
263
253 @self._cache_region.conditional_cache_on_arguments(
264 @self._cache_region.conditional_cache_on_arguments(
254 namespace=self._cache_namespace, condition=cache_on and cache_key)
265 namespace=self._cache_namespace, condition=cache_on and cache_key)
255 def remote_call(_cache_key):
266 def remote_call(_cache_key):
256 if self._call_with_logging:
267 if self._call_with_logging:
257 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
268 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
258 url, name, args, context_uid, cache_on)
269 url, name, args, context_uid, cache_on)
259 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
270 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
260
271
261 result = remote_call(cache_key)
272 result = remote_call(cache_key)
262 if self._call_with_logging:
273 if self._call_with_logging:
263 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
274 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
264 url, name, time.time()-start, context_uid)
275 url, name, time.time()-start, context_uid)
265 return result
276 return result
266
277
267 @exceptions.map_vcs_exceptions
278 @exceptions.map_vcs_exceptions
268 def _call_stream(self, name, *args, **kwargs):
279 def _call_stream(self, name, *args, **kwargs):
269 context_uid, payload = self._base_call(name, *args, **kwargs)
280 context_uid, payload = self._base_call(name, *args, **kwargs)
270 payload['chunk_size'] = self.CHUNK_SIZE
281 payload['chunk_size'] = self.CHUNK_SIZE
271 url = self.stream_url
282 url = self.stream_url
272
283
273 start = time.time()
284 start = time.time()
274 if self._call_with_logging:
285 if self._call_with_logging:
275 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
286 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
276 url, name, args, context_uid)
287 url, name, args, context_uid)
277
288
278 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
289 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
279 self.CHUNK_SIZE)
290 self.CHUNK_SIZE)
280
291
281 if self._call_with_logging:
292 if self._call_with_logging:
282 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
293 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
283 url, name, time.time()-start, context_uid)
294 url, name, time.time()-start, context_uid)
284 return result
295 return result
285
296
286 def __getitem__(self, key):
297 def __getitem__(self, key):
287 return self.revision(key)
298 return self.revision(key)
288
299
289 def _create_vcs_cache_context(self, *args):
300 def _create_vcs_cache_context(self, *args):
290 """
301 """
291 Creates a unique string which is passed to the VCSServer on every
302 Creates a unique string which is passed to the VCSServer on every
292 remote call. It is used as cache key in the VCSServer.
303 remote call. It is used as cache key in the VCSServer.
293 """
304 """
294 hash_key = '-'.join(map(str, args))
305 hash_key = '-'.join(map(str, args))
295 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
306 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
296
307
297 def invalidate_vcs_cache(self):
308 def invalidate_vcs_cache(self):
298 """
309 """
299 This invalidates the context which is sent to the VCSServer on every
310 This invalidates the context which is sent to the VCSServer on every
300 call to a remote method. It forces the VCSServer to create a fresh
311 call to a remote method. It forces the VCSServer to create a fresh
301 repository instance on the next call to a remote method.
312 repository instance on the next call to a remote method.
302 """
313 """
303 self._wire['context'] = str(uuid.uuid4())
314 self._wire['context'] = str(uuid.uuid4())
304
315
305
316
306 class VcsHttpProxy(object):
317 class VcsHttpProxy(object):
307
318
308 CHUNK_SIZE = 16384
319 CHUNK_SIZE = 16384
309
320
310 def __init__(self, server_and_port, backend_endpoint):
321 def __init__(self, server_and_port, backend_endpoint):
311 retries = Retry(total=5, connect=None, read=None, redirect=None)
322 retries = Retry(total=5, connect=None, read=None, redirect=None)
312
323
313 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
324 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
314 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
325 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
315 self.session = requests.Session()
326 self.session = requests.Session()
316 self.session.mount('http://', adapter)
327 self.session.mount('http://', adapter)
317
328
318 def handle(self, environment, input_data, *args, **kwargs):
329 def handle(self, environment, input_data, *args, **kwargs):
319 data = {
330 data = {
320 'environment': environment,
331 'environment': environment,
321 'input_data': input_data,
332 'input_data': input_data,
322 'args': args,
333 'args': args,
323 'kwargs': kwargs
334 'kwargs': kwargs
324 }
335 }
325 result = self.session.post(
336 result = self.session.post(
326 self.base_url, msgpack.packb(data), stream=True)
337 self.base_url, msgpack.packb(data), stream=True)
327 return self._get_result(result)
338 return self._get_result(result)
328
339
329 def _deserialize_and_raise(self, error):
340 def _deserialize_and_raise(self, error):
330 exception = Exception(error['message'])
341 exception = Exception(error['message'])
331 try:
342 try:
332 exception._vcs_kind = error['_vcs_kind']
343 exception._vcs_kind = error['_vcs_kind']
333 except KeyError:
344 except KeyError:
334 pass
345 pass
335 raise exception
346 raise exception
336
347
337 def _iterate(self, result):
348 def _iterate(self, result):
338 unpacker = msgpack.Unpacker()
349 unpacker = msgpack.Unpacker()
339 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
350 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
340 unpacker.feed(line)
351 unpacker.feed(line)
341 for chunk in unpacker:
352 for chunk in unpacker:
342 yield chunk
353 yield chunk
343
354
344 def _get_result(self, result):
355 def _get_result(self, result):
345 iterator = self._iterate(result)
356 iterator = self._iterate(result)
346 error = iterator.next()
357 error = iterator.next()
347 if error:
358 if error:
348 self._deserialize_and_raise(error)
359 self._deserialize_and_raise(error)
349
360
350 status = iterator.next()
361 status = iterator.next()
351 headers = iterator.next()
362 headers = iterator.next()
352
363
353 return iterator, status, headers
364 return iterator, status, headers
354
365
355
366
356 class ThreadlocalSessionFactory(object):
367 class ThreadlocalSessionFactory(object):
357 """
368 """
358 Creates one CurlSession per thread on demand.
369 Creates one CurlSession per thread on demand.
359 """
370 """
360
371
361 def __init__(self):
372 def __init__(self):
362 self._thread_local = threading.local()
373 self._thread_local = threading.local()
363
374
364 def __call__(self):
375 def __call__(self):
365 if not hasattr(self._thread_local, 'curl_session'):
376 if not hasattr(self._thread_local, 'curl_session'):
366 self._thread_local.curl_session = CurlSession()
377 self._thread_local.curl_session = CurlSession()
367 return self._thread_local.curl_session
378 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now