##// END OF EJS Templates
http: ported old depracated retry format for http client.
marcink -
r1958:7ffffec9 default
parent child Browse files
Show More
@@ -1,277 +1,276 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 from __future__ import unicode_literals
22 22 import string
23 23 from collections import OrderedDict
24 24
25 25 import deform
26 26 import logging
27 27 import requests
28 28 import colander
29 29 from celery.task import task
30 30 from requests.packages.urllib3.util.retry import Retry
31 31
32 32 from rhodecode import events
33 33 from rhodecode.translation import _
34 34 from rhodecode.integrations.types.base import IntegrationTypeBase
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 # updating this required to update the `common_vars` passed in url calling func
39 39 WEBHOOK_URL_VARS = [
40 40 'repo_name',
41 41 'repo_type',
42 42 'repo_id',
43 43 'repo_url',
44 44 # extra repo fields
45 45 'extra:<extra_key_name>',
46 46
47 47 # special attrs below that we handle, using multi-call
48 48 'branch',
49 49 'commit_id',
50 50
51 51 # pr events vars
52 52 'pull_request_id',
53 53 'pull_request_url',
54 54
55 55 # user who triggers the call
56 56 'username',
57 57 'user_id',
58 58
59 59 ]
60 60 URL_VARS = ', '.join('${' + x + '}' for x in WEBHOOK_URL_VARS)
61 61
62 62
63 63 class WebhookHandler(object):
64 64 def __init__(self, template_url, secret_token):
65 65 self.template_url = template_url
66 66 self.secret_token = secret_token
67 67
68 68 def get_base_parsed_template(self, data):
69 69 """
70 70 initially parses the passed in template with some common variables
71 71 available on ALL calls
72 72 """
73 73 # note: make sure to update the `WEBHOOK_URL_VARS` if this changes
74 74 common_vars = {
75 75 'repo_name': data['repo']['repo_name'],
76 76 'repo_type': data['repo']['repo_type'],
77 77 'repo_id': data['repo']['repo_id'],
78 78 'repo_url': data['repo']['url'],
79 79 'username': data['actor']['username'],
80 80 'user_id': data['actor']['user_id']
81 81 }
82 82 extra_vars = {}
83 83 for extra_key, extra_val in data['repo']['extra_fields'].items():
84 84 extra_vars['extra:{}'.format(extra_key)] = extra_val
85 85 common_vars.update(extra_vars)
86 86
87 87 return string.Template(
88 88 self.template_url).safe_substitute(**common_vars)
89 89
90 90 def repo_push_event_handler(self, event, data):
91 91 url = self.get_base_parsed_template(data)
92 92 url_cals = []
93 93 branch_data = OrderedDict()
94 94 for obj in data['push']['branches']:
95 95 branch_data[obj['name']] = obj
96 96
97 97 branches_commits = OrderedDict()
98 98 for commit in data['push']['commits']:
99 99 if commit['branch'] not in branches_commits:
100 100 branch_commits = {'branch': branch_data[commit['branch']],
101 101 'commits': []}
102 102 branches_commits[commit['branch']] = branch_commits
103 103
104 104 branch_commits = branches_commits[commit['branch']]
105 105 branch_commits['commits'].append(commit)
106 106
107 107 if '${branch}' in url:
108 108 # call it multiple times, for each branch if used in variables
109 109 for branch, commit_ids in branches_commits.items():
110 110 branch_url = string.Template(url).safe_substitute(branch=branch)
111 111 # call further down for each commit if used
112 112 if '${commit_id}' in branch_url:
113 113 for commit_data in commit_ids['commits']:
114 114 commit_id = commit_data['raw_id']
115 115 commit_url = string.Template(branch_url).safe_substitute(
116 116 commit_id=commit_id)
117 117 # register per-commit call
118 118 log.debug(
119 119 'register webhook call(%s) to url %s', event, commit_url)
120 120 url_cals.append((commit_url, self.secret_token, data))
121 121
122 122 else:
123 123 # register per-branch call
124 124 log.debug(
125 125 'register webhook call(%s) to url %s', event, branch_url)
126 126 url_cals.append((branch_url, self.secret_token, data))
127 127
128 128 else:
129 129 log.debug(
130 130 'register webhook call(%s) to url %s', event, url)
131 131 url_cals.append((url, self.secret_token, data))
132 132
133 133 return url_cals
134 134
135 135 def repo_create_event_handler(self, event, data):
136 136 url = self.get_base_parsed_template(data)
137 137 log.debug(
138 138 'register webhook call(%s) to url %s', event, url)
139 139 return [(url, self.secret_token, data)]
140 140
141 141 def pull_request_event_handler(self, event, data):
142 142 url = self.get_base_parsed_template(data)
143 143 log.debug(
144 144 'register webhook call(%s) to url %s', event, url)
145 145 url = string.Template(url).safe_substitute(
146 146 pull_request_id=data['pullrequest']['pull_request_id'],
147 147 pull_request_url=data['pullrequest']['url'])
148 148 return [(url, self.secret_token, data)]
149 149
150 150 def __call__(self, event, data):
151 151 if isinstance(event, events.RepoPushEvent):
152 152 return self.repo_push_event_handler(event, data)
153 153 elif isinstance(event, events.RepoCreateEvent):
154 154 return self.repo_create_event_handler(event, data)
155 155 elif isinstance(event, events.PullRequestEvent):
156 156 return self.pull_request_event_handler(event, data)
157 157 else:
158 158 raise ValueError('event type not supported: %s' % events)
159 159
160 160
161 161 class WebhookSettingsSchema(colander.Schema):
162 162 url = colander.SchemaNode(
163 163 colander.String(),
164 164 title=_('Webhook URL'),
165 165 description=
166 166 _('URL of the webhook to receive POST event. Following variables '
167 167 'are allowed to be used: {vars}. Some of the variables would '
168 168 'trigger multiple calls, like ${{branch}} or ${{commit_id}}. '
169 169 'Webhook will be called as many times as unique objects in '
170 170 'data in such cases.').format(vars=URL_VARS),
171 171 missing=colander.required,
172 172 required=True,
173 173 validator=colander.url,
174 174 widget=deform.widget.TextInputWidget(
175 175 placeholder='https://www.example.com/webhook'
176 176 ),
177 177 )
178 178 secret_token = colander.SchemaNode(
179 179 colander.String(),
180 180 title=_('Secret Token'),
181 181 description=_('String used to validate received payloads.'),
182 182 default='',
183 183 missing='',
184 184 widget=deform.widget.TextInputWidget(
185 185 placeholder='secret_token'
186 186 ),
187 187 )
188 188 method_type = colander.SchemaNode(
189 189 colander.String(),
190 190 title=_('Call Method'),
191 191 description=_('Select if the webhook call should be made '
192 192 'with POST or GET.'),
193 193 default='post',
194 194 missing='',
195 195 widget=deform.widget.RadioChoiceWidget(
196 196 values=[('get', 'GET'), ('post', 'POST')],
197 197 inline=True
198 198 ),
199 199 )
200 200
201 201
202 202 class WebhookIntegrationType(IntegrationTypeBase):
203 203 key = 'webhook'
204 204 display_name = _('Webhook')
205 205 description = _('Post json events to a webhook endpoint')
206 206 icon = '''<?xml version="1.0" encoding="UTF-8" standalone="no"?><svg viewBox="0 0 256 239" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" preserveAspectRatio="xMidYMid"><g><path d="M119.540432,100.502743 C108.930124,118.338815 98.7646301,135.611455 88.3876025,152.753617 C85.7226696,157.154315 84.4040417,160.738531 86.5332204,166.333309 C92.4107024,181.787152 84.1193605,196.825836 68.5350381,200.908244 C53.8383677,204.759349 39.5192953,195.099955 36.6032893,179.365384 C34.0194114,165.437749 44.8274148,151.78491 60.1824106,149.608284 C61.4694072,149.424428 62.7821041,149.402681 64.944891,149.240571 C72.469175,136.623655 80.1773157,123.700312 88.3025935,110.073173 C73.611854,95.4654658 64.8677898,78.3885437 66.803227,57.2292132 C68.1712787,42.2715849 74.0527146,29.3462646 84.8033863,18.7517722 C105.393354,-1.53572199 136.805164,-4.82141828 161.048542,10.7510424 C184.333097,25.7086706 194.996783,54.8450075 185.906752,79.7822957 C179.052655,77.9239597 172.151111,76.049808 164.563565,73.9917997 C167.418285,60.1274266 165.306899,47.6765751 155.95591,37.0109123 C149.777932,29.9690049 141.850349,26.2780332 132.835442,24.9178894 C114.764113,22.1877169 97.0209573,33.7983633 91.7563309,51.5355878 C85.7800012,71.6669027 94.8245623,88.1111998 119.540432,100.502743 L119.540432,100.502743 Z" fill="#C73A63"></path><path d="M149.841194,79.4106285 C157.316054,92.5969067 164.905578,105.982857 172.427885,119.246236 C210.44865,107.483365 239.114472,128.530009 249.398582,151.063322 C261.81978,178.282014 253.328765,210.520191 228.933162,227.312431 C203.893073,244.551464 172.226236,241.605803 150.040866,219.46195 C155.694953,214.729124 161.376716,209.974552 167.44794,204.895759 C189.360489,219.088306 208.525074,218.420096 222.753207,201.614016 C234.885769,187.277151 234.622834,165.900356 222.138374,151.863988 C207.730339,135.66681 188.431321,135.172572 165.103273,150.721309 C155.426087,133.553447 145.58086,116.521995 136.210101,99.2295848 C133.05093,93.4015266 129.561608,90.0209366 122.440622,88.7873178 C110.547271,86.7253555 102.868785,76.5124151 102.408155,65.0698097 C101.955433,53.7537294 108.621719,43.5249733 119.04224,39.5394355 C129.363912,35.5914599 141.476705,38.7783085 148.419765,47.554004 C154.093621,54.7244134 155.896602,62.7943365 152.911402,71.6372484 C152.081082,74.1025091 151.00562,76.4886916 149.841194,79.4106285 L149.841194,79.4106285 Z" fill="#4B4B4B"></path><path d="M167.706921,187.209935 L121.936499,187.209935 C117.54964,205.253587 108.074103,219.821756 91.7464461,229.085759 C79.0544063,236.285822 65.3738898,238.72736 50.8136292,236.376762 C24.0061432,232.053165 2.08568567,207.920497 0.156179306,180.745298 C-2.02835403,149.962159 19.1309765,122.599149 47.3341915,116.452801 C49.2814904,123.524363 51.2485589,130.663141 53.1958579,137.716911 C27.3195169,150.919004 18.3639187,167.553089 25.6054984,188.352614 C31.9811726,206.657224 50.0900643,216.690262 69.7528413,212.809503 C89.8327554,208.847688 99.9567329,192.160226 98.7211371,165.37844 C117.75722,165.37844 136.809118,165.180745 155.847178,165.475311 C163.280522,165.591951 169.019617,164.820939 174.620326,158.267339 C183.840836,147.48306 200.811003,148.455721 210.741239,158.640984 C220.88894,169.049642 220.402609,185.79839 209.663799,195.768166 C199.302587,205.38802 182.933414,204.874012 173.240413,194.508846 C171.247644,192.37176 169.677943,189.835329 167.706921,187.209935 L167.706921,187.209935 Z" fill="#4A4A4A"></path></g></svg>'''
207 207
208 208 valid_events = [
209 209 events.PullRequestCloseEvent,
210 210 events.PullRequestMergeEvent,
211 211 events.PullRequestUpdateEvent,
212 212 events.PullRequestCommentEvent,
213 213 events.PullRequestReviewEvent,
214 214 events.PullRequestCreateEvent,
215 215 events.RepoPushEvent,
216 216 events.RepoCreateEvent,
217 217 ]
218 218
219 219 def settings_schema(self):
220 220 schema = WebhookSettingsSchema()
221 221 schema.add(colander.SchemaNode(
222 222 colander.Set(),
223 223 widget=deform.widget.CheckboxChoiceWidget(
224 224 values=sorted(
225 225 [(e.name, e.display_name) for e in self.valid_events]
226 226 )
227 227 ),
228 228 description="Events activated for this integration",
229 229 name='events'
230 230 ))
231 231 return schema
232 232
233 233 def send_event(self, event):
234 234 log.debug('handling event %s with webhook integration %s',
235 235 event.name, self)
236 236
237 237 if event.__class__ not in self.valid_events:
238 238 log.debug('event not valid: %r' % event)
239 239 return
240 240
241 241 if event.name not in self.settings['events']:
242 242 log.debug('event ignored: %r' % event)
243 243 return
244 244
245 245 data = event.as_dict()
246 246 template_url = self.settings['url']
247 247
248 248 handler = WebhookHandler(template_url, self.settings['secret_token'])
249 249 url_calls = handler(event, data)
250 250 log.debug('webhook: calling following urls: %s',
251 251 [x[0] for x in url_calls])
252 252 post_to_webhook(url_calls, self.settings)
253 253
254 254
255 255 @task(ignore_result=True)
256 256 def post_to_webhook(url_calls, settings):
257 257 max_retries = 3
258 retries = Retry(
259 total=max_retries,
260 backoff_factor=0.15,
261 status_forcelist=[500, 502, 503, 504])
258 262 for url, token, data in url_calls:
259 # retry max N times
260 retries = Retry(
261 total=max_retries,
262 backoff_factor=0.15,
263 status_forcelist=[500, 502, 503, 504])
264 263 req_session = requests.Session()
265 req_session.mount(
264 req_session.mount( # retry max N times
266 265 'http://', requests.adapters.HTTPAdapter(max_retries=retries))
267 266
268 267 method = settings.get('method_type') or 'post'
269 268 call_method = getattr(req_session, method)
270 269
271 270 log.debug('calling WEBHOOK with method: %s', call_method)
272 271 resp = call_method(url, json={
273 272 'token': token,
274 273 'event': data
275 274 })
276 275 log.debug('Got WEBHOOK response: %s', resp)
277 276 resp.raise_for_status() # raise exception on a failed request
@@ -1,290 +1,295 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 31
32 32 import pycurl
33 33 import msgpack
34 34 import requests
35 from requests.packages.urllib3.util.retry import Retry
35 36
36 37 from . import exceptions, CurlSession
37 38
38 39
39 40 log = logging.getLogger(__name__)
40 41
41 42
42 43 # TODO: mikhail: Keep it in sync with vcsserver's
43 44 # HTTPApplication.ALLOWED_EXCEPTIONS
44 45 EXCEPTIONS_MAP = {
45 46 'KeyError': KeyError,
46 47 'URLError': urllib2.URLError,
47 48 }
48 49
49 50
50 51 class RepoMaker(object):
51 52
52 53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 54 self.url = urlparse.urljoin(
54 55 'http://%s' % server_and_port, backend_endpoint)
55 56 self._session_factory = session_factory
56 57 self.backend_type = backend_type
57 58
58 59 def __call__(self, path, config, with_wire=None):
59 60 log.debug('RepoMaker call on %s', path)
60 61 return RemoteRepo(
61 62 path, config, self.url, self._session_factory(),
62 63 with_wire=with_wire)
63 64
64 65 def __getattr__(self, name):
65 66 def f(*args, **kwargs):
66 67 return self._call(name, *args, **kwargs)
67 68 return f
68 69
69 70 @exceptions.map_vcs_exceptions
70 71 def _call(self, name, *args, **kwargs):
71 72 payload = {
72 73 'id': str(uuid.uuid4()),
73 74 'method': name,
74 75 'backend': self.backend_type,
75 76 'params': {'args': args, 'kwargs': kwargs}
76 77 }
77 78 return _remote_call(
78 79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79 80
80 81
81 82 class ServiceConnection(object):
82 83 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 84 self.url = urlparse.urljoin(
84 85 'http://%s' % server_and_port, backend_endpoint)
85 86 self._session_factory = session_factory
86 87
87 88 def __getattr__(self, name):
88 89 def f(*args, **kwargs):
89 90 return self._call(name, *args, **kwargs)
90 91
91 92 return f
92 93
93 94 @exceptions.map_vcs_exceptions
94 95 def _call(self, name, *args, **kwargs):
95 96 payload = {
96 97 'id': str(uuid.uuid4()),
97 98 'method': name,
98 99 'params': {'args': args, 'kwargs': kwargs}
99 100 }
100 101 return _remote_call(
101 102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102 103
103 104
104 105 class RemoteRepo(object):
105 106
106 107 def __init__(self, path, config, url, session, with_wire=None):
107 108 self.url = url
108 109 self._session = session
109 110 self._wire = {
110 111 "path": path,
111 112 "config": config,
112 113 "context": self._create_vcs_cache_context(),
113 114 }
114 115 if with_wire:
115 116 self._wire.update(with_wire)
116 117
117 118 # johbo: Trading complexity for performance. Avoiding the call to
118 119 # log.debug brings a few percent gain even if is is not active.
119 120 if log.isEnabledFor(logging.DEBUG):
120 121 self._call = self._call_with_logging
121 122
122 123 def __getattr__(self, name):
123 124 def f(*args, **kwargs):
124 125 return self._call(name, *args, **kwargs)
125 126 return f
126 127
127 128 @exceptions.map_vcs_exceptions
128 129 def _call(self, name, *args, **kwargs):
129 130 # TODO: oliver: This is currently necessary pre-call since the
130 131 # config object is being changed for hooking scenarios
131 132 wire = copy.deepcopy(self._wire)
132 133 wire["config"] = wire["config"].serialize()
133 134 payload = {
134 135 'id': str(uuid.uuid4()),
135 136 'method': name,
136 137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 138 }
138 139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139 140
140 141 def _call_with_logging(self, name, *args, **kwargs):
141 142
142 143 log.debug('Calling %s@%s with args:%r', self.url, name, args)
143 144 return RemoteRepo._call(self, name, *args, **kwargs)
144 145
145 146 def __getitem__(self, key):
146 147 return self.revision(key)
147 148
148 149 def _create_vcs_cache_context(self):
149 150 """
150 151 Creates a unique string which is passed to the VCSServer on every
151 152 remote call. It is used as cache key in the VCSServer.
152 153 """
153 154 return str(uuid.uuid4())
154 155
155 156 def invalidate_vcs_cache(self):
156 157 """
157 158 This invalidates the context which is sent to the VCSServer on every
158 159 call to a remote method. It forces the VCSServer to create a fresh
159 160 repository instance on the next call to a remote method.
160 161 """
161 162 self._wire['context'] = self._create_vcs_cache_context()
162 163
163 164
164 165 class RemoteObject(object):
165 166
166 167 def __init__(self, url, session):
167 168 self._url = url
168 169 self._session = session
169 170
170 171 # johbo: Trading complexity for performance. Avoiding the call to
171 172 # log.debug brings a few percent gain even if is is not active.
172 173 if log.isEnabledFor(logging.DEBUG):
173 174 self._call = self._call_with_logging
174 175
175 176 def __getattr__(self, name):
176 177 def f(*args, **kwargs):
177 178 return self._call(name, *args, **kwargs)
178 179 return f
179 180
180 181 @exceptions.map_vcs_exceptions
181 182 def _call(self, name, *args, **kwargs):
182 183 payload = {
183 184 'id': str(uuid.uuid4()),
184 185 'method': name,
185 186 'params': {'args': args, 'kwargs': kwargs}
186 187 }
187 188 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
188 189
189 190 def _call_with_logging(self, name, *args, **kwargs):
190 191 log.debug('Calling %s@%s', self._url, name)
191 192 return RemoteObject._call(self, name, *args, **kwargs)
192 193
193 194
194 195 def _remote_call(url, payload, exceptions_map, session):
195 196 try:
196 197 response = session.post(url, data=msgpack.packb(payload))
197 198 except pycurl.error as e:
198 199 raise exceptions.HttpVCSCommunicationError(e)
199 200
200 201 if response.status_code >= 400:
201 202 log.error('Call to %s returned non 200 HTTP code: %s',
202 203 url, response.status_code)
203 204 raise exceptions.HttpVCSCommunicationError(repr(response.content))
204 205
205 206 try:
206 207 response = msgpack.unpackb(response.content)
207 208 except Exception:
208 209 log.exception('Failed to decode response %r', response.content)
209 210 raise
210 211
211 212 error = response.get('error')
212 213 if error:
213 214 type_ = error.get('type', 'Exception')
214 215 exc = exceptions_map.get(type_, Exception)
215 216 exc = exc(error.get('message'))
216 217 try:
217 218 exc._vcs_kind = error['_vcs_kind']
218 219 except KeyError:
219 220 pass
220 221
221 222 try:
222 223 exc._vcs_server_traceback = error['traceback']
223 224 except KeyError:
224 225 pass
225 226
226 227 raise exc
227 228 return response.get('result')
228 229
229 230
230 231 class VcsHttpProxy(object):
231 232
232 233 CHUNK_SIZE = 16384
233 234
234 235 def __init__(self, server_and_port, backend_endpoint):
235 adapter = requests.adapters.HTTPAdapter(max_retries=5)
236
237
238 retries = Retry(total=5, connect=None, read=None, redirect=None)
239
240 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
236 241 self.base_url = urlparse.urljoin(
237 242 'http://%s' % server_and_port, backend_endpoint)
238 243 self.session = requests.Session()
239 244 self.session.mount('http://', adapter)
240 245
241 246 def handle(self, environment, input_data, *args, **kwargs):
242 247 data = {
243 248 'environment': environment,
244 249 'input_data': input_data,
245 250 'args': args,
246 251 'kwargs': kwargs
247 252 }
248 253 result = self.session.post(
249 254 self.base_url, msgpack.packb(data), stream=True)
250 255 return self._get_result(result)
251 256
252 257 def _deserialize_and_raise(self, error):
253 258 exception = Exception(error['message'])
254 259 try:
255 260 exception._vcs_kind = error['_vcs_kind']
256 261 except KeyError:
257 262 pass
258 263 raise exception
259 264
260 265 def _iterate(self, result):
261 266 unpacker = msgpack.Unpacker()
262 267 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
263 268 unpacker.feed(line)
264 269 for chunk in unpacker:
265 270 yield chunk
266 271
267 272 def _get_result(self, result):
268 273 iterator = self._iterate(result)
269 274 error = iterator.next()
270 275 if error:
271 276 self._deserialize_and_raise(error)
272 277
273 278 status = iterator.next()
274 279 headers = iterator.next()
275 280
276 281 return iterator, status, headers
277 282
278 283
279 284 class ThreadlocalSessionFactory(object):
280 285 """
281 286 Creates one CurlSession per thread on demand.
282 287 """
283 288
284 289 def __init__(self):
285 290 self._thread_local = threading.local()
286 291
287 292 def __call__(self):
288 293 if not hasattr(self._thread_local, 'curl_session'):
289 294 self._thread_local.curl_session = CurlSession()
290 295 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now