##// END OF EJS Templates
deps: msgpack==1.1.0
deps: msgpack==1.1.0

File last commit:

r5092:d0d88608 default
r5578:fb1ff85e default
Show More
geventcurl.py
251 lines | 9.0 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2016-2023 RhodeCode GmbH
Martin Bornhold
vcs: Implemented a gevent compatible Curl class, part of #4046...
r474 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
This serves as a drop-in replacement for pycurl. It implements the pycurl Curl
class in a way that is compatible with gevent.
"""
import logging
import gevent
import pycurl
caches: make gevent curl connection cache friendly....
r2946 import greenlet
Martin Bornhold
vcs: Implemented a gevent compatible Curl class, part of #4046...
r474
# Import everything from pycurl.
# This allows us to use this module as a drop in replacement of pycurl.
code: unified coverage notes to # pragma: no cover
r3282 from pycurl import * # pragma: no cover
Martin Bornhold
vcs: Implemented a gevent compatible Curl class, part of #4046...
r474
from gevent import core
from gevent.hub import Waiter
log = logging.getLogger(__name__)
class GeventCurlMulti(object):
    """
    Wrapper around pycurl.CurlMulti that integrates it into gevent's event
    loop.

    libcurl tells us which sockets and which timeout it is interested in via
    the socket and timer callbacks registered in ``__init__``. Those requests
    are translated into io and timer watchers of the gevent loop, and whenever
    a watcher fires libcurl is driven through ``socket_action``.

    Parts of this class are a modified version of code copied from the Tornado
    Web Server project which is licensed under the Apache License, Version 2.0
    (the "License"). To be more specific the code originates from this file:
    https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py

    This is the original license header of the origin:

    Copyright 2009 Facebook

    Licensed under the Apache License, Version 2.0 (the "License"); you may
    not use this file except in compliance with the License. You may obtain
    a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    implied. See the License for the specific language governing
    permissions and limitations under the License.
    """

    def __init__(self, loop=None):
        """
        :param loop: event loop used to create the io and timer watchers.
            Defaults to the loop of the current gevent hub.
        """
        # Maps a file descriptor to the io watcher registered for it.
        self._watchers = {}
        # Currently scheduled timer watcher, or None if there is none.
        self._timeout = None
        self.loop = loop or gevent.get_hub().loop

        # Setup curl's multi instance.
        self._curl_multi = pycurl.CurlMulti()
        self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)

    def __getattr__(self, item):
        """
        The pycurl.CurlMulti class is final and we cannot subclass it.
        Therefore we are wrapping it and forward everything to it here.
        """
        # __getattr__ is only called when normal lookup failed. If the wrapped
        # object itself is missing (instance created without __init__, e.g. by
        # copy or unpickle), forwarding would call __getattr__ again and
        # recurse until RecursionError. Fail with a regular AttributeError.
        if item == '_curl_multi':
            raise AttributeError(item)
        return getattr(self._curl_multi, item)

    def add_handle(self, curl):
        """
        Add handle variant that also takes care about the initial invocation of
        socket action method. This is done by setting an immediate timeout.

        :param curl: the curl handle to add to the multi instance.
        :return: whatever the wrapped ``add_handle`` returns.
        """
        result = self._curl_multi.add_handle(curl)
        self._set_timeout(0)
        return result

    def _handle_socket(self, event, fd, multi, data):
        """
        Called by libcurl when it wants to change the file descriptors it cares
        about.

        :param event: one of the ``pycurl.POLL_*`` constants.
        :param fd: the file descriptor the request is about.
        :param multi: passed by the callback interface, unused here.
        :param data: passed by the callback interface, unused here.
        """
        event_map = {
            pycurl.POLL_NONE: core.NONE,
            pycurl.POLL_IN: core.READ,
            pycurl.POLL_OUT: core.WRITE,
            pycurl.POLL_INOUT: core.READ | core.WRITE
        }

        if event == pycurl.POLL_REMOVE:
            watcher = self._watchers.pop(fd, None)
            if watcher is not None:
                watcher.stop()
            return

        gloop_event = event_map[event]
        watcher = self._watchers.get(fd)
        if watcher is None:
            # First request for this fd: create and remember a new watcher.
            watcher = self.loop.io(fd, gloop_event)
            watcher.start(self._handle_events, fd, pass_events=True)
            self._watchers[fd] = watcher
        elif watcher.events != gloop_event:
            # Known fd but a different event mask: restart the watcher with
            # the new mask.
            watcher.stop()
            watcher.events = gloop_event
            watcher.start(self._handle_events, fd, pass_events=True)

    def _set_timeout(self, msecs):
        """
        Called by libcurl to schedule a timeout.

        :param msecs: timeout in milliseconds. A negative value means that
            the pending timer has to be removed without scheduling a new one.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None

        if msecs < 0:
            # Nothing to schedule. This mirrors _handle_timeout, which also
            # only re-arms the timer for non negative values.
            return

        self._timeout = self.loop.timer(msecs / 1000.0)
        self._timeout.start(self._handle_timeout)

    def _drive_socket_action(self, fd, action):
        """
        Call ``socket_action`` on the multi instance, repeating the call for as
        long as it reports ``E_CALL_MULTI_PERFORM``.

        A ``pycurl.error`` is not propagated: its first argument is treated
        as the return code, so any other error code simply ends the loop.
        """
        while True:
            try:
                ret, _ = self._curl_multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

    def _handle_events(self, events, fd):
        """
        Called by the io watcher of ``fd`` when the socket became readable
        and/or writable.

        :param events: bit mask of ``core.READ`` / ``core.WRITE``.
        :param fd: the file descriptor the events belong to.
        """
        action = 0
        if events & core.READ:
            action |= pycurl.CSELECT_IN
        if events & core.WRITE:
            action |= pycurl.CSELECT_OUT

        self._drive_socket_action(fd, action)
        self._finish_pending_requests()

    def _handle_timeout(self):
        """
        Called by the timer watcher when the requested timeout has passed.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None

        self._drive_socket_action(pycurl.SOCKET_TIMEOUT, 0)
        self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will call
        # _set_timeout whenever the timeout changes. However, sometimes after
        # _handle_timeout we will need to reschedule immediately even though
        # nothing has changed from curl's perspective. When socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed, using its own clock. According to
        # the Tornado code this was taken from, that clock can disagree with
        # the clock of the event loop that fired this timer (e.g. after an NTP
        # adjustment), so we may get here before libcurl is ready. After each
        # timeout, resync the scheduled timeout with libcurl's current state.
        new_timeout = self._curl_multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _finish_pending_requests(self):
        """
        Process any requests that were completed by the last call to
        multi.socket_action.

        Every finished handle is expected to carry a ``waiter`` attribute.
        Successful handles get ``waiter.switch(None)``, failed ones get
        ``waiter.throw`` with an exception carrying error number and message.
        """
        while True:
            num_q, ok_list, err_list = self._curl_multi.info_read()
            for curl in ok_list:
                # Positional argument on purpose, no keyword argument.
                curl.waiter.switch(None)
            for curl, errnum, errmsg in err_list:
                curl.waiter.throw(Exception('{} {}'.format(errnum, errmsg)))
            # num_q is the number of messages still queued; stop once drained.
            if num_q == 0:
                break
class GeventCurl(object):
    """
    Gevent compatible implementation of the pycurl.Curl class. Essentially a
    wrapper around pycurl.Curl with a customized perform method. It uses the
    GeventCurlMulti class to implement a blocking API to libcurl's "easy"
    interface.
    """

    # Reference to the GeventCurlMulti instance, shared by all GeventCurl
    # objects and created lazily by the `_multi` property.
    _multi_instance = None

    def __init__(self):
        self._curl = pycurl.Curl()

    def __getattr__(self, item):
        """
        The pycurl.Curl class is final and we cannot subclass it. Therefore we
        are wrapping it and forward everything to it here.
        """
        # __getattr__ is only called when normal lookup failed. If the wrapped
        # handle itself is missing (instance created without __init__, e.g. by
        # copy or unpickle), forwarding would call __getattr__ again and
        # recurse until RecursionError. Fail with a regular AttributeError.
        if item == '_curl':
            raise AttributeError(item)
        return getattr(self._curl, item)

    @property
    def _multi(self):
        """
        Lazy property that returns the GeventCurlMulti instance. The value is
        cached as a class attribute. Therefore only one instance per process
        exists.
        """
        if GeventCurl._multi_instance is None:
            GeventCurl._multi_instance = GeventCurlMulti()
        return GeventCurl._multi_instance

    def perform(self):
        """
        This perform method is compatible with gevent because it uses gevent
        synchronization mechanisms to wait for the request to finish.

        :return: the value delivered to the waiter once the transfer is done.
        :raises Exception: if this curl object already has a waiter attached,
            i.e. a perform call is still in flight on it. Whatever is thrown
            into the waiter is raised from here as well.
        """
        # A waiter attribute on the handle marks a perform call in flight.
        if getattr(self._curl, 'waiter', None) is not None:
            current = greenlet.getcurrent()
            msg = 'This curl object is already used by another greenlet, {}, \n' \
                  'this is {}'.format(self._curl.waiter, current)
            raise Exception(msg)

        waiter = self._curl.waiter = Waiter()
        try:
            self._multi.add_handle(self._curl)
            try:
                # Blocks the calling greenlet until the waiter is switched to
                # or has an exception thrown into it.
                return waiter.get()
            finally:
                self._multi.remove_handle(self._curl)
        finally:
            # Always drop the marker so the handle can be reused afterwards.
            del self._curl.waiter
# Curl is originally imported from pycurl (through the star import at the top
# of this module). At this point we override it with our custom, gevent
# compatible implementation, so that code using this module in place of pycurl
# gets GeventCurl when it asks for Curl.
Curl = GeventCurl