##// END OF EJS Templates
caches: fix id sanitizer.
super-admin -
r4778:5d4c07f5 default
parent child Browse files
Show More
@@ -1,383 +1,384 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import time
28 import time
29 import urllib2
29 import urllib2
30 import urlparse
30 import urlparse
31 import uuid
31 import uuid
32 import traceback
32 import traceback
33
33
34 import pycurl
34 import pycurl
35 import msgpack
35 import msgpack
36 import requests
36 import requests
37 from requests.packages.urllib3.util.retry import Retry
37 from requests.packages.urllib3.util.retry import Retry
38
38
39 import rhodecode
39 import rhodecode
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 from rhodecode.lib.system_info import get_cert_path
42 from rhodecode.lib.system_info import get_cert_path
43 from rhodecode.lib.vcs import exceptions, CurlSession
43 from rhodecode.lib.vcs import exceptions, CurlSession
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
51 'KeyError': KeyError,
51 'KeyError': KeyError,
52 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
53 }
53 }
54
54
55
55
56 def _remote_call(url, payload, exceptions_map, session):
56 def _remote_call(url, payload, exceptions_map, session):
57 try:
57 try:
58 response = session.post(url, data=msgpack.packb(payload))
58 response = session.post(url, data=msgpack.packb(payload))
59 except pycurl.error as e:
59 except pycurl.error as e:
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
60 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
61 raise exceptions.HttpVCSCommunicationError(msg)
61 raise exceptions.HttpVCSCommunicationError(msg)
62 except Exception as e:
62 except Exception as e:
63 message = getattr(e, 'message', '')
63 message = getattr(e, 'message', '')
64 if 'Failed to connect' in message:
64 if 'Failed to connect' in message:
65 # gevent doesn't return proper pycurl errors
65 # gevent doesn't return proper pycurl errors
66 raise exceptions.HttpVCSCommunicationError(e)
66 raise exceptions.HttpVCSCommunicationError(e)
67 else:
67 else:
68 raise
68 raise
69
69
70 if response.status_code >= 400:
70 if response.status_code >= 400:
71 log.error('Call to %s returned non 200 HTTP code: %s',
71 log.error('Call to %s returned non 200 HTTP code: %s',
72 url, response.status_code)
72 url, response.status_code)
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
73 raise exceptions.HttpVCSCommunicationError(repr(response.content))
74
74
75 try:
75 try:
76 response = msgpack.unpackb(response.content)
76 response = msgpack.unpackb(response.content)
77 except Exception:
77 except Exception:
78 log.exception('Failed to decode response %r', response.content)
78 log.exception('Failed to decode response %r', response.content)
79 raise
79 raise
80
80
81 error = response.get('error')
81 error = response.get('error')
82 if error:
82 if error:
83 type_ = error.get('type', 'Exception')
83 type_ = error.get('type', 'Exception')
84 exc = exceptions_map.get(type_, Exception)
84 exc = exceptions_map.get(type_, Exception)
85 exc = exc(error.get('message'))
85 exc = exc(error.get('message'))
86 try:
86 try:
87 exc._vcs_kind = error['_vcs_kind']
87 exc._vcs_kind = error['_vcs_kind']
88 except KeyError:
88 except KeyError:
89 pass
89 pass
90
90
91 try:
91 try:
92 exc._vcs_server_traceback = error['traceback']
92 exc._vcs_server_traceback = error['traceback']
93 exc._vcs_server_org_exc_name = error['org_exc']
93 exc._vcs_server_org_exc_name = error['org_exc']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
94 exc._vcs_server_org_exc_tb = error['org_exc_tb']
95 except KeyError:
95 except KeyError:
96 pass
96 pass
97
97
98 raise exc
98 raise exc
99 return response.get('result')
99 return response.get('result')
100
100
101
101
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
102 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
103 try:
103 try:
104 response = session.post(url, data=msgpack.packb(payload))
104 response = session.post(url, data=msgpack.packb(payload))
105 except pycurl.error as e:
105 except pycurl.error as e:
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
106 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
107 raise exceptions.HttpVCSCommunicationError(msg)
107 raise exceptions.HttpVCSCommunicationError(msg)
108 except Exception as e:
108 except Exception as e:
109 message = getattr(e, 'message', '')
109 message = getattr(e, 'message', '')
110 if 'Failed to connect' in message:
110 if 'Failed to connect' in message:
111 # gevent doesn't return proper pycurl errors
111 # gevent doesn't return proper pycurl errors
112 raise exceptions.HttpVCSCommunicationError(e)
112 raise exceptions.HttpVCSCommunicationError(e)
113 else:
113 else:
114 raise
114 raise
115
115
116 if response.status_code >= 400:
116 if response.status_code >= 400:
117 log.error('Call to %s returned non 200 HTTP code: %s',
117 log.error('Call to %s returned non 200 HTTP code: %s',
118 url, response.status_code)
118 url, response.status_code)
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
119 raise exceptions.HttpVCSCommunicationError(repr(response.content))
120
120
121 return response.iter_content(chunk_size=chunk_size)
121 return response.iter_content(chunk_size=chunk_size)
122
122
123
123
124 class ServiceConnection(object):
124 class ServiceConnection(object):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
125 def __init__(self, server_and_port, backend_endpoint, session_factory):
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
126 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
127 self._session_factory = session_factory
127 self._session_factory = session_factory
128
128
129 def __getattr__(self, name):
129 def __getattr__(self, name):
130 def f(*args, **kwargs):
130 def f(*args, **kwargs):
131 return self._call(name, *args, **kwargs)
131 return self._call(name, *args, **kwargs)
132 return f
132 return f
133
133
134 @exceptions.map_vcs_exceptions
134 @exceptions.map_vcs_exceptions
135 def _call(self, name, *args, **kwargs):
135 def _call(self, name, *args, **kwargs):
136 payload = {
136 payload = {
137 'id': str(uuid.uuid4()),
137 'id': str(uuid.uuid4()),
138 'method': name,
138 'method': name,
139 'params': {'args': args, 'kwargs': kwargs}
139 'params': {'args': args, 'kwargs': kwargs}
140 }
140 }
141 return _remote_call(
141 return _remote_call(
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
142 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
143
143
144
144
145 class RemoteVCSMaker(object):
145 class RemoteVCSMaker(object):
146
146
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
147 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
148 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
149 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
150
150
151 self._session_factory = session_factory
151 self._session_factory = session_factory
152 self.backend_type = backend_type
152 self.backend_type = backend_type
153
153
154 @classmethod
154 @classmethod
155 def init_cache_region(cls, repo_id):
155 def init_cache_region(cls, repo_id):
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
156 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
157 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
158 return region, cache_namespace_uid
158 return region, cache_namespace_uid
159
159
160 def __call__(self, path, repo_id, config, with_wire=None):
160 def __call__(self, path, repo_id, config, with_wire=None):
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
161 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
162 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
163
163
164 def __getattr__(self, name):
164 def __getattr__(self, name):
165 def remote_attr(*args, **kwargs):
165 def remote_attr(*args, **kwargs):
166 return self._call(name, *args, **kwargs)
166 return self._call(name, *args, **kwargs)
167 return remote_attr
167 return remote_attr
168
168
169 @exceptions.map_vcs_exceptions
169 @exceptions.map_vcs_exceptions
170 def _call(self, func_name, *args, **kwargs):
170 def _call(self, func_name, *args, **kwargs):
171 payload = {
171 payload = {
172 'id': str(uuid.uuid4()),
172 'id': str(uuid.uuid4()),
173 'method': func_name,
173 'method': func_name,
174 'backend': self.backend_type,
174 'backend': self.backend_type,
175 'params': {'args': args, 'kwargs': kwargs}
175 'params': {'args': args, 'kwargs': kwargs}
176 }
176 }
177 url = self.url
177 url = self.url
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
178 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
179
179
180
180
181 class RemoteRepo(object):
181 class RemoteRepo(object):
182 CHUNK_SIZE = 16384
182 CHUNK_SIZE = 16384
183
183
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
184 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
185 self.url = remote_maker.url
185 self.url = remote_maker.url
186 self.stream_url = remote_maker.stream_url
186 self.stream_url = remote_maker.stream_url
187 self._session = remote_maker._session_factory()
187 self._session = remote_maker._session_factory()
188
188 cache_repo_id = self._repo_id_sanitizer(repo_id)
189 cache_repo_id = self._repo_id_sanitizer(repo_id)
189 self._cache_region, self._cache_namespace = \
190 self._cache_region, self._cache_namespace = \
190 remote_maker.init_cache_region(self._repo_id_sanitizer(cache_repo_id))
191 remote_maker.init_cache_region(cache_repo_id)
191
192
192 with_wire = with_wire or {}
193 with_wire = with_wire or {}
193
194
194 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
195 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
195 self._wire = {
196 self._wire = {
196 "path": path, # repo path
197 "path": path, # repo path
197 "repo_id": repo_id,
198 "repo_id": repo_id,
198 "cache_repo_id": cache_repo_id,
199 "cache_repo_id": cache_repo_id,
199 "config": config,
200 "config": config,
200 "repo_state_uid": repo_state_uid,
201 "repo_state_uid": repo_state_uid,
201 "context": self._create_vcs_cache_context(path, repo_state_uid)
202 "context": self._create_vcs_cache_context(path, repo_state_uid)
202 }
203 }
203
204
204 if with_wire:
205 if with_wire:
205 self._wire.update(with_wire)
206 self._wire.update(with_wire)
206
207
207 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
208 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
208 # log.debug brings a few percent gain even if is is not active.
209 # log.debug brings a few percent gain even if is is not active.
209 if log.isEnabledFor(logging.DEBUG):
210 if log.isEnabledFor(logging.DEBUG):
210 self._call_with_logging = True
211 self._call_with_logging = True
211
212
212 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
213 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
213
214
214 def _repo_id_sanitizer(self, repo_id):
215 def _repo_id_sanitizer(self, repo_id):
215 return repo_id.replace('/', '__')
216 return repo_id.replace('/', '__').replace('-', '_')
216
217
217 def __getattr__(self, name):
218 def __getattr__(self, name):
218
219
219 if name.startswith('stream:'):
220 if name.startswith('stream:'):
220 def repo_remote_attr(*args, **kwargs):
221 def repo_remote_attr(*args, **kwargs):
221 return self._call_stream(name, *args, **kwargs)
222 return self._call_stream(name, *args, **kwargs)
222 else:
223 else:
223 def repo_remote_attr(*args, **kwargs):
224 def repo_remote_attr(*args, **kwargs):
224 return self._call(name, *args, **kwargs)
225 return self._call(name, *args, **kwargs)
225
226
226 return repo_remote_attr
227 return repo_remote_attr
227
228
228 def _base_call(self, name, *args, **kwargs):
229 def _base_call(self, name, *args, **kwargs):
229 # TODO: oliver: This is currently necessary pre-call since the
230 # TODO: oliver: This is currently necessary pre-call since the
230 # config object is being changed for hooking scenarios
231 # config object is being changed for hooking scenarios
231 wire = copy.deepcopy(self._wire)
232 wire = copy.deepcopy(self._wire)
232 wire["config"] = wire["config"].serialize()
233 wire["config"] = wire["config"].serialize()
233 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
234 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
234
235
235 payload = {
236 payload = {
236 'id': str(uuid.uuid4()),
237 'id': str(uuid.uuid4()),
237 'method': name,
238 'method': name,
238 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
239 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
239 }
240 }
240
241
241 context_uid = wire.get('context')
242 context_uid = wire.get('context')
242 return context_uid, payload
243 return context_uid, payload
243
244
244 @exceptions.map_vcs_exceptions
245 @exceptions.map_vcs_exceptions
245 def _call(self, name, *args, **kwargs):
246 def _call(self, name, *args, **kwargs):
246 context_uid, payload = self._base_call(name, *args, **kwargs)
247 context_uid, payload = self._base_call(name, *args, **kwargs)
247 url = self.url
248 url = self.url
248
249
249 start = time.time()
250 start = time.time()
250
251
251 cache_on = False
252 cache_on = False
252 cache_key = ''
253 cache_key = ''
253 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
254 local_cache_on = rhodecode.CONFIG.get('vcs.methods.cache')
254
255
255 cache_methods = [
256 cache_methods = [
256 'branches', 'tags', 'bookmarks',
257 'branches', 'tags', 'bookmarks',
257 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
258 'is_large_file', 'is_binary', 'fctx_size', 'node_history', 'blob_raw_length',
258 'revision', 'tree_items',
259 'revision', 'tree_items',
259 'ctx_list',
260 'ctx_list',
260 'bulk_request',
261 'bulk_request',
261 ]
262 ]
262
263
263 if local_cache_on and name in cache_methods:
264 if local_cache_on and name in cache_methods:
264 cache_on = True
265 cache_on = True
265 repo_state_uid = self._wire['repo_state_uid']
266 repo_state_uid = self._wire['repo_state_uid']
266 call_args = [a for a in args]
267 call_args = [a for a in args]
267 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
268 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
268
269
269 @self._cache_region.conditional_cache_on_arguments(
270 @self._cache_region.conditional_cache_on_arguments(
270 namespace=self._cache_namespace, condition=cache_on and cache_key)
271 namespace=self._cache_namespace, condition=cache_on and cache_key)
271 def remote_call(_cache_key):
272 def remote_call(_cache_key):
272 if self._call_with_logging:
273 if self._call_with_logging:
273 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
274 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
274 url, name, args, context_uid, cache_on)
275 url, name, args, context_uid, cache_on)
275 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
276 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
276
277
277 result = remote_call(cache_key)
278 result = remote_call(cache_key)
278 if self._call_with_logging:
279 if self._call_with_logging:
279 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
280 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
280 url, name, time.time()-start, context_uid)
281 url, name, time.time()-start, context_uid)
281 return result
282 return result
282
283
283 @exceptions.map_vcs_exceptions
284 @exceptions.map_vcs_exceptions
284 def _call_stream(self, name, *args, **kwargs):
285 def _call_stream(self, name, *args, **kwargs):
285 context_uid, payload = self._base_call(name, *args, **kwargs)
286 context_uid, payload = self._base_call(name, *args, **kwargs)
286 payload['chunk_size'] = self.CHUNK_SIZE
287 payload['chunk_size'] = self.CHUNK_SIZE
287 url = self.stream_url
288 url = self.stream_url
288
289
289 start = time.time()
290 start = time.time()
290 if self._call_with_logging:
291 if self._call_with_logging:
291 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
292 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
292 url, name, args, context_uid)
293 url, name, args, context_uid)
293
294
294 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
295 result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
295 self.CHUNK_SIZE)
296 self.CHUNK_SIZE)
296
297
297 if self._call_with_logging:
298 if self._call_with_logging:
298 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
299 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
299 url, name, time.time()-start, context_uid)
300 url, name, time.time()-start, context_uid)
300 return result
301 return result
301
302
302 def __getitem__(self, key):
303 def __getitem__(self, key):
303 return self.revision(key)
304 return self.revision(key)
304
305
305 def _create_vcs_cache_context(self, *args):
306 def _create_vcs_cache_context(self, *args):
306 """
307 """
307 Creates a unique string which is passed to the VCSServer on every
308 Creates a unique string which is passed to the VCSServer on every
308 remote call. It is used as cache key in the VCSServer.
309 remote call. It is used as cache key in the VCSServer.
309 """
310 """
310 hash_key = '-'.join(map(str, args))
311 hash_key = '-'.join(map(str, args))
311 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
312 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
312
313
313 def invalidate_vcs_cache(self):
314 def invalidate_vcs_cache(self):
314 """
315 """
315 This invalidates the context which is sent to the VCSServer on every
316 This invalidates the context which is sent to the VCSServer on every
316 call to a remote method. It forces the VCSServer to create a fresh
317 call to a remote method. It forces the VCSServer to create a fresh
317 repository instance on the next call to a remote method.
318 repository instance on the next call to a remote method.
318 """
319 """
319 self._wire['context'] = str(uuid.uuid4())
320 self._wire['context'] = str(uuid.uuid4())
320
321
321
322
322 class VcsHttpProxy(object):
323 class VcsHttpProxy(object):
323
324
324 CHUNK_SIZE = 16384
325 CHUNK_SIZE = 16384
325
326
326 def __init__(self, server_and_port, backend_endpoint):
327 def __init__(self, server_and_port, backend_endpoint):
327 retries = Retry(total=5, connect=None, read=None, redirect=None)
328 retries = Retry(total=5, connect=None, read=None, redirect=None)
328
329
329 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
330 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
330 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
331 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
331 self.session = requests.Session()
332 self.session = requests.Session()
332 self.session.mount('http://', adapter)
333 self.session.mount('http://', adapter)
333
334
334 def handle(self, environment, input_data, *args, **kwargs):
335 def handle(self, environment, input_data, *args, **kwargs):
335 data = {
336 data = {
336 'environment': environment,
337 'environment': environment,
337 'input_data': input_data,
338 'input_data': input_data,
338 'args': args,
339 'args': args,
339 'kwargs': kwargs
340 'kwargs': kwargs
340 }
341 }
341 result = self.session.post(
342 result = self.session.post(
342 self.base_url, msgpack.packb(data), stream=True)
343 self.base_url, msgpack.packb(data), stream=True)
343 return self._get_result(result)
344 return self._get_result(result)
344
345
345 def _deserialize_and_raise(self, error):
346 def _deserialize_and_raise(self, error):
346 exception = Exception(error['message'])
347 exception = Exception(error['message'])
347 try:
348 try:
348 exception._vcs_kind = error['_vcs_kind']
349 exception._vcs_kind = error['_vcs_kind']
349 except KeyError:
350 except KeyError:
350 pass
351 pass
351 raise exception
352 raise exception
352
353
353 def _iterate(self, result):
354 def _iterate(self, result):
354 unpacker = msgpack.Unpacker()
355 unpacker = msgpack.Unpacker()
355 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
356 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
356 unpacker.feed(line)
357 unpacker.feed(line)
357 for chunk in unpacker:
358 for chunk in unpacker:
358 yield chunk
359 yield chunk
359
360
360 def _get_result(self, result):
361 def _get_result(self, result):
361 iterator = self._iterate(result)
362 iterator = self._iterate(result)
362 error = iterator.next()
363 error = iterator.next()
363 if error:
364 if error:
364 self._deserialize_and_raise(error)
365 self._deserialize_and_raise(error)
365
366
366 status = iterator.next()
367 status = iterator.next()
367 headers = iterator.next()
368 headers = iterator.next()
368
369
369 return iterator, status, headers
370 return iterator, status, headers
370
371
371
372
372 class ThreadlocalSessionFactory(object):
373 class ThreadlocalSessionFactory(object):
373 """
374 """
374 Creates one CurlSession per thread on demand.
375 Creates one CurlSession per thread on demand.
375 """
376 """
376
377
377 def __init__(self):
378 def __init__(self):
378 self._thread_local = threading.local()
379 self._thread_local = threading.local()
379
380
380 def __call__(self):
381 def __call__(self):
381 if not hasattr(self._thread_local, 'curl_session'):
382 if not hasattr(self._thread_local, 'curl_session'):
382 self._thread_local.curl_session = CurlSession()
383 self._thread_local.curl_session = CurlSession()
383 return self._thread_local.curl_session
384 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now