diff --git a/rhodecode/lib/vcs/__init__.py b/rhodecode/lib/vcs/__init__.py --- a/rhodecode/lib/vcs/__init__.py +++ b/rhodecode/lib/vcs/__init__.py @@ -40,7 +40,6 @@ import time import urlparse from cStringIO import StringIO -import pycurl import Pyro4 from Pyro4.errors import CommunicationError @@ -49,8 +48,21 @@ from rhodecode.lib.vcs.backends import g from rhodecode.lib.vcs.exceptions import ( VCSError, RepositoryError, CommitError) +log = logging.getLogger(__name__) -log = logging.getLogger(__name__) +# The pycurl library directly accesses C API functions and is not patched by +# gevent. This will potentially lead to deadlocks due to incompatibility to +# gevent. Therefore we check if gevent is active and import a gevent compatible +# wrapper in that case. +try: + from gevent import monkey + if monkey.is_module_patched('__builtin__'): + import geventcurl as pycurl + log.debug('Using gevent comapatible pycurl: %s', pycurl) + else: + import pycurl +except ImportError: + import pycurl def get_version(): diff --git a/rhodecode/lib/vcs/geventcurl.py b/rhodecode/lib/vcs/geventcurl.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/vcs/geventcurl.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2016-2016 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +""" +This serves as a drop in replacement for pycurl. It implements the pycurl Curl +class in a way that is compatible with gevent. +""" + + +import logging +import gevent +import pycurl + +# Import everything from pycurl. +# This allows us to use this module as a drop in replacement of pycurl. +from pycurl import * # noqa + +from gevent import core +from gevent.hub import Waiter + + +log = logging.getLogger(__name__) + + +class GeventCurlMulti(object): + """ + Wrapper around pycurl.CurlMulti that integrates it into gevent's event + loop. + """ + + def __init__(self, loop=None): + self._watchers = {} + self._timeout = None + self.loop = loop or gevent.get_hub().loop + + # Setup curl's multi instance. + self._curl_multi = pycurl.CurlMulti() + self.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout) + self.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket) + + def __getattr__(self, item): + """ + The pycurl.CurlMulti class is final and we cannot subclass it. + Therefore we are wrapping it and forward everything to it here. + """ + return getattr(self._curl_multi, item) + + def add_handle(self, curl): + """ + Add handle variant that also takes care about the initial invocation of + socket action method. This is done by setting an immediate timeout. + """ + result = self._curl_multi.add_handle(curl) + self._set_timeout(0) + return result + + def _handle_socket(self, event, fd, multi, data): + """ + Called by libcurl when it wants to change the file descriptors it cares + about. + """ + event_map = { + pycurl.POLL_NONE: core.NONE, + pycurl.POLL_IN: core.READ, + pycurl.POLL_OUT: core.WRITE, + pycurl.POLL_INOUT: core.READ | core.WRITE + } + + if event == pycurl.POLL_REMOVE: + watcher = self._watchers.pop(fd, None) + if watcher is not None: + watcher.stop() + else: + gloop_event = event_map[event] + watcher = self._watchers.get(fd) + if watcher is None: + watcher = self.loop.io(fd, gloop_event) + watcher.start(self._handle_events, fd, pass_events=True) + self._watchers[fd] = watcher + else: + if watcher.events != gloop_event: + watcher.stop() + watcher.events = gloop_event + watcher.start(self._handle_events, fd, pass_events=True) + + def _set_timeout(self, msecs): + """ + Called by libcurl to schedule a timeout. + """ + if self._timeout is not None: + self._timeout.stop() + self._timeout = self.loop.timer(msecs/1000.0) + self._timeout.start(self._handle_timeout) + + def _handle_events(self, events, fd): + action = 0 + if events & core.READ: + action |= pycurl.CSELECT_IN + if events & core.WRITE: + action |= pycurl.CSELECT_OUT + while True: + try: + ret, num_handles = self._curl_multi.socket_action(fd, action) + except pycurl.error, e: + ret = e.args[0] + if ret != pycurl.E_CALL_MULTI_PERFORM: + break + self._finish_pending_requests() + + def _handle_timeout(self): + """ + Called by IOLoop when the requested timeout has passed. + """ + if self._timeout is not None: + self._timeout.stop() + self._timeout = None + while True: + try: + ret, num_handles = self._curl_multi.socket_action( + pycurl.SOCKET_TIMEOUT, 0) + except pycurl.error, e: + ret = e.args[0] + if ret != pycurl.E_CALL_MULTI_PERFORM: + break + self._finish_pending_requests() + + # In theory, we shouldn't have to do this because curl will call + # _set_timeout whenever the timeout changes. However, sometimes after + # _handle_timeout we will need to reschedule immediately even though + # nothing has changed from curl's perspective. This is because when + # socket_action is called with SOCKET_TIMEOUT, libcurl decides + # internally which timeouts need to be processed by using a monotonic + # clock (where available) while tornado uses python's time.time() to + # decide when timeouts have occurred. When those clocks disagree on + # elapsed time (as they will whenever there is an NTP adjustment), + # tornado might call _handle_timeout before libcurl is ready. After + # each timeout, resync the scheduled timeout with libcurl's current + # state. + new_timeout = self._curl_multi.timeout() + if new_timeout >= 0: + self._set_timeout(new_timeout) + + def _finish_pending_requests(self): + """ + Process any requests that were completed by the last call to + multi.socket_action. + """ + while True: + num_q, ok_list, err_list = self._curl_multi.info_read() + for curl in ok_list: + curl.waiter.switch() + for curl, errnum, errmsg in err_list: + curl.waiter.throw(Exception('%s %s' % (errnum, errmsg))) + if num_q == 0: + break + + +class GeventCurl(object): + """ + Gevent compatible implementation of the pycurl.Curl class. Essentially a + wrapper around pycurl.Curl with a customized perform method. It uses the + GeventCurlMulti class to implement a blocking API to libcurl's "easy" + interface. + """ + + # Reference to the GeventCurlMulti instance. + _multi_instance = None + + def __init__(self): + self._curl = pycurl.Curl() + + def __getattr__(self, item): + """ + The pycurl.Curl class is final and we cannot subclass it. Therefore we + are wrapping it and forward everything to it here. + """ + return getattr(self._curl, item) + + @property + def _multi(self): + """ + Lazy property that returns the GeventCurlMulti instance. The value is + cached as a class attribute. Therefore only one instance per process + exists. + """ + if GeventCurl._multi_instance is None: + GeventCurl._multi_instance = GeventCurlMulti() + return GeventCurl._multi_instance + + def perform(self): + """ + This perform method is compatible with gevent because it uses gevent + synchronization mechanisms to wait for the request to finish. + """ + waiter = self._curl.waiter = Waiter() + try: + self._multi.add_handle(self._curl) + response = waiter.get() + finally: + self._multi.remove_handle(self._curl) + del self._curl.waiter + + return response + +# Curl is originally imported from pycurl. At this point we override it with +# our custom implementation. +Curl = GeventCurl