# -*- coding: utf-8 -*- # Copyright (C) 2016-2017 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Client for the VCSServer implemented based on HTTP. """ import copy import logging import threading import urllib2 import urlparse import uuid import pycurl import msgpack import requests from requests.packages.urllib3.util.retry import Retry from . import exceptions, CurlSession log = logging.getLogger(__name__) # TODO: mikhail: Keep it in sync with vcsserver's # HTTPApplication.ALLOWED_EXCEPTIONS EXCEPTIONS_MAP = { 'KeyError': KeyError, 'URLError': urllib2.URLError, } class RepoMaker(object): def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory): self.url = urlparse.urljoin( 'http://%s' % server_and_port, backend_endpoint) self._session_factory = session_factory self.backend_type = backend_type def __call__(self, path, config, with_wire=None): log.debug('RepoMaker call on %s', path) return RemoteRepo( path, config, self.url, self._session_factory(), with_wire=with_wire) def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): payload = { 'id': str(uuid.uuid4()), 'method': name, 'backend': self.backend_type, 'params': {'args': args, 'kwargs': kwargs} } return _remote_call( self.url, payload, EXCEPTIONS_MAP, self._session_factory()) class ServiceConnection(object): def __init__(self, server_and_port, backend_endpoint, session_factory): self.url = urlparse.urljoin( 'http://%s' % server_and_port, backend_endpoint) self._session_factory = session_factory def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): payload = { 'id': str(uuid.uuid4()), 'method': name, 'params': {'args': args, 'kwargs': kwargs} } return _remote_call( self.url, payload, EXCEPTIONS_MAP, self._session_factory()) class RemoteRepo(object): def __init__(self, path, config, url, session, with_wire=None): self.url = url self._session = session self._wire = { "path": path, "config": config, "context": self._create_vcs_cache_context(), } if with_wire: self._wire.update(with_wire) # johbo: Trading complexity for performance. Avoiding the call to # log.debug brings a few percent gain even if is is not active. if log.isEnabledFor(logging.DEBUG): self._call = self._call_with_logging def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): # TODO: oliver: This is currently necessary pre-call since the # config object is being changed for hooking scenarios wire = copy.deepcopy(self._wire) wire["config"] = wire["config"].serialize() payload = { 'id': str(uuid.uuid4()), 'method': name, 'params': {'wire': wire, 'args': args, 'kwargs': kwargs} } return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session) def _call_with_logging(self, name, *args, **kwargs): log.debug('Calling %s@%s with args:%r', self.url, name, args) return RemoteRepo._call(self, name, *args, **kwargs) def __getitem__(self, key): return self.revision(key) def _create_vcs_cache_context(self): """ Creates a unique string which is passed to the VCSServer on every remote call. It is used as cache key in the VCSServer. """ return str(uuid.uuid4()) def invalidate_vcs_cache(self): """ This invalidates the context which is sent to the VCSServer on every call to a remote method. It forces the VCSServer to create a fresh repository instance on the next call to a remote method. """ self._wire['context'] = self._create_vcs_cache_context() class RemoteObject(object): def __init__(self, url, session): self._url = url self._session = session # johbo: Trading complexity for performance. Avoiding the call to # log.debug brings a few percent gain even if is is not active. if log.isEnabledFor(logging.DEBUG): self._call = self._call_with_logging def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): payload = { 'id': str(uuid.uuid4()), 'method': name, 'params': {'args': args, 'kwargs': kwargs} } return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session) def _call_with_logging(self, name, *args, **kwargs): log.debug('Calling %s@%s', self._url, name) return RemoteObject._call(self, name, *args, **kwargs) def _remote_call(url, payload, exceptions_map, session): try: response = session.post(url, data=msgpack.packb(payload)) except pycurl.error as e: raise exceptions.HttpVCSCommunicationError(e) if response.status_code >= 400: log.error('Call to %s returned non 200 HTTP code: %s', url, response.status_code) raise exceptions.HttpVCSCommunicationError(repr(response.content)) try: response = msgpack.unpackb(response.content) except Exception: log.exception('Failed to decode response %r', response.content) raise error = response.get('error') if error: type_ = error.get('type', 'Exception') exc = exceptions_map.get(type_, Exception) exc = exc(error.get('message')) try: exc._vcs_kind = error['_vcs_kind'] except KeyError: pass try: exc._vcs_server_traceback = error['traceback'] except KeyError: pass raise exc return response.get('result') class VcsHttpProxy(object): CHUNK_SIZE = 16384 def __init__(self, server_and_port, backend_endpoint): retries = Retry(total=5, connect=None, read=None, redirect=None) adapter = requests.adapters.HTTPAdapter(max_retries=retries) self.base_url = urlparse.urljoin( 'http://%s' % server_and_port, backend_endpoint) self.session = requests.Session() self.session.mount('http://', adapter) def handle(self, environment, input_data, *args, **kwargs): data = { 'environment': environment, 'input_data': input_data, 'args': args, 'kwargs': kwargs } result = self.session.post( self.base_url, msgpack.packb(data), stream=True) return self._get_result(result) def _deserialize_and_raise(self, error): exception = Exception(error['message']) try: exception._vcs_kind = error['_vcs_kind'] except KeyError: pass raise exception def _iterate(self, result): unpacker = msgpack.Unpacker() for line in result.iter_content(chunk_size=self.CHUNK_SIZE): unpacker.feed(line) for chunk in unpacker: yield chunk def _get_result(self, result): iterator = self._iterate(result) error = iterator.next() if error: self._deserialize_and_raise(error) status = iterator.next() headers = iterator.next() return iterator, status, headers class ThreadlocalSessionFactory(object): """ Creates one CurlSession per thread on demand. """ def __init__(self): self._thread_local = threading.local() def __call__(self): if not hasattr(self._thread_local, 'curl_session'): self._thread_local.curl_session = CurlSession() return self._thread_local.curl_session