##// END OF EJS Templates
merge: Resolved conflicts
merge: Resolved conflicts

File last commit:

r5092:d0d88608 default
r5471:df8a724f merge v5.1.0 stable
Show More
geventcurl.py
251 lines | 9.0 KiB | text/x-python | PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
This serves as a drop-in replacement for pycurl. It implements the pycurl Curl
class in a way that is compatible with gevent.
"""
import logging
import gevent
import pycurl
import greenlet
# Import everything from pycurl.
# This allows us to use this module as a drop-in replacement for pycurl.
from pycurl import * # pragma: no cover
from gevent import core
from gevent.hub import Waiter
log = logging.getLogger(__name__)
class GeventCurlMulti(object):
    """
    Wrapper around pycurl.CurlMulti that integrates it into gevent's event
    loop.

    Parts of this class are a modified version of code copied from the Tornado
    Web Server project which is licensed under the Apache License, Version 2.0
    (the "License"). To be more specific the code originates from this file:
    https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py

    This is the original license header of the origin:

    Copyright 2009 Facebook

    Licensed under the Apache License, Version 2.0 (the "License"); you may
    not use this file except in compliance with the License. You may obtain
    a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    implied. See the License for the specific language governing
    permissions and limitations under the License.
    """

    def __init__(self, loop=None):
        """
        Create the wrapped pycurl.CurlMulti and hook its timer and socket
        callbacks up to this object.

        :param loop: event loop used to create io and timer watchers. When
            omitted (or falsy) the loop of the current gevent hub is used.
        """
        # Maps a file descriptor to the io watcher registered for it.
        self._watchers = {}
        # Timer watcher of the currently scheduled timeout, or None.
        self._timeout = None
        self.loop = loop or gevent.get_hub().loop

        # Setup curl's multi instance.
        self._curl_multi = pycurl.CurlMulti()
        self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)

    def __getattr__(self, item):
        """
        The pycurl.CurlMulti class is final and we cannot subclass it.
        Therefore we are wrapping it and forward everything to it here.

        :raises AttributeError: if the wrapped multi object does not exist
            (yet) or does not provide ``item``.
        """
        # __getattr__ only runs when normal lookup failed. If the lookup that
        # failed is for the wrapped object itself, forwarding would call this
        # method again without end, so fail with a regular AttributeError.
        if item == '_curl_multi':
            raise AttributeError(item)
        return getattr(self._curl_multi, item)

    def add_handle(self, curl):
        """
        Add handle variant that also takes care about the initial invocation of
        socket action method. This is done by setting an immediate timeout.

        :param curl: the easy handle to add to the wrapped multi object.
        :return: whatever the wrapped ``add_handle`` call returns.
        """
        result = self._curl_multi.add_handle(curl)
        self._set_timeout(0)
        return result

    def _handle_socket(self, event, fd, multi, data):
        """
        Called by libcurl when it wants to change the file descriptors it cares
        about.

        :param event: one of the ``pycurl.POLL_*`` values.
        :param fd: file descriptor the event refers to.
        :param multi: unused here.
        :param data: unused here.
        """
        if event == pycurl.POLL_REMOVE:
            watcher = self._watchers.pop(fd, None)
            if watcher is not None:
                watcher.stop()
            return

        # Translation of libcurl poll flags into the loop's event flags.
        event_map = {
            pycurl.POLL_NONE: core.NONE,
            pycurl.POLL_IN: core.READ,
            pycurl.POLL_OUT: core.WRITE,
            pycurl.POLL_INOUT: core.READ | core.WRITE
        }
        gloop_event = event_map[event]

        watcher = self._watchers.get(fd)
        if watcher is None:
            # First time we see this descriptor: create and remember a watcher.
            watcher = self.loop.io(fd, gloop_event)
            watcher.start(self._handle_events, fd, pass_events=True)
            self._watchers[fd] = watcher
        elif watcher.events != gloop_event:
            # The watcher is stopped while its event mask is replaced and
            # started again afterwards.
            watcher.stop()
            watcher.events = gloop_event
            watcher.start(self._handle_events, fd, pass_events=True)

    def _set_timeout(self, msecs):
        """
        Called by libcurl to schedule a timeout.

        Any previously scheduled timer is stopped first. A negative value
        means that no timeout is wanted at all (libcurl passes -1 to ask for
        the timer to be deleted), so in that case no new timer is created.

        :param msecs: timeout in milliseconds, negative to only cancel.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None

        if msecs < 0:
            return

        self._timeout = self.loop.timer(msecs / 1000.0)
        self._timeout.start(self._handle_timeout)

    def _run_socket_action(self, fd, action):
        """
        Call ``socket_action`` on the wrapped multi object, repeating the call
        for as long as it reports ``E_CALL_MULTI_PERFORM``.

        A ``pycurl.error`` raised by the call is not propagated; its first
        argument is treated as the return code instead.
        """
        while True:
            try:
                ret, _num_handles = self._curl_multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

    def _handle_events(self, events, fd):
        """
        Callback of the io watchers: report activity on ``fd`` to libcurl and
        wake up the waiters of all requests that finished.

        :param events: bit mask of loop events (``core.READ``/``core.WRITE``).
        :param fd: file descriptor the events occurred on.
        """
        action = 0
        if events & core.READ:
            action |= pycurl.CSELECT_IN
        if events & core.WRITE:
            action |= pycurl.CSELECT_OUT

        self._run_socket_action(fd, action)
        self._finish_pending_requests()

    def _handle_timeout(self):
        """
        Called by IOLoop when the requested timeout has passed.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None

        self._run_socket_action(pycurl.SOCKET_TIMEOUT, 0)
        self._finish_pending_requests()

        # NOTE: the following explanation is inherited from the Tornado
        # implementation this code was derived from.
        #
        # In theory, we shouldn't have to do this because curl will call
        # _set_timeout whenever the timeout changes. However, sometimes after
        # _handle_timeout we will need to reschedule immediately even though
        # nothing has changed from curl's perspective. This is because when
        # socket_action is called with SOCKET_TIMEOUT, libcurl decides
        # internally which timeouts need to be processed by using a monotonic
        # clock (where available) while tornado uses python's time.time() to
        # decide when timeouts have occurred. When those clocks disagree on
        # elapsed time (as they will whenever there is an NTP adjustment),
        # tornado might call _handle_timeout before libcurl is ready. After
        # each timeout, resync the scheduled timeout with libcurl's current
        # state.
        new_timeout = self._curl_multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _finish_pending_requests(self):
        """
        Process any requests that were completed by the last call to
        multi.socket_action.

        Successful handles get their ``waiter`` switched to with ``None``;
        failed handles get an ``Exception`` carrying the error number and
        message thrown into their ``waiter``.
        """
        while True:
            num_q, ok_list, err_list = self._curl_multi.info_read()
            for curl in ok_list:
                curl.waiter.switch(None)
            for curl, errnum, errmsg in err_list:
                curl.waiter.throw(Exception('{} {}'.format(errnum, errmsg)))
            # info_read reports how many messages are still queued; keep
            # reading until the queue is drained.
            if num_q == 0:
                break
class GeventCurl(object):
    """
    Gevent compatible implementation of the pycurl.Curl class. Essentially a
    wrapper around pycurl.Curl with a customized perform method. It uses the
    GeventCurlMulti class to implement a blocking API to libcurl's "easy"
    interface.
    """

    # Reference to the GeventCurlMulti instance.
    _multi_instance = None

    def __init__(self):
        """Create the wrapped pycurl.Curl object."""
        self._curl = pycurl.Curl()

    def __getattr__(self, item):
        """
        The pycurl.Curl class is final and we cannot subclass it. Therefore we
        are wrapping it and forward everything to it here.

        :raises AttributeError: if the wrapped curl object does not exist
            (yet) or does not provide ``item``.
        """
        # __getattr__ only runs when normal lookup failed. If the lookup that
        # failed is for the wrapped object itself, forwarding would call this
        # method again without end, so fail with a regular AttributeError.
        if item == '_curl':
            raise AttributeError(item)
        return getattr(self._curl, item)

    @property
    def _multi(self):
        """
        Lazy property that returns the GeventCurlMulti instance. The value is
        cached as a class attribute. Therefore only one instance per process
        exists.
        """
        if GeventCurl._multi_instance is None:
            GeventCurl._multi_instance = GeventCurlMulti()
        return GeventCurl._multi_instance

    def perform(self):
        """
        This perform method is compatible with gevent because it uses gevent
        synchronization mechanisms to wait for the request to finish.

        :return: the value delivered to the waiter when the request finished.
        :raises Exception: if this curl object is already performing a
            request, or if the request failed.
        """
        # A ``waiter`` attribute on the wrapped handle marks a request that is
        # still in flight; refuse to start a second one on the same handle.
        if getattr(self._curl, 'waiter', None) is not None:
            current = greenlet.getcurrent()
            msg = 'This curl object is already used by another greenlet, {}, \n' \
                  'this is {}'.format(self._curl.waiter, current)
            raise Exception(msg)

        waiter = self._curl.waiter = Waiter()
        try:
            self._multi.add_handle(self._curl)
            try:
                # Blocks this greenlet until the multi wrapper switches to or
                # throws into the waiter.
                return waiter.get()
            finally:
                self._multi.remove_handle(self._curl)
        finally:
            # Always clear the in-flight marker so the handle can be reused.
            del self._curl.waiter
# ``Curl`` is first brought into this namespace by the wildcard import from
# pycurl above. Rebinding the name here overrides it with our custom
# implementation, so users of this module get GeventCurl when they ask for Curl.
Curl = GeventCurl