##// END OF EJS Templates
vcs: Implemented a gevent compatible Curl class, part of #4046...
Martin Bornhold -
r474:50981b14 default
parent child Browse files
Show More
@@ -0,0 +1,224 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 """
22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 class in a way that is compatible with gevent.
24 """
25
26
27 import logging
28 import gevent
29 import pycurl
30
31 # Import everything from pycurl.
32 # This allows us to use this module as a drop in replacement of pycurl.
33 from pycurl import * # noqa
34
35 from gevent import core
36 from gevent.hub import Waiter
37
38
39 log = logging.getLogger(__name__)
40
41
42 class GeventCurlMulti(object):
43 """
44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
45 loop.
46 """
47
48 def __init__(self, loop=None):
49 self._watchers = {}
50 self._timeout = None
51 self.loop = loop or gevent.get_hub().loop
52
53 # Setup curl's multi instance.
54 self._curl_multi = pycurl.CurlMulti()
55 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
56 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
57
58 def __getattr__(self, item):
59 """
60 The pycurl.CurlMulti class is final and we cannot subclass it.
61 Therefore we are wrapping it and forward everything to it here.
62 """
63 return getattr(self._curl_multi, item)
64
65 def add_handle(self, curl):
66 """
67 Add handle variant that also takes care about the initial invocation of
68 socket action method. This is done by setting an immediate timeout.
69 """
70 result = self._curl_multi.add_handle(curl)
71 self._set_timeout(0)
72 return result
73
74 def _handle_socket(self, event, fd, multi, data):
75 """
76 Called by libcurl when it wants to change the file descriptors it cares
77 about.
78 """
79 event_map = {
80 pycurl.POLL_NONE: core.NONE,
81 pycurl.POLL_IN: core.READ,
82 pycurl.POLL_OUT: core.WRITE,
83 pycurl.POLL_INOUT: core.READ | core.WRITE
84 }
85
86 if event == pycurl.POLL_REMOVE:
87 watcher = self._watchers.pop(fd, None)
88 if watcher is not None:
89 watcher.stop()
90 else:
91 gloop_event = event_map[event]
92 watcher = self._watchers.get(fd)
93 if watcher is None:
94 watcher = self.loop.io(fd, gloop_event)
95 watcher.start(self._handle_events, fd, pass_events=True)
96 self._watchers[fd] = watcher
97 else:
98 if watcher.events != gloop_event:
99 watcher.stop()
100 watcher.events = gloop_event
101 watcher.start(self._handle_events, fd, pass_events=True)
102
103 def _set_timeout(self, msecs):
104 """
105 Called by libcurl to schedule a timeout.
106 """
107 if self._timeout is not None:
108 self._timeout.stop()
109 self._timeout = self.loop.timer(msecs/1000.0)
110 self._timeout.start(self._handle_timeout)
111
112 def _handle_events(self, events, fd):
113 action = 0
114 if events & core.READ:
115 action |= pycurl.CSELECT_IN
116 if events & core.WRITE:
117 action |= pycurl.CSELECT_OUT
118 while True:
119 try:
120 ret, num_handles = self._curl_multi.socket_action(fd, action)
121 except pycurl.error, e:
122 ret = e.args[0]
123 if ret != pycurl.E_CALL_MULTI_PERFORM:
124 break
125 self._finish_pending_requests()
126
127 def _handle_timeout(self):
128 """
129 Called by IOLoop when the requested timeout has passed.
130 """
131 if self._timeout is not None:
132 self._timeout.stop()
133 self._timeout = None
134 while True:
135 try:
136 ret, num_handles = self._curl_multi.socket_action(
137 pycurl.SOCKET_TIMEOUT, 0)
138 except pycurl.error, e:
139 ret = e.args[0]
140 if ret != pycurl.E_CALL_MULTI_PERFORM:
141 break
142 self._finish_pending_requests()
143
144 # In theory, we shouldn't have to do this because curl will call
145 # _set_timeout whenever the timeout changes. However, sometimes after
146 # _handle_timeout we will need to reschedule immediately even though
147 # nothing has changed from curl's perspective. This is because when
148 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
149 # internally which timeouts need to be processed by using a monotonic
150 # clock (where available) while tornado uses python's time.time() to
151 # decide when timeouts have occurred. When those clocks disagree on
152 # elapsed time (as they will whenever there is an NTP adjustment),
153 # tornado might call _handle_timeout before libcurl is ready. After
154 # each timeout, resync the scheduled timeout with libcurl's current
155 # state.
156 new_timeout = self._curl_multi.timeout()
157 if new_timeout >= 0:
158 self._set_timeout(new_timeout)
159
160 def _finish_pending_requests(self):
161 """
162 Process any requests that were completed by the last call to
163 multi.socket_action.
164 """
165 while True:
166 num_q, ok_list, err_list = self._curl_multi.info_read()
167 for curl in ok_list:
168 curl.waiter.switch()
169 for curl, errnum, errmsg in err_list:
170 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
171 if num_q == 0:
172 break
173
174
175 class GeventCurl(object):
176 """
177 Gevent compatible implementation of the pycurl.Curl class. Essentially a
178 wrapper around pycurl.Curl with a customized perform method. It uses the
179 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
180 interface.
181 """
182
183 # Reference to the GeventCurlMulti instance.
184 _multi_instance = None
185
186 def __init__(self):
187 self._curl = pycurl.Curl()
188
189 def __getattr__(self, item):
190 """
191 The pycurl.Curl class is final and we cannot subclass it. Therefore we
192 are wrapping it and forward everything to it here.
193 """
194 return getattr(self._curl, item)
195
196 @property
197 def _multi(self):
198 """
199 Lazy property that returns the GeventCurlMulti instance. The value is
200 cached as a class attribute. Therefore only one instance per process
201 exists.
202 """
203 if GeventCurl._multi_instance is None:
204 GeventCurl._multi_instance = GeventCurlMulti()
205 return GeventCurl._multi_instance
206
207 def perform(self):
208 """
209 This perform method is compatible with gevent because it uses gevent
210 synchronization mechanisms to wait for the request to finish.
211 """
212 waiter = self._curl.waiter = Waiter()
213 try:
214 self._multi.add_handle(self._curl)
215 response = waiter.get()
216 finally:
217 self._multi.remove_handle(self._curl)
218 del self._curl.waiter
219
220 return response
221
222 # Curl is originally imported from pycurl. At this point we override it with
223 # our custom implementation.
224 Curl = GeventCurl
@@ -40,7 +40,6 b' import time'
40 import urlparse
40 import urlparse
41 from cStringIO import StringIO
41 from cStringIO import StringIO
42
42
43 import pycurl
44 import Pyro4
43 import Pyro4
45 from Pyro4.errors import CommunicationError
44 from Pyro4.errors import CommunicationError
46
45
@@ -49,8 +48,21 b' from rhodecode.lib.vcs.backends import g'
49 from rhodecode.lib.vcs.exceptions import (
48 from rhodecode.lib.vcs.exceptions import (
50 VCSError, RepositoryError, CommitError)
49 VCSError, RepositoryError, CommitError)
51
50
51 log = logging.getLogger(__name__)
52
52
53 log = logging.getLogger(__name__)
53 # The pycurl library directly accesses C API functions and is not patched by
54 # gevent. This will potentially lead to deadlocks due to incompatibility to
55 # gevent. Therefore we check if gevent is active and import a gevent compatible
56 # wrapper in that case.
57 try:
58 from gevent import monkey
59 if monkey.is_module_patched('__builtin__'):
60 import geventcurl as pycurl
61 log.debug('Using gevent comapatible pycurl: %s', pycurl)
62 else:
63 import pycurl
64 except ImportError:
65 import pycurl
54
66
55
67
56 def get_version():
68 def get_version():
General Comments 0
You need to be logged in to leave comments. Login now