##// END OF EJS Templates
vcs: Add license header from the origin to geventcurl.
Martin Bornhold -
r475:188a7769 default
parent child Browse files
Show More
@@ -1,224 +1,245 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 23 class in a way that is compatible with gevent.
24 24 """
25 25
26 26
27 27 import logging
28 28 import gevent
29 29 import pycurl
30 30
31 31 # Import everything from pycurl.
32 32 # This allows us to use this module as a drop in replacement of pycurl.
33 33 from pycurl import * # noqa
34 34
35 35 from gevent import core
36 36 from gevent.hub import Waiter
37 37
38 38
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
40 40
41 41
class GeventCurlMulti(object):
    """
    Wrapper around pycurl.CurlMulti that integrates it into gevent's event
    loop.

    The wrapped multi handle reports the sockets and timeouts it cares about
    through the ``M_SOCKETFUNCTION`` and ``M_TIMERFUNCTION`` callbacks that
    are registered in ``__init__``. Those callbacks are translated into io
    and timer watchers on ``self.loop``. When a watcher fires,
    ``socket_action`` is invoked on the multi handle and the waiters of all
    finished transfers are woken up.

    Parts of this class are a modified version of code copied from the Tornado
    Web Server project which is licensed under the Apache License, Version 2.0
    (the "License"). To be more specific the code originates from this file:
    https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py

    This is the original license header of the origin:

    Copyright 2009 Facebook

    Licensed under the Apache License, Version 2.0 (the "License"); you may
    not use this file except in compliance with the License. You may obtain
    a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    implied. See the License for the specific language governing
    permissions and limitations under the License.
    """

    def __init__(self, loop=None):
        """
        :param loop: Event loop used to create io and timer watchers.
            Defaults to the loop of the current gevent hub.
        """
        # Maps a file descriptor to the io watcher registered for it.
        self._watchers = {}
        # Currently scheduled timer watcher, or None if no timer is armed.
        self._timeout = None
        self.loop = loop or gevent.get_hub().loop

        # Setup curl's multi instance.
        self._curl_multi = pycurl.CurlMulti()
        self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)

    def __getattr__(self, item):
        """
        The pycurl.CurlMulti class is final and we cannot subclass it.
        Therefore we are wrapping it and forward everything to it here.
        """
        # __getattr__ is only consulted when normal lookup fails. If the
        # wrapped object itself is missing (instance not fully initialised),
        # forwarding would look up `_curl_multi` again and recurse without
        # end, so fail with a regular AttributeError instead.
        if item == '_curl_multi':
            raise AttributeError(item)
        return getattr(self._curl_multi, item)

    def add_handle(self, curl):
        """
        Add handle variant that also takes care about the initial invocation of
        socket action method. This is done by setting an immediate timeout.

        :param curl: The easy handle to add to the wrapped multi handle.
        :return: Whatever the wrapped ``add_handle`` call returns.
        """
        result = self._curl_multi.add_handle(curl)
        self._set_timeout(0)
        return result

    def _handle_socket(self, event, fd, multi, data):
        """
        Called by libcurl when it wants to change the file descriptors it cares
        about.

        :param event: One of the ``pycurl.POLL_*`` constants.
        :param fd: The file descriptor the event refers to.
        :param multi: Unused here.
        :param data: Unused here.
        """
        if event == pycurl.POLL_REMOVE:
            watcher = self._watchers.pop(fd, None)
            if watcher is not None:
                watcher.stop()
            return

        # Translation of libcurl poll flags into event loop flags.
        event_map = {
            pycurl.POLL_NONE: core.NONE,
            pycurl.POLL_IN: core.READ,
            pycurl.POLL_OUT: core.WRITE,
            pycurl.POLL_INOUT: core.READ | core.WRITE,
        }
        gloop_event = event_map[event]

        watcher = self._watchers.get(fd)
        if watcher is None:
            watcher = self.loop.io(fd, gloop_event)
            watcher.start(self._handle_events, fd, pass_events=True)
            self._watchers[fd] = watcher
        elif watcher.events != gloop_event:
            # The watcher is stopped before its event mask is changed and
            # started again afterwards.
            watcher.stop()
            watcher.events = gloop_event
            watcher.start(self._handle_events, fd, pass_events=True)

    def _set_timeout(self, msecs):
        """
        Called by libcurl to schedule a timeout.

        :param msecs: Timeout in milliseconds. A negative value only cancels
            the currently scheduled timer; no new timer is armed. This
            matches ``_handle_timeout`` which also treats a negative
            timeout as "nothing to schedule".
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None
        if msecs < 0:
            return
        self._timeout = self.loop.timer(msecs / 1000.0)
        self._timeout.start(self._handle_timeout)

    def _drive_socket_action(self, fd, action):
        """
        Call ``socket_action`` until it stops asking to be called again and
        then process the finished transfers.

        :param fd: File descriptor to act on or ``pycurl.SOCKET_TIMEOUT``.
        :param action: Bitmask of ``pycurl.CSELECT_*`` flags.
        """
        while True:
            try:
                ret, _num_handles = self._curl_multi.socket_action(fd, action)
            except pycurl.error as e:
                # The first argument of the error is used as return code.
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _handle_events(self, events, fd):
        """
        Called by the io watcher of `fd` when the socket becomes ready.

        :param events: Bitmask of the event loop flags that fired.
        :param fd: The file descriptor the watcher was registered for.
        """
        action = 0
        if events & core.READ:
            action |= pycurl.CSELECT_IN
        if events & core.WRITE:
            action |= pycurl.CSELECT_OUT
        self._drive_socket_action(fd, action)

    def _handle_timeout(self):
        """
        Called by the event loop when the requested timeout has passed.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None
        self._drive_socket_action(pycurl.SOCKET_TIMEOUT, 0)

        # In theory, we shouldn't have to do this because curl will call
        # _set_timeout whenever the timeout changes. However, sometimes after
        # _handle_timeout we will need to reschedule immediately even though
        # nothing has changed from curl's perspective. This reasoning is
        # inherited from the Tornado implementation this class is derived
        # from: libcurl decides internally which timeouts need to be
        # processed and its clock may disagree with the clock of the event
        # loop, so the timer can fire before libcurl is ready. After each
        # timeout, resync the scheduled timeout with libcurl's current
        # state.
        new_timeout = self._curl_multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _finish_pending_requests(self):
        """
        Process any requests that were completed by the last call to
        multi.socket_action.

        Waiters of successful transfers are switched to. Waiters of failed
        transfers get an ``Exception`` thrown whose message consists of the
        error number and the error message.
        """
        while True:
            num_q, ok_list, err_list = self._curl_multi.info_read()
            for curl in ok_list:
                curl.waiter.switch()
            for curl, errnum, errmsg in err_list:
                curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
            # info_read reports how many messages are still queued.
            if num_q == 0:
                break
173 194
174 195
class GeventCurl(object):
    """
    Gevent compatible implementation of the pycurl.Curl class. Essentially a
    wrapper around pycurl.Curl with a customized perform method. It uses the
    GeventCurlMulti class to implement a blocking API to libcurl's "easy"
    interface.
    """

    # Reference to the GeventCurlMulti instance.
    _multi_instance = None

    def __init__(self):
        # The wrapped easy handle; unknown attributes are forwarded to it.
        self._curl = pycurl.Curl()

    def __getattr__(self, item):
        """
        The pycurl.Curl class is final and we cannot subclass it. Therefore we
        are wrapping it and forward everything to it here.
        """
        # __getattr__ is only consulted when normal lookup fails. If the
        # wrapped handle itself is missing (instance not fully initialised),
        # forwarding would look up `_curl` again and recurse without end, so
        # fail with a regular AttributeError instead.
        if item == '_curl':
            raise AttributeError(item)
        return getattr(self._curl, item)

    @property
    def _multi(self):
        """
        Lazy property that returns the GeventCurlMulti instance. The value is
        cached as a class attribute. Therefore only one instance per process
        exists.
        """
        if GeventCurl._multi_instance is None:
            GeventCurl._multi_instance = GeventCurlMulti()
        return GeventCurl._multi_instance

    def perform(self):
        """
        This perform method is compatible with gevent because it uses gevent
        synchronization mechanisms to wait for the request to finish.

        :return: The value delivered to the waiter once the transfer is
            done.
        :raises Exception: Whatever is thrown into the waiter when the
            transfer fails.
        """
        # The waiter is attached to the easy handle so that the multi wrapper
        # can reach it via `curl.waiter` when the transfer finishes.
        waiter = self._curl.waiter = Waiter()
        try:
            self._multi.add_handle(self._curl)
            # Only detach the handle if it was actually attached; otherwise
            # an error raised by remove_handle could mask the original
            # add_handle failure.
            try:
                response = waiter.get()
            finally:
                self._multi.remove_handle(self._curl)
        finally:
            del self._curl.waiter

        return response
221 242
# Curl is originally imported from pycurl (via the star import at the top of
# this module). At this point we override it with our custom implementation so
# that code using this module's `Curl` gets the gevent compatible class.
Curl = GeventCurl
General Comments 0
You need to be logged in to leave comments. Login now