##// END OF EJS Templates
gevent: fix new switch version
marcink -
r2829:c245839c default
parent child Browse files
Show More
@@ -1,245 +1,245 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 23 class in a way that is compatible with gevent.
24 24 """
25 25
26 26
27 27 import logging
28 28 import gevent
29 29 import pycurl
30 30
31 31 # Import everything from pycurl.
32 32 # This allows us to use this module as a drop in replacement of pycurl.
33 33 from pycurl import * # noqa
34 34
35 35 from gevent import core
36 36 from gevent.hub import Waiter
37 37
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41
42 42 class GeventCurlMulti(object):
43 43 """
44 44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
45 45 loop.
46 46
47 47 Parts of this class are a modified version of code copied from the Tornado
48 48 Web Server project which is licensed under the Apache License, Version 2.0
49 49 (the "License"). To be more specific the code originates from this file:
50 50 https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py
51 51
52 52 This is the original license header of the origin:
53 53
54 54 Copyright 2009 Facebook
55 55
56 56 Licensed under the Apache License, Version 2.0 (the "License"); you may
57 57 not use this file except in compliance with the License. You may obtain
58 58 a copy of the License at
59 59
60 60 http://www.apache.org/licenses/LICENSE-2.0
61 61
62 62 Unless required by applicable law or agreed to in writing, software
63 63 distributed under the License is distributed on an "AS IS" BASIS,
64 64 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
65 65 implied. See the License for the specific language governing
66 66 permissions and limitations under the License.
67 67 """
68 68
69 69 def __init__(self, loop=None):
70 70 self._watchers = {}
71 71 self._timeout = None
72 72 self.loop = loop or gevent.get_hub().loop
73 73
74 74 # Setup curl's multi instance.
75 75 self._curl_multi = pycurl.CurlMulti()
76 76 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
77 77 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
78 78
79 79 def __getattr__(self, item):
80 80 """
81 81 The pycurl.CurlMulti class is final and we cannot subclass it.
82 82 Therefore we are wrapping it and forward everything to it here.
83 83 """
84 84 return getattr(self._curl_multi, item)
85 85
86 86 def add_handle(self, curl):
87 87 """
88 88 Add handle variant that also takes care about the initial invocation of
89 89 socket action method. This is done by setting an immediate timeout.
90 90 """
91 91 result = self._curl_multi.add_handle(curl)
92 92 self._set_timeout(0)
93 93 return result
94 94
95 95 def _handle_socket(self, event, fd, multi, data):
96 96 """
97 97 Called by libcurl when it wants to change the file descriptors it cares
98 98 about.
99 99 """
100 100 event_map = {
101 101 pycurl.POLL_NONE: core.NONE,
102 102 pycurl.POLL_IN: core.READ,
103 103 pycurl.POLL_OUT: core.WRITE,
104 104 pycurl.POLL_INOUT: core.READ | core.WRITE
105 105 }
106 106
107 107 if event == pycurl.POLL_REMOVE:
108 108 watcher = self._watchers.pop(fd, None)
109 109 if watcher is not None:
110 110 watcher.stop()
111 111 else:
112 112 gloop_event = event_map[event]
113 113 watcher = self._watchers.get(fd)
114 114 if watcher is None:
115 115 watcher = self.loop.io(fd, gloop_event)
116 116 watcher.start(self._handle_events, fd, pass_events=True)
117 117 self._watchers[fd] = watcher
118 118 else:
119 119 if watcher.events != gloop_event:
120 120 watcher.stop()
121 121 watcher.events = gloop_event
122 122 watcher.start(self._handle_events, fd, pass_events=True)
123 123
124 124 def _set_timeout(self, msecs):
125 125 """
126 126 Called by libcurl to schedule a timeout.
127 127 """
128 128 if self._timeout is not None:
129 129 self._timeout.stop()
130 130 self._timeout = self.loop.timer(msecs/1000.0)
131 131 self._timeout.start(self._handle_timeout)
132 132
133 133 def _handle_events(self, events, fd):
134 134 action = 0
135 135 if events & core.READ:
136 136 action |= pycurl.CSELECT_IN
137 137 if events & core.WRITE:
138 138 action |= pycurl.CSELECT_OUT
139 139 while True:
140 140 try:
141 141 ret, num_handles = self._curl_multi.socket_action(fd, action)
142 142 except pycurl.error, e:
143 143 ret = e.args[0]
144 144 if ret != pycurl.E_CALL_MULTI_PERFORM:
145 145 break
146 146 self._finish_pending_requests()
147 147
148 148 def _handle_timeout(self):
149 149 """
150 150 Called by IOLoop when the requested timeout has passed.
151 151 """
152 152 if self._timeout is not None:
153 153 self._timeout.stop()
154 154 self._timeout = None
155 155 while True:
156 156 try:
157 157 ret, num_handles = self._curl_multi.socket_action(
158 158 pycurl.SOCKET_TIMEOUT, 0)
159 159 except pycurl.error, e:
160 160 ret = e.args[0]
161 161 if ret != pycurl.E_CALL_MULTI_PERFORM:
162 162 break
163 163 self._finish_pending_requests()
164 164
165 165 # In theory, we shouldn't have to do this because curl will call
166 166 # _set_timeout whenever the timeout changes. However, sometimes after
167 167 # _handle_timeout we will need to reschedule immediately even though
168 168 # nothing has changed from curl's perspective. This is because when
169 169 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
170 170 # internally which timeouts need to be processed by using a monotonic
171 171 # clock (where available) while tornado uses python's time.time() to
172 172 # decide when timeouts have occurred. When those clocks disagree on
173 173 # elapsed time (as they will whenever there is an NTP adjustment),
174 174 # tornado might call _handle_timeout before libcurl is ready. After
175 175 # each timeout, resync the scheduled timeout with libcurl's current
176 176 # state.
177 177 new_timeout = self._curl_multi.timeout()
178 178 if new_timeout >= 0:
179 179 self._set_timeout(new_timeout)
180 180
181 181 def _finish_pending_requests(self):
182 182 """
183 183 Process any requests that were completed by the last call to
184 184 multi.socket_action.
185 185 """
186 186 while True:
187 187 num_q, ok_list, err_list = self._curl_multi.info_read()
188 188 for curl in ok_list:
189 curl.waiter.switch()
189 curl.waiter.switch(value=None)
190 190 for curl, errnum, errmsg in err_list:
191 191 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
192 192 if num_q == 0:
193 193 break
194 194
195 195
196 196 class GeventCurl(object):
197 197 """
198 198 Gevent compatible implementation of the pycurl.Curl class. Essentially a
199 199 wrapper around pycurl.Curl with a customized perform method. It uses the
200 200 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
201 201 interface.
202 202 """
203 203
204 204 # Reference to the GeventCurlMulti instance.
205 205 _multi_instance = None
206 206
207 207 def __init__(self):
208 208 self._curl = pycurl.Curl()
209 209
210 210 def __getattr__(self, item):
211 211 """
212 212 The pycurl.Curl class is final and we cannot subclass it. Therefore we
213 213 are wrapping it and forward everything to it here.
214 214 """
215 215 return getattr(self._curl, item)
216 216
217 217 @property
218 218 def _multi(self):
219 219 """
220 220 Lazy property that returns the GeventCurlMulti instance. The value is
221 221 cached as a class attribute. Therefore only one instance per process
222 222 exists.
223 223 """
224 224 if GeventCurl._multi_instance is None:
225 225 GeventCurl._multi_instance = GeventCurlMulti()
226 226 return GeventCurl._multi_instance
227 227
228 228 def perform(self):
229 229 """
230 230 This perform method is compatible with gevent because it uses gevent
231 231 synchronization mechanisms to wait for the request to finish.
232 232 """
233 233 waiter = self._curl.waiter = Waiter()
234 234 try:
235 235 self._multi.add_handle(self._curl)
236 236 response = waiter.get()
237 237 finally:
238 238 self._multi.remove_handle(self._curl)
239 239 del self._curl.waiter
240 240
241 241 return response
242 242
243 243 # Curl is originally imported from pycurl. At this point we override it with
244 244 # our custom implementation.
245 245 Curl = GeventCurl
General Comments 0
You need to be logged in to leave comments. Login now