##// END OF EJS Templates
vcs: Implemented a gevent compatible Curl class, part of #4046...
Martin Bornhold -
r474:50981b14 default
parent child Browse files
Show More
@@ -0,0 +1,224 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 """
22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 class in a way that is compatible with gevent.
24 """
25
26
27 import logging
28 import gevent
29 import pycurl
30
31 # Import everything from pycurl.
32 # This allows us to use this module as a drop in replacement of pycurl.
33 from pycurl import * # noqa
34
35 from gevent import core
36 from gevent.hub import Waiter
37
38
39 log = logging.getLogger(__name__)
40
41
42 class GeventCurlMulti(object):
43 """
44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
45 loop.
46 """
47
48 def __init__(self, loop=None):
49 self._watchers = {}
50 self._timeout = None
51 self.loop = loop or gevent.get_hub().loop
52
53 # Setup curl's multi instance.
54 self._curl_multi = pycurl.CurlMulti()
55 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
56 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
57
58 def __getattr__(self, item):
59 """
60 The pycurl.CurlMulti class is final and we cannot subclass it.
61 Therefore we are wrapping it and forward everything to it here.
62 """
63 return getattr(self._curl_multi, item)
64
65 def add_handle(self, curl):
66 """
67 Add handle variant that also takes care about the initial invocation of
68 socket action method. This is done by setting an immediate timeout.
69 """
70 result = self._curl_multi.add_handle(curl)
71 self._set_timeout(0)
72 return result
73
74 def _handle_socket(self, event, fd, multi, data):
75 """
76 Called by libcurl when it wants to change the file descriptors it cares
77 about.
78 """
79 event_map = {
80 pycurl.POLL_NONE: core.NONE,
81 pycurl.POLL_IN: core.READ,
82 pycurl.POLL_OUT: core.WRITE,
83 pycurl.POLL_INOUT: core.READ | core.WRITE
84 }
85
86 if event == pycurl.POLL_REMOVE:
87 watcher = self._watchers.pop(fd, None)
88 if watcher is not None:
89 watcher.stop()
90 else:
91 gloop_event = event_map[event]
92 watcher = self._watchers.get(fd)
93 if watcher is None:
94 watcher = self.loop.io(fd, gloop_event)
95 watcher.start(self._handle_events, fd, pass_events=True)
96 self._watchers[fd] = watcher
97 else:
98 if watcher.events != gloop_event:
99 watcher.stop()
100 watcher.events = gloop_event
101 watcher.start(self._handle_events, fd, pass_events=True)
102
103 def _set_timeout(self, msecs):
104 """
105 Called by libcurl to schedule a timeout.
106 """
107 if self._timeout is not None:
108 self._timeout.stop()
109 self._timeout = self.loop.timer(msecs/1000.0)
110 self._timeout.start(self._handle_timeout)
111
112 def _handle_events(self, events, fd):
113 action = 0
114 if events & core.READ:
115 action |= pycurl.CSELECT_IN
116 if events & core.WRITE:
117 action |= pycurl.CSELECT_OUT
118 while True:
119 try:
120 ret, num_handles = self._curl_multi.socket_action(fd, action)
121 except pycurl.error, e:
122 ret = e.args[0]
123 if ret != pycurl.E_CALL_MULTI_PERFORM:
124 break
125 self._finish_pending_requests()
126
127 def _handle_timeout(self):
128 """
129 Called by IOLoop when the requested timeout has passed.
130 """
131 if self._timeout is not None:
132 self._timeout.stop()
133 self._timeout = None
134 while True:
135 try:
136 ret, num_handles = self._curl_multi.socket_action(
137 pycurl.SOCKET_TIMEOUT, 0)
138 except pycurl.error, e:
139 ret = e.args[0]
140 if ret != pycurl.E_CALL_MULTI_PERFORM:
141 break
142 self._finish_pending_requests()
143
144 # In theory, we shouldn't have to do this because curl will call
145 # _set_timeout whenever the timeout changes. However, sometimes after
146 # _handle_timeout we will need to reschedule immediately even though
147 # nothing has changed from curl's perspective. This is because when
148 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
149 # internally which timeouts need to be processed by using a monotonic
150 # clock (where available) while tornado uses python's time.time() to
151 # decide when timeouts have occurred. When those clocks disagree on
152 # elapsed time (as they will whenever there is an NTP adjustment),
153 # tornado might call _handle_timeout before libcurl is ready. After
154 # each timeout, resync the scheduled timeout with libcurl's current
155 # state.
156 new_timeout = self._curl_multi.timeout()
157 if new_timeout >= 0:
158 self._set_timeout(new_timeout)
159
160 def _finish_pending_requests(self):
161 """
162 Process any requests that were completed by the last call to
163 multi.socket_action.
164 """
165 while True:
166 num_q, ok_list, err_list = self._curl_multi.info_read()
167 for curl in ok_list:
168 curl.waiter.switch()
169 for curl, errnum, errmsg in err_list:
170 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
171 if num_q == 0:
172 break
173
174
175 class GeventCurl(object):
176 """
177 Gevent compatible implementation of the pycurl.Curl class. Essentially a
178 wrapper around pycurl.Curl with a customized perform method. It uses the
179 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
180 interface.
181 """
182
183 # Reference to the GeventCurlMulti instance.
184 _multi_instance = None
185
186 def __init__(self):
187 self._curl = pycurl.Curl()
188
189 def __getattr__(self, item):
190 """
191 The pycurl.Curl class is final and we cannot subclass it. Therefore we
192 are wrapping it and forward everything to it here.
193 """
194 return getattr(self._curl, item)
195
196 @property
197 def _multi(self):
198 """
199 Lazy property that returns the GeventCurlMulti instance. The value is
200 cached as a class attribute. Therefore only one instance per process
201 exists.
202 """
203 if GeventCurl._multi_instance is None:
204 GeventCurl._multi_instance = GeventCurlMulti()
205 return GeventCurl._multi_instance
206
207 def perform(self):
208 """
209 This perform method is compatible with gevent because it uses gevent
210 synchronization mechanisms to wait for the request to finish.
211 """
212 waiter = self._curl.waiter = Waiter()
213 try:
214 self._multi.add_handle(self._curl)
215 response = waiter.get()
216 finally:
217 self._multi.remove_handle(self._curl)
218 del self._curl.waiter
219
220 return response
221
222 # Curl is originally imported from pycurl. At this point we override it with
223 # our custom implementation.
224 Curl = GeventCurl
@@ -40,7 +40,6 b' import time'
40 40 import urlparse
41 41 from cStringIO import StringIO
42 42
43 import pycurl
44 43 import Pyro4
45 44 from Pyro4.errors import CommunicationError
46 45
@@ -49,8 +48,21 b' from rhodecode.lib.vcs.backends import g'
49 48 from rhodecode.lib.vcs.exceptions import (
50 49 VCSError, RepositoryError, CommitError)
51 50
51 log = logging.getLogger(__name__)
52 52
53 log = logging.getLogger(__name__)
53 # The pycurl library directly accesses C API functions and is not patched by
54 # gevent. This will potentially lead to deadlocks due to incompatibility to
55 # gevent. Therefore we check if gevent is active and import a gevent compatible
56 # wrapper in that case.
57 try:
58 from gevent import monkey
59 if monkey.is_module_patched('__builtin__'):
60 import geventcurl as pycurl
61 log.debug('Using gevent comapatible pycurl: %s', pycurl)
62 else:
63 import pycurl
64 except ImportError:
65 import pycurl
54 66
55 67
56 68 def get_version():
General Comments 0
You need to be logged in to leave comments. Login now