##// END OF EJS Templates
gevent: fix new switch version
marcink -
r2829:c245839c default
parent child Browse files
Show More
@@ -1,245 +1,245 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 class in a way that is compatible with gevent.
23 class in a way that is compatible with gevent.
24 """
24 """
25
25
26
26
27 import logging
27 import logging
28 import gevent
28 import gevent
29 import pycurl
29 import pycurl
30
30
31 # Import everything from pycurl.
31 # Import everything from pycurl.
32 # This allows us to use this module as a drop in replacement of pycurl.
32 # This allows us to use this module as a drop in replacement of pycurl.
33 from pycurl import * # noqa
33 from pycurl import * # noqa
34
34
35 from gevent import core
35 from gevent import core
36 from gevent.hub import Waiter
36 from gevent.hub import Waiter
37
37
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
42 class GeventCurlMulti(object):
42 class GeventCurlMulti(object):
43 """
43 """
44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
45 loop.
45 loop.
46
46
47 Parts of this class are a modified version of code copied from the Tornado
47 Parts of this class are a modified version of code copied from the Tornado
48 Web Server project which is licensed under the Apache License, Version 2.0
48 Web Server project which is licensed under the Apache License, Version 2.0
49 (the "License"). To be more specific the code originates from this file:
49 (the "License"). To be more specific the code originates from this file:
50 https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py
50 https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py
51
51
52 This is the original license header of the origin:
52 This is the original license header of the origin:
53
53
54 Copyright 2009 Facebook
54 Copyright 2009 Facebook
55
55
56 Licensed under the Apache License, Version 2.0 (the "License"); you may
56 Licensed under the Apache License, Version 2.0 (the "License"); you may
57 not use this file except in compliance with the License. You may obtain
57 not use this file except in compliance with the License. You may obtain
58 a copy of the License at
58 a copy of the License at
59
59
60 http://www.apache.org/licenses/LICENSE-2.0
60 http://www.apache.org/licenses/LICENSE-2.0
61
61
62 Unless required by applicable law or agreed to in writing, software
62 Unless required by applicable law or agreed to in writing, software
63 distributed under the License is distributed on an "AS IS" BASIS,
63 distributed under the License is distributed on an "AS IS" BASIS,
64 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
64 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
65 implied. See the License for the specific language governing
65 implied. See the License for the specific language governing
66 permissions and limitations under the License.
66 permissions and limitations under the License.
67 """
67 """
68
68
69 def __init__(self, loop=None):
69 def __init__(self, loop=None):
70 self._watchers = {}
70 self._watchers = {}
71 self._timeout = None
71 self._timeout = None
72 self.loop = loop or gevent.get_hub().loop
72 self.loop = loop or gevent.get_hub().loop
73
73
74 # Setup curl's multi instance.
74 # Setup curl's multi instance.
75 self._curl_multi = pycurl.CurlMulti()
75 self._curl_multi = pycurl.CurlMulti()
76 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
76 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
77 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
77 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
78
78
79 def __getattr__(self, item):
79 def __getattr__(self, item):
80 """
80 """
81 The pycurl.CurlMulti class is final and we cannot subclass it.
81 The pycurl.CurlMulti class is final and we cannot subclass it.
82 Therefore we are wrapping it and forward everything to it here.
82 Therefore we are wrapping it and forward everything to it here.
83 """
83 """
84 return getattr(self._curl_multi, item)
84 return getattr(self._curl_multi, item)
85
85
86 def add_handle(self, curl):
86 def add_handle(self, curl):
87 """
87 """
88 Add handle variant that also takes care about the initial invocation of
88 Add handle variant that also takes care about the initial invocation of
89 socket action method. This is done by setting an immediate timeout.
89 socket action method. This is done by setting an immediate timeout.
90 """
90 """
91 result = self._curl_multi.add_handle(curl)
91 result = self._curl_multi.add_handle(curl)
92 self._set_timeout(0)
92 self._set_timeout(0)
93 return result
93 return result
94
94
95 def _handle_socket(self, event, fd, multi, data):
95 def _handle_socket(self, event, fd, multi, data):
96 """
96 """
97 Called by libcurl when it wants to change the file descriptors it cares
97 Called by libcurl when it wants to change the file descriptors it cares
98 about.
98 about.
99 """
99 """
100 event_map = {
100 event_map = {
101 pycurl.POLL_NONE: core.NONE,
101 pycurl.POLL_NONE: core.NONE,
102 pycurl.POLL_IN: core.READ,
102 pycurl.POLL_IN: core.READ,
103 pycurl.POLL_OUT: core.WRITE,
103 pycurl.POLL_OUT: core.WRITE,
104 pycurl.POLL_INOUT: core.READ | core.WRITE
104 pycurl.POLL_INOUT: core.READ | core.WRITE
105 }
105 }
106
106
107 if event == pycurl.POLL_REMOVE:
107 if event == pycurl.POLL_REMOVE:
108 watcher = self._watchers.pop(fd, None)
108 watcher = self._watchers.pop(fd, None)
109 if watcher is not None:
109 if watcher is not None:
110 watcher.stop()
110 watcher.stop()
111 else:
111 else:
112 gloop_event = event_map[event]
112 gloop_event = event_map[event]
113 watcher = self._watchers.get(fd)
113 watcher = self._watchers.get(fd)
114 if watcher is None:
114 if watcher is None:
115 watcher = self.loop.io(fd, gloop_event)
115 watcher = self.loop.io(fd, gloop_event)
116 watcher.start(self._handle_events, fd, pass_events=True)
116 watcher.start(self._handle_events, fd, pass_events=True)
117 self._watchers[fd] = watcher
117 self._watchers[fd] = watcher
118 else:
118 else:
119 if watcher.events != gloop_event:
119 if watcher.events != gloop_event:
120 watcher.stop()
120 watcher.stop()
121 watcher.events = gloop_event
121 watcher.events = gloop_event
122 watcher.start(self._handle_events, fd, pass_events=True)
122 watcher.start(self._handle_events, fd, pass_events=True)
123
123
124 def _set_timeout(self, msecs):
124 def _set_timeout(self, msecs):
125 """
125 """
126 Called by libcurl to schedule a timeout.
126 Called by libcurl to schedule a timeout.
127 """
127 """
128 if self._timeout is not None:
128 if self._timeout is not None:
129 self._timeout.stop()
129 self._timeout.stop()
130 self._timeout = self.loop.timer(msecs/1000.0)
130 self._timeout = self.loop.timer(msecs/1000.0)
131 self._timeout.start(self._handle_timeout)
131 self._timeout.start(self._handle_timeout)
132
132
133 def _handle_events(self, events, fd):
133 def _handle_events(self, events, fd):
134 action = 0
134 action = 0
135 if events & core.READ:
135 if events & core.READ:
136 action |= pycurl.CSELECT_IN
136 action |= pycurl.CSELECT_IN
137 if events & core.WRITE:
137 if events & core.WRITE:
138 action |= pycurl.CSELECT_OUT
138 action |= pycurl.CSELECT_OUT
139 while True:
139 while True:
140 try:
140 try:
141 ret, num_handles = self._curl_multi.socket_action(fd, action)
141 ret, num_handles = self._curl_multi.socket_action(fd, action)
142 except pycurl.error, e:
142 except pycurl.error, e:
143 ret = e.args[0]
143 ret = e.args[0]
144 if ret != pycurl.E_CALL_MULTI_PERFORM:
144 if ret != pycurl.E_CALL_MULTI_PERFORM:
145 break
145 break
146 self._finish_pending_requests()
146 self._finish_pending_requests()
147
147
148 def _handle_timeout(self):
148 def _handle_timeout(self):
149 """
149 """
150 Called by IOLoop when the requested timeout has passed.
150 Called by IOLoop when the requested timeout has passed.
151 """
151 """
152 if self._timeout is not None:
152 if self._timeout is not None:
153 self._timeout.stop()
153 self._timeout.stop()
154 self._timeout = None
154 self._timeout = None
155 while True:
155 while True:
156 try:
156 try:
157 ret, num_handles = self._curl_multi.socket_action(
157 ret, num_handles = self._curl_multi.socket_action(
158 pycurl.SOCKET_TIMEOUT, 0)
158 pycurl.SOCKET_TIMEOUT, 0)
159 except pycurl.error, e:
159 except pycurl.error, e:
160 ret = e.args[0]
160 ret = e.args[0]
161 if ret != pycurl.E_CALL_MULTI_PERFORM:
161 if ret != pycurl.E_CALL_MULTI_PERFORM:
162 break
162 break
163 self._finish_pending_requests()
163 self._finish_pending_requests()
164
164
165 # In theory, we shouldn't have to do this because curl will call
165 # In theory, we shouldn't have to do this because curl will call
166 # _set_timeout whenever the timeout changes. However, sometimes after
166 # _set_timeout whenever the timeout changes. However, sometimes after
167 # _handle_timeout we will need to reschedule immediately even though
167 # _handle_timeout we will need to reschedule immediately even though
168 # nothing has changed from curl's perspective. This is because when
168 # nothing has changed from curl's perspective. This is because when
169 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
169 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
170 # internally which timeouts need to be processed by using a monotonic
170 # internally which timeouts need to be processed by using a monotonic
171 # clock (where available) while tornado uses python's time.time() to
171 # clock (where available) while tornado uses python's time.time() to
172 # decide when timeouts have occurred. When those clocks disagree on
172 # decide when timeouts have occurred. When those clocks disagree on
173 # elapsed time (as they will whenever there is an NTP adjustment),
173 # elapsed time (as they will whenever there is an NTP adjustment),
174 # tornado might call _handle_timeout before libcurl is ready. After
174 # tornado might call _handle_timeout before libcurl is ready. After
175 # each timeout, resync the scheduled timeout with libcurl's current
175 # each timeout, resync the scheduled timeout with libcurl's current
176 # state.
176 # state.
177 new_timeout = self._curl_multi.timeout()
177 new_timeout = self._curl_multi.timeout()
178 if new_timeout >= 0:
178 if new_timeout >= 0:
179 self._set_timeout(new_timeout)
179 self._set_timeout(new_timeout)
180
180
181 def _finish_pending_requests(self):
181 def _finish_pending_requests(self):
182 """
182 """
183 Process any requests that were completed by the last call to
183 Process any requests that were completed by the last call to
184 multi.socket_action.
184 multi.socket_action.
185 """
185 """
186 while True:
186 while True:
187 num_q, ok_list, err_list = self._curl_multi.info_read()
187 num_q, ok_list, err_list = self._curl_multi.info_read()
188 for curl in ok_list:
188 for curl in ok_list:
189 curl.waiter.switch()
189 curl.waiter.switch(value=None)
190 for curl, errnum, errmsg in err_list:
190 for curl, errnum, errmsg in err_list:
191 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
191 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
192 if num_q == 0:
192 if num_q == 0:
193 break
193 break
194
194
195
195
196 class GeventCurl(object):
196 class GeventCurl(object):
197 """
197 """
198 Gevent compatible implementation of the pycurl.Curl class. Essentially a
198 Gevent compatible implementation of the pycurl.Curl class. Essentially a
199 wrapper around pycurl.Curl with a customized perform method. It uses the
199 wrapper around pycurl.Curl with a customized perform method. It uses the
200 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
200 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
201 interface.
201 interface.
202 """
202 """
203
203
204 # Reference to the GeventCurlMulti instance.
204 # Reference to the GeventCurlMulti instance.
205 _multi_instance = None
205 _multi_instance = None
206
206
207 def __init__(self):
207 def __init__(self):
208 self._curl = pycurl.Curl()
208 self._curl = pycurl.Curl()
209
209
210 def __getattr__(self, item):
210 def __getattr__(self, item):
211 """
211 """
212 The pycurl.Curl class is final and we cannot subclass it. Therefore we
212 The pycurl.Curl class is final and we cannot subclass it. Therefore we
213 are wrapping it and forward everything to it here.
213 are wrapping it and forward everything to it here.
214 """
214 """
215 return getattr(self._curl, item)
215 return getattr(self._curl, item)
216
216
217 @property
217 @property
218 def _multi(self):
218 def _multi(self):
219 """
219 """
220 Lazy property that returns the GeventCurlMulti instance. The value is
220 Lazy property that returns the GeventCurlMulti instance. The value is
221 cached as a class attribute. Therefore only one instance per process
221 cached as a class attribute. Therefore only one instance per process
222 exists.
222 exists.
223 """
223 """
224 if GeventCurl._multi_instance is None:
224 if GeventCurl._multi_instance is None:
225 GeventCurl._multi_instance = GeventCurlMulti()
225 GeventCurl._multi_instance = GeventCurlMulti()
226 return GeventCurl._multi_instance
226 return GeventCurl._multi_instance
227
227
228 def perform(self):
228 def perform(self):
229 """
229 """
230 This perform method is compatible with gevent because it uses gevent
230 This perform method is compatible with gevent because it uses gevent
231 synchronization mechanisms to wait for the request to finish.
231 synchronization mechanisms to wait for the request to finish.
232 """
232 """
233 waiter = self._curl.waiter = Waiter()
233 waiter = self._curl.waiter = Waiter()
234 try:
234 try:
235 self._multi.add_handle(self._curl)
235 self._multi.add_handle(self._curl)
236 response = waiter.get()
236 response = waiter.get()
237 finally:
237 finally:
238 self._multi.remove_handle(self._curl)
238 self._multi.remove_handle(self._curl)
239 del self._curl.waiter
239 del self._curl.waiter
240
240
241 return response
241 return response
242
242
243 # Curl is originally imported from pycurl. At this point we override it with
243 # Curl is originally imported from pycurl. At this point we override it with
244 # our custom implementation.
244 # our custom implementation.
245 Curl = GeventCurl
245 Curl = GeventCurl
General Comments 0
You need to be logged in to leave comments. Login now