# Copyright (C) 2016-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
This serves as a drop in replacement for pycurl. It implements the pycurl Curl
class in a way that is compatible with gevent.
"""


import logging
import gevent
import pycurl
import greenlet

# Import everything from pycurl.
# This allows us to use this module as a drop in replacement of pycurl.
from pycurl import *  # pragma: no cover

from gevent import core
from gevent.hub import Waiter


log = logging.getLogger(__name__)


class GeventCurlMulti(object):
    """
    Wrapper around pycurl.CurlMulti that integrates it into gevent's event
    loop.

    Parts of this class are a modified version of code copied from the Tornado
    Web Server project which is licensed under the Apache License, Version 2.0
    (the "License"). To be more specific the code originates from this file:
    https://github.com/tornadoweb/tornado/blob/stable/tornado/curl_httpclient.py

    This is the original license header of the origin:

    Copyright 2009 Facebook

    Licensed under the Apache License, Version 2.0 (the "License"); you may
    not use this file except in compliance with the License. You may obtain
    a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    License for the specific language governing permissions and limitations
    under the License.
    """

    def __init__(self, loop=None):
        """
        Create the wrapped multi handle and register the libcurl callbacks.

        :param loop: event loop used to create io/timer watchers. Defaults to
            the loop of the current gevent hub.
        """
        # Maps a file descriptor to the io watcher that observes it.
        self._watchers = {}
        # Currently scheduled timer watcher, or None if nothing is scheduled.
        self._timeout = None
        self.loop = loop or gevent.get_hub().loop

        # Setup curl's multi instance.
        self._curl_multi = pycurl.CurlMulti()
        self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)

    def __getattr__(self, item):
        """
        The pycurl.CurlMulti class is final and we cannot subclass it.
        Therefore we are wrapping it and forward everything to it here.

        :raises AttributeError: if the wrapped instance has not been created
            yet, or if it does not provide `item`.
        """
        # `__getattr__` is only invoked when normal lookup failed. If the
        # wrapped object itself is the missing attribute, reading it below
        # would re-enter this method forever; fail with a regular
        # AttributeError instead of a RecursionError.
        if item == '_curl_multi':
            raise AttributeError(item)
        return getattr(self._curl_multi, item)

    def add_handle(self, curl):
        """
        Add handle variant that also takes care about the initial invocation
        of socket action method. This is done by setting an immediate timeout.

        :param curl: the curl handle to add to the wrapped multi instance.
        :returns: whatever the wrapped `add_handle` returns.
        """
        result = self._curl_multi.add_handle(curl)
        self._set_timeout(0)
        return result

    def _handle_socket(self, event, fd, multi, data):
        """
        Called by libcurl when it wants to change the file descriptors it
        cares about.

        :param event: one of the pycurl.POLL_* constants.
        :param fd: the file descriptor the event refers to.
        :param multi: unused, part of the callback signature.
        :param data: unused, part of the callback signature.
        """
        event_map = {
            pycurl.POLL_NONE: core.NONE,
            pycurl.POLL_IN: core.READ,
            pycurl.POLL_OUT: core.WRITE,
            pycurl.POLL_INOUT: core.READ | core.WRITE
        }

        if event == pycurl.POLL_REMOVE:
            watcher = self._watchers.pop(fd, None)
            if watcher is not None:
                watcher.stop()
            return

        gloop_event = event_map[event]
        watcher = self._watchers.get(fd)
        if watcher is None:
            watcher = self.loop.io(fd, gloop_event)
            watcher.start(self._handle_events, fd, pass_events=True)
            self._watchers[fd] = watcher
        elif watcher.events != gloop_event:
            # The watcher is stopped before its event mask is changed and
            # started again afterwards.
            watcher.stop()
            watcher.events = gloop_event
            watcher.start(self._handle_events, fd, pass_events=True)

    def _set_timeout(self, msecs):
        """
        Called by libcurl to schedule a timeout.

        :param msecs: delay in milliseconds. A negative value means that
            libcurl wants the timer to be deleted, so the current timer is
            cancelled and no new one is scheduled.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None

        if msecs < 0:
            # Nothing to wait for; do not schedule a spurious wakeup.
            return

        self._timeout = self.loop.timer(msecs / 1000.0)
        self._timeout.start(self._handle_timeout)

    def _drive_socket_action(self, fd, action):
        """
        Call `socket_action` on the wrapped multi instance, repeating the call
        for as long as the result is E_CALL_MULTI_PERFORM.

        :param fd: file descriptor or pycurl.SOCKET_TIMEOUT.
        :param action: bitmask of pycurl.CSELECT_* flags (0 for a timeout).
        """
        while True:
            try:
                ret, _ = self._curl_multi.socket_action(fd, action)
            except pycurl.error as e:
                # The first argument of the error is used as the result code.
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

    def _handle_events(self, events, fd):
        """
        Called by the io watcher when `fd` becomes readable and/or writable.

        :param events: bitmask of core.READ / core.WRITE reported by the
            watcher.
        :param fd: the file descriptor the events were reported for.
        """
        action = 0
        if events & core.READ:
            action |= pycurl.CSELECT_IN
        if events & core.WRITE:
            action |= pycurl.CSELECT_OUT
        self._drive_socket_action(fd, action)
        self._finish_pending_requests()

    def _handle_timeout(self):
        """
        Called by the loop's timer watcher when the requested timeout has
        passed.
        """
        if self._timeout is not None:
            self._timeout.stop()
            self._timeout = None
        self._drive_socket_action(pycurl.SOCKET_TIMEOUT, 0)
        self._finish_pending_requests()

        # NOTE: the reasoning below is inherited from the Tornado
        # implementation this code is derived from.
        #
        # In theory, we shouldn't have to do this because curl will call
        # _set_timeout whenever the timeout changes. However, sometimes after
        # _handle_timeout we will need to reschedule immediately even though
        # nothing has changed from curl's perspective. This is because when
        # socket_action is called with SOCKET_TIMEOUT, libcurl decides
        # internally which timeouts need to be processed by using a monotonic
        # clock (where available) while the event loop may measure elapsed
        # time differently. When those clocks disagree on elapsed time (as
        # they will whenever there is an NTP adjustment), _handle_timeout
        # might be called before libcurl is ready. After each timeout, resync
        # the scheduled timeout with libcurl's current state.
        new_timeout = self._curl_multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _finish_pending_requests(self):
        """
        Process any requests that were completed by the last call to
        multi.socket_action.

        Successful handles get their waiter switched to with `None`. Failed
        handles get a `pycurl.error(errnum, errmsg)` thrown into their waiter,
        so that callers of `perform` can catch the same exception type that
        pycurl itself raises.
        """
        while True:
            num_q, ok_list, err_list = self._curl_multi.info_read()
            for curl in ok_list:
                curl.waiter.switch(None)
            for curl, errnum, errmsg in err_list:
                curl.waiter.throw(pycurl.error(errnum, errmsg))
            if num_q == 0:
                break


class GeventCurl(object):
    """
    Gevent compatible implementation of the pycurl.Curl class. Essentially a
    wrapper around pycurl.Curl with a customized perform method. It uses the
    GeventCurlMulti class to implement a blocking API to libcurl's "easy"
    interface.
    """

    # Reference to the GeventCurlMulti instance.
    _multi_instance = None

    def __init__(self):
        """
        Create the wrapped pycurl.Curl instance.
        """
        self._curl = pycurl.Curl()

    def __getattr__(self, item):
        """
        The pycurl.Curl class is final and we cannot subclass it. Therefore we
        are wrapping it and forward everything to it here.

        :raises AttributeError: if the wrapped instance has not been created
            yet, or if it does not provide `item`.
        """
        # Guard against endless recursion when `_curl` itself is missing
        # (e.g. on an instance that was created without running __init__).
        if item == '_curl':
            raise AttributeError(item)
        return getattr(self._curl, item)

    @property
    def _multi(self):
        """
        Lazy property that returns the GeventCurlMulti instance. The value is
        cached as a class attribute. Therefore only one instance per process
        exists.
        """
        if GeventCurl._multi_instance is None:
            GeventCurl._multi_instance = GeventCurlMulti()
        return GeventCurl._multi_instance

    def perform(self):
        """
        This perform method is compatible with gevent because it uses gevent
        synchronization mechanisms to wait for the request to finish.

        :returns: the value the waiter was switched to once the transfer has
            finished.
        :raises Exception: if this curl object is already being used by
            another `perform` call, or whatever exception was thrown into the
            waiter because the transfer failed.
        """
        if getattr(self._curl, 'waiter', None) is not None:
            current = greenlet.getcurrent()
            msg = 'This curl object is already used by another greenlet, {}, \n' \
                  'this is {}'.format(self._curl.waiter, current)
            raise Exception(msg)

        # The waiter is stored on the wrapped handle because that is the
        # object handed back by the multi instance's info_read().
        waiter = self._curl.waiter = Waiter()
        try:
            self._multi.add_handle(self._curl)
            try:
                return waiter.get()
            finally:
                self._multi.remove_handle(self._curl)
        finally:
            del self._curl.waiter


# Curl is originally imported from pycurl. At this point we override it with
# our custom implementation.
Curl = GeventCurl