##// END OF EJS Templates
core: properly report 502 errors for gevent and gunicorn....
marcink -
r2554:4c3d007d default
parent child Browse files
Show More
@@ -1,139 +1,140 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Set of custom exceptions used in RhodeCode
22 Set of custom exceptions used in RhodeCode
23 """
23 """
24
24
25 from webob.exc import HTTPClientError
25 from webob.exc import HTTPClientError
26 from pyramid.httpexceptions import HTTPBadGateway
26 from pyramid.httpexceptions import HTTPBadGateway
27
27
28
28
29 class LdapUsernameError(Exception):
29 class LdapUsernameError(Exception):
30 pass
30 pass
31
31
32
32
33 class LdapPasswordError(Exception):
33 class LdapPasswordError(Exception):
34 pass
34 pass
35
35
36
36
37 class LdapConnectionError(Exception):
37 class LdapConnectionError(Exception):
38 pass
38 pass
39
39
40
40
41 class LdapImportError(Exception):
41 class LdapImportError(Exception):
42 pass
42 pass
43
43
44
44
45 class DefaultUserException(Exception):
45 class DefaultUserException(Exception):
46 pass
46 pass
47
47
48
48
49 class UserOwnsReposException(Exception):
49 class UserOwnsReposException(Exception):
50 pass
50 pass
51
51
52
52
53 class UserOwnsRepoGroupsException(Exception):
53 class UserOwnsRepoGroupsException(Exception):
54 pass
54 pass
55
55
56
56
57 class UserOwnsUserGroupsException(Exception):
57 class UserOwnsUserGroupsException(Exception):
58 pass
58 pass
59
59
60
60
61 class UserGroupAssignedException(Exception):
61 class UserGroupAssignedException(Exception):
62 pass
62 pass
63
63
64
64
65 class StatusChangeOnClosedPullRequestError(Exception):
65 class StatusChangeOnClosedPullRequestError(Exception):
66 pass
66 pass
67
67
68
68
69 class AttachedForksError(Exception):
69 class AttachedForksError(Exception):
70 pass
70 pass
71
71
72
72
73 class RepoGroupAssignmentError(Exception):
73 class RepoGroupAssignmentError(Exception):
74 pass
74 pass
75
75
76
76
77 class NonRelativePathError(Exception):
77 class NonRelativePathError(Exception):
78 pass
78 pass
79
79
80
80
81 class HTTPRequirementError(HTTPClientError):
81 class HTTPRequirementError(HTTPClientError):
82 title = explanation = 'Repository Requirement Missing'
82 title = explanation = 'Repository Requirement Missing'
83 reason = None
83 reason = None
84
84
85 def __init__(self, message, *args, **kwargs):
85 def __init__(self, message, *args, **kwargs):
86 self.title = self.explanation = message
86 self.title = self.explanation = message
87 super(HTTPRequirementError, self).__init__(*args, **kwargs)
87 super(HTTPRequirementError, self).__init__(*args, **kwargs)
88 self.args = (message, )
88 self.args = (message, )
89
89
90
90
91 class HTTPLockedRC(HTTPClientError):
91 class HTTPLockedRC(HTTPClientError):
92 """
92 """
93 Special Exception For locked Repos in RhodeCode, the return code can
93 Special Exception For locked Repos in RhodeCode, the return code can
94 be overwritten by _code keyword argument passed into constructors
94 be overwritten by _code keyword argument passed into constructors
95 """
95 """
96 code = 423
96 code = 423
97 title = explanation = 'Repository Locked'
97 title = explanation = 'Repository Locked'
98 reason = None
98 reason = None
99
99
100 def __init__(self, message, *args, **kwargs):
100 def __init__(self, message, *args, **kwargs):
101 from rhodecode import CONFIG
101 from rhodecode import CONFIG
102 from rhodecode.lib.utils2 import safe_int
102 from rhodecode.lib.utils2 import safe_int
103 _code = CONFIG.get('lock_ret_code')
103 _code = CONFIG.get('lock_ret_code')
104 self.code = safe_int(_code, self.code)
104 self.code = safe_int(_code, self.code)
105 self.title = self.explanation = message
105 self.title = self.explanation = message
106 super(HTTPLockedRC, self).__init__(*args, **kwargs)
106 super(HTTPLockedRC, self).__init__(*args, **kwargs)
107 self.args = (message, )
107 self.args = (message, )
108
108
109
109
110 class IMCCommitError(Exception):
110 class IMCCommitError(Exception):
111 pass
111 pass
112
112
113
113
114 class UserCreationError(Exception):
114 class UserCreationError(Exception):
115 pass
115 pass
116
116
117
117
118 class NotAllowedToCreateUserError(Exception):
118 class NotAllowedToCreateUserError(Exception):
119 pass
119 pass
120
120
121
121
122 class RepositoryCreationError(Exception):
122 class RepositoryCreationError(Exception):
123 pass
123 pass
124
124
125
125
126 class VCSServerUnavailable(HTTPBadGateway):
126 class VCSServerUnavailable(HTTPBadGateway):
127 """ HTTP Exception class for VCS Server errors """
127 """ HTTP Exception class for VCS Server errors """
128 code = 502
128 code = 502
129 title = 'VCS Server Error'
129 title = 'VCS Server Error'
130 causes = [
130 causes = [
131 'VCS Server is not running',
131 'VCS Server is not running',
132 'Incorrect vcs.server=host:port',
132 'Incorrect vcs.server=host:port',
133 'Incorrect vcs.server.protocol',
133 'Incorrect vcs.server.protocol',
134 ]
134 ]
135
135 def __init__(self, message=''):
136 def __init__(self, message=''):
136 self.explanation = 'Could not connect to VCS Server'
137 self.explanation = 'Could not connect to VCS Server'
137 if message:
138 if message:
138 self.explanation += ': ' + message
139 self.explanation += ': ' + message
139 super(VCSServerUnavailable, self).__init__()
140 super(VCSServerUnavailable, self).__init__()
@@ -1,295 +1,302 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31
31
32 import pycurl
32 import pycurl
33 import msgpack
33 import msgpack
34 import requests
34 import requests
35 from requests.packages.urllib3.util.retry import Retry
35 from requests.packages.urllib3.util.retry import Retry
36
36
37 from . import exceptions, CurlSession
37 from . import exceptions, CurlSession
38
38
39
39
40 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
41
41
42
42
43 # TODO: mikhail: Keep it in sync with vcsserver's
43 # TODO: mikhail: Keep it in sync with vcsserver's
44 # HTTPApplication.ALLOWED_EXCEPTIONS
44 # HTTPApplication.ALLOWED_EXCEPTIONS
45 EXCEPTIONS_MAP = {
45 EXCEPTIONS_MAP = {
46 'KeyError': KeyError,
46 'KeyError': KeyError,
47 'URLError': urllib2.URLError,
47 'URLError': urllib2.URLError,
48 }
48 }
49
49
50
50
51 class RepoMaker(object):
51 class RepoMaker(object):
52
52
53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 self.url = urlparse.urljoin(
54 self.url = urlparse.urljoin(
55 'http://%s' % server_and_port, backend_endpoint)
55 'http://%s' % server_and_port, backend_endpoint)
56 self._session_factory = session_factory
56 self._session_factory = session_factory
57 self.backend_type = backend_type
57 self.backend_type = backend_type
58
58
59 def __call__(self, path, config, with_wire=None):
59 def __call__(self, path, config, with_wire=None):
60 log.debug('RepoMaker call on %s', path)
60 log.debug('RepoMaker call on %s', path)
61 return RemoteRepo(
61 return RemoteRepo(
62 path, config, self.url, self._session_factory(),
62 path, config, self.url, self._session_factory(),
63 with_wire=with_wire)
63 with_wire=with_wire)
64
64
65 def __getattr__(self, name):
65 def __getattr__(self, name):
66 def f(*args, **kwargs):
66 def f(*args, **kwargs):
67 return self._call(name, *args, **kwargs)
67 return self._call(name, *args, **kwargs)
68 return f
68 return f
69
69
70 @exceptions.map_vcs_exceptions
70 @exceptions.map_vcs_exceptions
71 def _call(self, name, *args, **kwargs):
71 def _call(self, name, *args, **kwargs):
72 payload = {
72 payload = {
73 'id': str(uuid.uuid4()),
73 'id': str(uuid.uuid4()),
74 'method': name,
74 'method': name,
75 'backend': self.backend_type,
75 'backend': self.backend_type,
76 'params': {'args': args, 'kwargs': kwargs}
76 'params': {'args': args, 'kwargs': kwargs}
77 }
77 }
78 return _remote_call(
78 return _remote_call(
79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80
80
81
81
82 class ServiceConnection(object):
82 class ServiceConnection(object):
83 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 self.url = urlparse.urljoin(
84 self.url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
85 'http://%s' % server_and_port, backend_endpoint)
86 self._session_factory = session_factory
86 self._session_factory = session_factory
87
87
88 def __getattr__(self, name):
88 def __getattr__(self, name):
89 def f(*args, **kwargs):
89 def f(*args, **kwargs):
90 return self._call(name, *args, **kwargs)
90 return self._call(name, *args, **kwargs)
91
91
92 return f
92 return f
93
93
94 @exceptions.map_vcs_exceptions
94 @exceptions.map_vcs_exceptions
95 def _call(self, name, *args, **kwargs):
95 def _call(self, name, *args, **kwargs):
96 payload = {
96 payload = {
97 'id': str(uuid.uuid4()),
97 'id': str(uuid.uuid4()),
98 'method': name,
98 'method': name,
99 'params': {'args': args, 'kwargs': kwargs}
99 'params': {'args': args, 'kwargs': kwargs}
100 }
100 }
101 return _remote_call(
101 return _remote_call(
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103
103
104
104
105 class RemoteRepo(object):
105 class RemoteRepo(object):
106
106
107 def __init__(self, path, config, url, session, with_wire=None):
107 def __init__(self, path, config, url, session, with_wire=None):
108 self.url = url
108 self.url = url
109 self._session = session
109 self._session = session
110 self._wire = {
110 self._wire = {
111 "path": path,
111 "path": path,
112 "config": config,
112 "config": config,
113 "context": self._create_vcs_cache_context(),
113 "context": self._create_vcs_cache_context(),
114 }
114 }
115 if with_wire:
115 if with_wire:
116 self._wire.update(with_wire)
116 self._wire.update(with_wire)
117
117
118 # johbo: Trading complexity for performance. Avoiding the call to
118 # johbo: Trading complexity for performance. Avoiding the call to
119 # log.debug brings a few percent gain even if is is not active.
119 # log.debug brings a few percent gain even if is is not active.
120 if log.isEnabledFor(logging.DEBUG):
120 if log.isEnabledFor(logging.DEBUG):
121 self._call = self._call_with_logging
121 self._call = self._call_with_logging
122
122
123 def __getattr__(self, name):
123 def __getattr__(self, name):
124 def f(*args, **kwargs):
124 def f(*args, **kwargs):
125 return self._call(name, *args, **kwargs)
125 return self._call(name, *args, **kwargs)
126 return f
126 return f
127
127
128 @exceptions.map_vcs_exceptions
128 @exceptions.map_vcs_exceptions
129 def _call(self, name, *args, **kwargs):
129 def _call(self, name, *args, **kwargs):
130 # TODO: oliver: This is currently necessary pre-call since the
130 # TODO: oliver: This is currently necessary pre-call since the
131 # config object is being changed for hooking scenarios
131 # config object is being changed for hooking scenarios
132 wire = copy.deepcopy(self._wire)
132 wire = copy.deepcopy(self._wire)
133 wire["config"] = wire["config"].serialize()
133 wire["config"] = wire["config"].serialize()
134 payload = {
134 payload = {
135 'id': str(uuid.uuid4()),
135 'id': str(uuid.uuid4()),
136 'method': name,
136 'method': name,
137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 }
138 }
139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140
140
141 def _call_with_logging(self, name, *args, **kwargs):
141 def _call_with_logging(self, name, *args, **kwargs):
142
142
143 log.debug('Calling %s@%s with args:%r', self.url, name, args)
143 log.debug('Calling %s@%s with args:%r', self.url, name, args)
144 return RemoteRepo._call(self, name, *args, **kwargs)
144 return RemoteRepo._call(self, name, *args, **kwargs)
145
145
146 def __getitem__(self, key):
146 def __getitem__(self, key):
147 return self.revision(key)
147 return self.revision(key)
148
148
149 def _create_vcs_cache_context(self):
149 def _create_vcs_cache_context(self):
150 """
150 """
151 Creates a unique string which is passed to the VCSServer on every
151 Creates a unique string which is passed to the VCSServer on every
152 remote call. It is used as cache key in the VCSServer.
152 remote call. It is used as cache key in the VCSServer.
153 """
153 """
154 return str(uuid.uuid4())
154 return str(uuid.uuid4())
155
155
156 def invalidate_vcs_cache(self):
156 def invalidate_vcs_cache(self):
157 """
157 """
158 This invalidates the context which is sent to the VCSServer on every
158 This invalidates the context which is sent to the VCSServer on every
159 call to a remote method. It forces the VCSServer to create a fresh
159 call to a remote method. It forces the VCSServer to create a fresh
160 repository instance on the next call to a remote method.
160 repository instance on the next call to a remote method.
161 """
161 """
162 self._wire['context'] = self._create_vcs_cache_context()
162 self._wire['context'] = self._create_vcs_cache_context()
163
163
164
164
165 class RemoteObject(object):
165 class RemoteObject(object):
166
166
167 def __init__(self, url, session):
167 def __init__(self, url, session):
168 self._url = url
168 self._url = url
169 self._session = session
169 self._session = session
170
170
171 # johbo: Trading complexity for performance. Avoiding the call to
171 # johbo: Trading complexity for performance. Avoiding the call to
172 # log.debug brings a few percent gain even if is is not active.
172 # log.debug brings a few percent gain even if is is not active.
173 if log.isEnabledFor(logging.DEBUG):
173 if log.isEnabledFor(logging.DEBUG):
174 self._call = self._call_with_logging
174 self._call = self._call_with_logging
175
175
176 def __getattr__(self, name):
176 def __getattr__(self, name):
177 def f(*args, **kwargs):
177 def f(*args, **kwargs):
178 return self._call(name, *args, **kwargs)
178 return self._call(name, *args, **kwargs)
179 return f
179 return f
180
180
181 @exceptions.map_vcs_exceptions
181 @exceptions.map_vcs_exceptions
182 def _call(self, name, *args, **kwargs):
182 def _call(self, name, *args, **kwargs):
183 payload = {
183 payload = {
184 'id': str(uuid.uuid4()),
184 'id': str(uuid.uuid4()),
185 'method': name,
185 'method': name,
186 'params': {'args': args, 'kwargs': kwargs}
186 'params': {'args': args, 'kwargs': kwargs}
187 }
187 }
188 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
188 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
189
189
190 def _call_with_logging(self, name, *args, **kwargs):
190 def _call_with_logging(self, name, *args, **kwargs):
191 log.debug('Calling %s@%s', self._url, name)
191 log.debug('Calling %s@%s', self._url, name)
192 return RemoteObject._call(self, name, *args, **kwargs)
192 return RemoteObject._call(self, name, *args, **kwargs)
193
193
194
194
195 def _remote_call(url, payload, exceptions_map, session):
195 def _remote_call(url, payload, exceptions_map, session):
196 try:
196 try:
197 response = session.post(url, data=msgpack.packb(payload))
197 response = session.post(url, data=msgpack.packb(payload))
198 except pycurl.error as e:
198 except pycurl.error as e:
199 raise exceptions.HttpVCSCommunicationError(e)
199 raise exceptions.HttpVCSCommunicationError(e)
200 except Exception as e:
201 message = getattr(e, 'message', '')
202 if 'Failed to connect' in message:
203 # gevent doesn't return proper pycurl errors
204 raise exceptions.HttpVCSCommunicationError(e)
205 else:
206 raise
200
207
201 if response.status_code >= 400:
208 if response.status_code >= 400:
202 log.error('Call to %s returned non 200 HTTP code: %s',
209 log.error('Call to %s returned non 200 HTTP code: %s',
203 url, response.status_code)
210 url, response.status_code)
204 raise exceptions.HttpVCSCommunicationError(repr(response.content))
211 raise exceptions.HttpVCSCommunicationError(repr(response.content))
205
212
206 try:
213 try:
207 response = msgpack.unpackb(response.content)
214 response = msgpack.unpackb(response.content)
208 except Exception:
215 except Exception:
209 log.exception('Failed to decode response %r', response.content)
216 log.exception('Failed to decode response %r', response.content)
210 raise
217 raise
211
218
212 error = response.get('error')
219 error = response.get('error')
213 if error:
220 if error:
214 type_ = error.get('type', 'Exception')
221 type_ = error.get('type', 'Exception')
215 exc = exceptions_map.get(type_, Exception)
222 exc = exceptions_map.get(type_, Exception)
216 exc = exc(error.get('message'))
223 exc = exc(error.get('message'))
217 try:
224 try:
218 exc._vcs_kind = error['_vcs_kind']
225 exc._vcs_kind = error['_vcs_kind']
219 except KeyError:
226 except KeyError:
220 pass
227 pass
221
228
222 try:
229 try:
223 exc._vcs_server_traceback = error['traceback']
230 exc._vcs_server_traceback = error['traceback']
224 except KeyError:
231 except KeyError:
225 pass
232 pass
226
233
227 raise exc
234 raise exc
228 return response.get('result')
235 return response.get('result')
229
236
230
237
231 class VcsHttpProxy(object):
238 class VcsHttpProxy(object):
232
239
233 CHUNK_SIZE = 16384
240 CHUNK_SIZE = 16384
234
241
235 def __init__(self, server_and_port, backend_endpoint):
242 def __init__(self, server_and_port, backend_endpoint):
236
243
237
244
238 retries = Retry(total=5, connect=None, read=None, redirect=None)
245 retries = Retry(total=5, connect=None, read=None, redirect=None)
239
246
240 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
247 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
241 self.base_url = urlparse.urljoin(
248 self.base_url = urlparse.urljoin(
242 'http://%s' % server_and_port, backend_endpoint)
249 'http://%s' % server_and_port, backend_endpoint)
243 self.session = requests.Session()
250 self.session = requests.Session()
244 self.session.mount('http://', adapter)
251 self.session.mount('http://', adapter)
245
252
246 def handle(self, environment, input_data, *args, **kwargs):
253 def handle(self, environment, input_data, *args, **kwargs):
247 data = {
254 data = {
248 'environment': environment,
255 'environment': environment,
249 'input_data': input_data,
256 'input_data': input_data,
250 'args': args,
257 'args': args,
251 'kwargs': kwargs
258 'kwargs': kwargs
252 }
259 }
253 result = self.session.post(
260 result = self.session.post(
254 self.base_url, msgpack.packb(data), stream=True)
261 self.base_url, msgpack.packb(data), stream=True)
255 return self._get_result(result)
262 return self._get_result(result)
256
263
257 def _deserialize_and_raise(self, error):
264 def _deserialize_and_raise(self, error):
258 exception = Exception(error['message'])
265 exception = Exception(error['message'])
259 try:
266 try:
260 exception._vcs_kind = error['_vcs_kind']
267 exception._vcs_kind = error['_vcs_kind']
261 except KeyError:
268 except KeyError:
262 pass
269 pass
263 raise exception
270 raise exception
264
271
265 def _iterate(self, result):
272 def _iterate(self, result):
266 unpacker = msgpack.Unpacker()
273 unpacker = msgpack.Unpacker()
267 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
274 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
268 unpacker.feed(line)
275 unpacker.feed(line)
269 for chunk in unpacker:
276 for chunk in unpacker:
270 yield chunk
277 yield chunk
271
278
272 def _get_result(self, result):
279 def _get_result(self, result):
273 iterator = self._iterate(result)
280 iterator = self._iterate(result)
274 error = iterator.next()
281 error = iterator.next()
275 if error:
282 if error:
276 self._deserialize_and_raise(error)
283 self._deserialize_and_raise(error)
277
284
278 status = iterator.next()
285 status = iterator.next()
279 headers = iterator.next()
286 headers = iterator.next()
280
287
281 return iterator, status, headers
288 return iterator, status, headers
282
289
283
290
284 class ThreadlocalSessionFactory(object):
291 class ThreadlocalSessionFactory(object):
285 """
292 """
286 Creates one CurlSession per thread on demand.
293 Creates one CurlSession per thread on demand.
287 """
294 """
288
295
289 def __init__(self):
296 def __init__(self):
290 self._thread_local = threading.local()
297 self._thread_local = threading.local()
291
298
292 def __call__(self):
299 def __call__(self):
293 if not hasattr(self._thread_local, 'curl_session'):
300 if not hasattr(self._thread_local, 'curl_session'):
294 self._thread_local.curl_session = CurlSession()
301 self._thread_local.curl_session = CurlSession()
295 return self._thread_local.curl_session
302 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now