##// END OF EJS Templates
vcs: Implemented a gevent compatible Curl class, part of #4046...
Martin Bornhold -
r474:50981b14 default
parent child Browse files
Show More
@@ -0,0 +1,224 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 """
22 This serves as a drop in replacement for pycurl. It implements the pycurl Curl
23 class in a way that is compatible with gevent.
24 """
25
26
27 import logging
28 import gevent
29 import pycurl
30
31 # Import everything from pycurl.
32 # This allows us to use this module as a drop in replacement of pycurl.
33 from pycurl import * # noqa
34
35 from gevent import core
36 from gevent.hub import Waiter
37
38
39 log = logging.getLogger(__name__)
40
41
42 class GeventCurlMulti(object):
43 """
44 Wrapper around pycurl.CurlMulti that integrates it into gevent's event
45 loop.
46 """
47
48 def __init__(self, loop=None):
49 self._watchers = {}
50 self._timeout = None
51 self.loop = loop or gevent.get_hub().loop
52
53 # Setup curl's multi instance.
54 self._curl_multi = pycurl.CurlMulti()
55 self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
56 self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
57
58 def __getattr__(self, item):
59 """
60 The pycurl.CurlMulti class is final and we cannot subclass it.
61 Therefore we are wrapping it and forward everything to it here.
62 """
63 return getattr(self._curl_multi, item)
64
65 def add_handle(self, curl):
66 """
67 Add handle variant that also takes care about the initial invocation of
68 socket action method. This is done by setting an immediate timeout.
69 """
70 result = self._curl_multi.add_handle(curl)
71 self._set_timeout(0)
72 return result
73
74 def _handle_socket(self, event, fd, multi, data):
75 """
76 Called by libcurl when it wants to change the file descriptors it cares
77 about.
78 """
79 event_map = {
80 pycurl.POLL_NONE: core.NONE,
81 pycurl.POLL_IN: core.READ,
82 pycurl.POLL_OUT: core.WRITE,
83 pycurl.POLL_INOUT: core.READ | core.WRITE
84 }
85
86 if event == pycurl.POLL_REMOVE:
87 watcher = self._watchers.pop(fd, None)
88 if watcher is not None:
89 watcher.stop()
90 else:
91 gloop_event = event_map[event]
92 watcher = self._watchers.get(fd)
93 if watcher is None:
94 watcher = self.loop.io(fd, gloop_event)
95 watcher.start(self._handle_events, fd, pass_events=True)
96 self._watchers[fd] = watcher
97 else:
98 if watcher.events != gloop_event:
99 watcher.stop()
100 watcher.events = gloop_event
101 watcher.start(self._handle_events, fd, pass_events=True)
102
103 def _set_timeout(self, msecs):
104 """
105 Called by libcurl to schedule a timeout.
106 """
107 if self._timeout is not None:
108 self._timeout.stop()
109 self._timeout = self.loop.timer(msecs/1000.0)
110 self._timeout.start(self._handle_timeout)
111
112 def _handle_events(self, events, fd):
113 action = 0
114 if events & core.READ:
115 action |= pycurl.CSELECT_IN
116 if events & core.WRITE:
117 action |= pycurl.CSELECT_OUT
118 while True:
119 try:
120 ret, num_handles = self._curl_multi.socket_action(fd, action)
121 except pycurl.error, e:
122 ret = e.args[0]
123 if ret != pycurl.E_CALL_MULTI_PERFORM:
124 break
125 self._finish_pending_requests()
126
127 def _handle_timeout(self):
128 """
129 Called by IOLoop when the requested timeout has passed.
130 """
131 if self._timeout is not None:
132 self._timeout.stop()
133 self._timeout = None
134 while True:
135 try:
136 ret, num_handles = self._curl_multi.socket_action(
137 pycurl.SOCKET_TIMEOUT, 0)
138 except pycurl.error, e:
139 ret = e.args[0]
140 if ret != pycurl.E_CALL_MULTI_PERFORM:
141 break
142 self._finish_pending_requests()
143
144 # In theory, we shouldn't have to do this because curl will call
145 # _set_timeout whenever the timeout changes. However, sometimes after
146 # _handle_timeout we will need to reschedule immediately even though
147 # nothing has changed from curl's perspective. This is because when
148 # socket_action is called with SOCKET_TIMEOUT, libcurl decides
149 # internally which timeouts need to be processed by using a monotonic
150 # clock (where available) while tornado uses python's time.time() to
151 # decide when timeouts have occurred. When those clocks disagree on
152 # elapsed time (as they will whenever there is an NTP adjustment),
153 # tornado might call _handle_timeout before libcurl is ready. After
154 # each timeout, resync the scheduled timeout with libcurl's current
155 # state.
156 new_timeout = self._curl_multi.timeout()
157 if new_timeout >= 0:
158 self._set_timeout(new_timeout)
159
160 def _finish_pending_requests(self):
161 """
162 Process any requests that were completed by the last call to
163 multi.socket_action.
164 """
165 while True:
166 num_q, ok_list, err_list = self._curl_multi.info_read()
167 for curl in ok_list:
168 curl.waiter.switch()
169 for curl, errnum, errmsg in err_list:
170 curl.waiter.throw(Exception('%s %s' % (errnum, errmsg)))
171 if num_q == 0:
172 break
173
174
175 class GeventCurl(object):
176 """
177 Gevent compatible implementation of the pycurl.Curl class. Essentially a
178 wrapper around pycurl.Curl with a customized perform method. It uses the
179 GeventCurlMulti class to implement a blocking API to libcurl's "easy"
180 interface.
181 """
182
183 # Reference to the GeventCurlMulti instance.
184 _multi_instance = None
185
186 def __init__(self):
187 self._curl = pycurl.Curl()
188
189 def __getattr__(self, item):
190 """
191 The pycurl.Curl class is final and we cannot subclass it. Therefore we
192 are wrapping it and forward everything to it here.
193 """
194 return getattr(self._curl, item)
195
196 @property
197 def _multi(self):
198 """
199 Lazy property that returns the GeventCurlMulti instance. The value is
200 cached as a class attribute. Therefore only one instance per process
201 exists.
202 """
203 if GeventCurl._multi_instance is None:
204 GeventCurl._multi_instance = GeventCurlMulti()
205 return GeventCurl._multi_instance
206
207 def perform(self):
208 """
209 This perform method is compatible with gevent because it uses gevent
210 synchronization mechanisms to wait for the request to finish.
211 """
212 waiter = self._curl.waiter = Waiter()
213 try:
214 self._multi.add_handle(self._curl)
215 response = waiter.get()
216 finally:
217 self._multi.remove_handle(self._curl)
218 del self._curl.waiter
219
220 return response
221
222 # Curl is originally imported from pycurl. At this point we override it with
223 # our custom implementation.
224 Curl = GeventCurl
@@ -1,278 +1,290 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Various version Control System version lib (vcs) management abstraction layer
23 23 for Python. Build with server client architecture.
24 24 """
25 25
26 26
27 27 VERSION = (0, 5, 0, 'dev')
28 28
29 29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30 30
31 31 __all__ = [
32 32 'get_version', 'get_repo', 'get_backend',
33 33 'VCSError', 'RepositoryError', 'CommitError'
34 34 ]
35 35
36 36 import atexit
37 37 import logging
38 38 import subprocess
39 39 import time
40 40 import urlparse
41 41 from cStringIO import StringIO
42 42
43 import pycurl
44 43 import Pyro4
45 44 from Pyro4.errors import CommunicationError
46 45
47 46 from rhodecode.lib.vcs.conf import settings
48 47 from rhodecode.lib.vcs.backends import get_repo, get_backend
49 48 from rhodecode.lib.vcs.exceptions import (
50 49 VCSError, RepositoryError, CommitError)
51 50
51 log = logging.getLogger(__name__)
52 52
53 log = logging.getLogger(__name__)
53 # The pycurl library directly accesses C API functions and is not patched by
54 # gevent. This will potentially lead to deadlocks due to incompatibility to
55 # gevent. Therefore we check if gevent is active and import a gevent compatible
56 # wrapper in that case.
57 try:
58 from gevent import monkey
59 if monkey.is_module_patched('__builtin__'):
60 import geventcurl as pycurl
61 log.debug('Using gevent comapatible pycurl: %s', pycurl)
62 else:
63 import pycurl
64 except ImportError:
65 import pycurl
54 66
55 67
56 68 def get_version():
57 69 """
58 70 Returns shorter version (digit parts only) as string.
59 71 """
60 72 return '.'.join((str(each) for each in VERSION[:3]))
61 73
62 74
63 75 def connect_pyro4(server_and_port):
64 76 from rhodecode.lib.vcs import connection, client
65 77 from rhodecode.lib.middleware.utils import scm_app
66 78
67 79 git_remote = client.RequestScopeProxyFactory(
68 80 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
69 81 hg_remote = client.RequestScopeProxyFactory(
70 82 settings.pyro_remote(settings.PYRO_HG, server_and_port))
71 83 svn_remote = client.RequestScopeProxyFactory(
72 84 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
73 85
74 86 connection.Git = client.RepoMaker(proxy_factory=git_remote)
75 87 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
76 88 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
77 89
78 90 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
79 91 settings.pyro_remote(
80 92 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
81 93 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
82 94 settings.pyro_remote(
83 95 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
84 96
85 97 @atexit.register
86 98 def free_connection_resources():
87 99 connection.Git = None
88 100 connection.Hg = None
89 101 connection.Svn = None
90 102
91 103
92 104 def connect_http(server_and_port):
93 105 from rhodecode.lib.vcs import connection, client_http
94 106 from rhodecode.lib.middleware.utils import scm_app
95 107
96 108 session_factory = client_http.ThreadlocalSessionFactory()
97 109
98 110 connection.Git = client_http.RepoMaker(
99 111 server_and_port, '/git', session_factory)
100 112 connection.Hg = client_http.RepoMaker(
101 113 server_and_port, '/hg', session_factory)
102 114 connection.Svn = client_http.RepoMaker(
103 115 server_and_port, '/svn', session_factory)
104 116
105 117 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
106 118 server_and_port, '/proxy/hg')
107 119 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
108 120 server_and_port, '/proxy/git')
109 121
110 122 @atexit.register
111 123 def free_connection_resources():
112 124 connection.Git = None
113 125 connection.Hg = None
114 126 connection.Svn = None
115 127
116 128
117 129 def connect_vcs(server_and_port, protocol='pyro4'):
118 130 """
119 131 Initializes the connection to the vcs server.
120 132
121 133 :param server_and_port: str, e.g. "localhost:9900"
122 134 :param protocol: str, "pyro4" or "http"
123 135 """
124 136 if protocol == 'pyro4':
125 137 connect_pyro4(server_and_port)
126 138 elif protocol == 'http':
127 139 connect_http(server_and_port)
128 140
129 141
130 142 # TODO: johbo: This function should be moved into our test suite, there is
131 143 # no reason to support starting the vcsserver in Enterprise itself.
132 144 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
133 145 """
134 146 Starts the vcs server in a subprocess.
135 147 """
136 148 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
137 149 if protocol == 'http':
138 150 return _start_http_vcs_server(server_and_port, log_level)
139 151 elif protocol == 'pyro4':
140 152 return _start_pyro4_vcs_server(server_and_port, log_level)
141 153
142 154
143 155 def _start_pyro4_vcs_server(server_and_port, log_level=None):
144 156 _try_to_shutdown_running_server(server_and_port)
145 157 host, port = server_and_port.rsplit(":", 1)
146 158 host = host.strip('[]')
147 159 args = [
148 160 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
149 161 '--threadpool', '32']
150 162 if log_level:
151 163 args += ['--log-level', log_level]
152 164 proc = subprocess.Popen(args)
153 165
154 166 def cleanup_server_process():
155 167 proc.kill()
156 168 atexit.register(cleanup_server_process)
157 169
158 170 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
159 171 _wait_until_vcs_server_is_reachable(server)
160 172
161 173
162 174 def _start_http_vcs_server(server_and_port, log_level=None):
163 175 # TODO: mikhail: shutdown if an http server already runs
164 176
165 177 host, port = server_and_port.rsplit(":", 1)
166 178 args = [
167 179 'pserve', 'vcsserver/development_pyramid.ini',
168 180 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
169 181 proc = subprocess.Popen(args)
170 182
171 183 def cleanup_server_process():
172 184 proc.kill()
173 185 atexit.register(cleanup_server_process)
174 186
175 187 server = create_vcsserver_proxy(server_and_port, protocol='http')
176 188 _wait_until_vcs_server_is_reachable(server)
177 189
178 190
179 191 def _wait_until_vcs_server_is_reachable(server):
180 192 while xrange(80): # max 40s of sleep
181 193 try:
182 194 server.ping()
183 195 break
184 196 except (CommunicationError, pycurl.error):
185 197 pass
186 198 time.sleep(0.5)
187 199
188 200
189 201 def _try_to_shutdown_running_server(server_and_port):
190 202 server = create_vcsserver_proxy(server_and_port)
191 203 try:
192 204 server.shutdown()
193 205 except (CommunicationError, pycurl.error):
194 206 return
195 207
196 208 # TODO: Not sure why this is important, but without it the following start
197 209 # of the server fails.
198 210 server = create_vcsserver_proxy(server_and_port)
199 211 server.ping()
200 212
201 213
202 214 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
203 215 if protocol == 'pyro4':
204 216 return _create_vcsserver_proxy_pyro4(server_and_port)
205 217 elif protocol == 'http':
206 218 return _create_vcsserver_proxy_http(server_and_port)
207 219
208 220
209 221 def _create_vcsserver_proxy_pyro4(server_and_port):
210 222 server = Pyro4.Proxy(
211 223 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
212 224 return server
213 225
214 226
215 227 def _create_vcsserver_proxy_http(server_and_port):
216 228 from rhodecode.lib.vcs import client_http
217 229
218 230 session = _create_http_rpc_session()
219 231 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
220 232 return client_http.RemoteObject(url, session)
221 233
222 234
223 235 class CurlSession(object):
224 236 """
225 237 Modeled so that it provides a subset of the requests interface.
226 238
227 239 This has been created so that it does only provide a minimal API for our
228 240 needs. The parts which it provides are based on the API of the library
229 241 `requests` which allows us to easily benchmark against it.
230 242
231 243 Please have a look at the class :class:`requests.Session` when you extend
232 244 it.
233 245 """
234 246
235 247 def __init__(self):
236 248 curl = pycurl.Curl()
237 249 # TODO: johbo: I did test with 7.19 of libcurl. This version has
238 250 # trouble with 100 - continue being set in the expect header. This
239 251 # can lead to massive performance drops, switching it off here.
240 252 curl.setopt(curl.HTTPHEADER, ["Expect:"])
241 253 curl.setopt(curl.TCP_NODELAY, True)
242 254 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
243 255 self._curl = curl
244 256
245 257 def post(self, url, data, allow_redirects=False):
246 258 response_buffer = StringIO()
247 259
248 260 curl = self._curl
249 261 curl.setopt(curl.URL, url)
250 262 curl.setopt(curl.POST, True)
251 263 curl.setopt(curl.POSTFIELDS, data)
252 264 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
253 265 curl.setopt(curl.WRITEDATA, response_buffer)
254 266 curl.perform()
255 267
256 268 return CurlResponse(response_buffer)
257 269
258 270
259 271 class CurlResponse(object):
260 272 """
261 273 The response of a request, modeled after the requests API.
262 274
263 275 This class provides a subset of the response interface known from the
264 276 library `requests`. It is intentionally kept similar, so that we can use
265 277 `requests` as a drop in replacement for benchmarking purposes.
266 278 """
267 279
268 280 def __init__(self, response_buffer):
269 281 self._response_buffer = response_buffer
270 282
271 283 @property
272 284 def content(self):
273 285 return self._response_buffer.getvalue()
274 286
275 287
276 288 def _create_http_rpc_session():
277 289 session = CurlSession()
278 290 return session
General Comments 0
You need to be logged in to leave comments. Login now