# -*- coding: utf-8 -*- # Copyright (C) 2014-2016 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Provides the implementation of various client utilities to reach the vcsserver. """ import copy import logging import threading import urlparse import uuid import weakref from urllib2 import URLError import msgpack import Pyro4 import requests from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError from rhodecode.lib.vcs import exceptions from rhodecode.lib.vcs.conf import settings log = logging.getLogger(__name__) # TODO: mikhail: Keep it in sync with vcsserver's # HTTPApplication.ALLOWED_EXCEPTIONS EXCEPTIONS_MAP = { 'KeyError': KeyError, 'URLError': URLError, } class HTTPRepoMaker(object): def __init__(self, server_and_port, backend_endpoint): self.url = urlparse.urljoin( 'http://%s' % server_and_port, backend_endpoint) def __call__(self, path, config, with_wire=None): log.debug('HTTPRepoMaker call on %s', path) return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire) def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): payload = { 'id': str(uuid.uuid4()), 'method': name, 'params': {'args': args, 'kwargs': kwargs} } return _remote_call(self.url, payload, EXCEPTIONS_MAP) class VcsHttpProxy(object): CHUNK_SIZE = 16384 def __init__(self, server_and_port, backend_endpoint): adapter = requests.adapters.HTTPAdapter(max_retries=5) self.base_url = urlparse.urljoin( 'http://%s' % server_and_port, backend_endpoint) self.session = requests.Session() self.session.mount('http://', adapter) def handle(self, environment, input_data, *args, **kwargs): data = { 'environment': environment, 'input_data': input_data, 'args': args, 'kwargs': kwargs } result = self.session.post( self.base_url, msgpack.packb(data), stream=True) return self._get_result(result) def _deserialize_and_raise(self, error): exception = Exception(error['message']) try: exception._vcs_kind = error['_vcs_kind'] except KeyError: pass raise exception def _iterate(self, result): unpacker = msgpack.Unpacker() for line in result.iter_content(chunk_size=self.CHUNK_SIZE): unpacker.feed(line) for chunk in unpacker: yield chunk def _get_result(self, result): iterator = self._iterate(result) error = iterator.next() if error: self._deserialize_and_raise(error) status = iterator.next() headers = iterator.next() return iterator, status, headers class HTTPRemoteRepo(object): def __init__(self, path, config, url, with_wire=None): self.url = url self._wire = { "path": path, "config": config, "context": str(uuid.uuid4()), } if with_wire: self._wire.update(with_wire) def __getattr__(self, name): def f(*args, **kwargs): return self._call(name, *args, **kwargs) return f @exceptions.map_vcs_exceptions def _call(self, name, *args, **kwargs): log.debug('Calling %s@%s', self.url, name) # TODO: oliver: This is currently necessary pre-call since the # config object is being changed for hooking scenarios wire = copy.deepcopy(self._wire) wire["config"] = wire["config"].serialize() payload = { 'id': str(uuid.uuid4()), 'method': name, 'params': {'wire': wire, 'args': args, 'kwargs': kwargs} } return _remote_call(self.url, payload, EXCEPTIONS_MAP) def __getitem__(self, key): return self.revision(key) def _remote_call(url, payload, exceptions_map): response = requests.post(url, data=msgpack.packb(payload)) response = msgpack.unpackb(response.content) error = response.get('error') if error: type_ = error.get('type', 'Exception') exc = exceptions_map.get(type_, Exception) exc = exc(error.get('message')) try: exc._vcs_kind = error['_vcs_kind'] except KeyError: pass raise exc return response.get('result') class RepoMaker(object): def __init__(self, proxy_factory): self._proxy_factory = proxy_factory def __call__(self, path, config, with_wire=None): log.debug('RepoMaker call on %s', path) return RemoteRepo( path, config, remote_proxy=self._proxy_factory(), with_wire=with_wire) def __getattr__(self, name): remote_proxy = self._proxy_factory() func = _get_proxy_method(remote_proxy, name) return _wrap_remote_call(remote_proxy, func) class ThreadlocalProxyFactory(object): """ Creates one Pyro4 proxy per thread on demand. """ def __init__(self, remote_uri): self._remote_uri = remote_uri self._thread_local = threading.local() def __call__(self): if not hasattr(self._thread_local, 'proxy'): self._thread_local.proxy = Pyro4.Proxy(self._remote_uri) return self._thread_local.proxy class RemoteRepo(object): def __init__(self, path, config, remote_proxy, with_wire=None): self._wire = { "path": path, "config": config, "context": uuid.uuid4(), } if with_wire: self._wire.update(with_wire) self._remote_proxy = remote_proxy self.refs = RefsWrapper(self) def __getattr__(self, name): log.debug('Calling %s@%s', self._remote_proxy, name) # TODO: oliver: This is currently necessary pre-call since the # config object is being changed for hooking scenarios wire = copy.deepcopy(self._wire) wire["config"] = wire["config"].serialize() try: func = _get_proxy_method(self._remote_proxy, name) except DaemonError as e: if e.message == 'unknown object': raise exceptions.VCSBackendNotSupportedError else: raise return _wrap_remote_call(self._remote_proxy, func, wire) def __getitem__(self, key): return self.revision(key) def _get_proxy_method(proxy, name): try: return getattr(proxy, name) except CommunicationError: raise CommunicationError( 'Unable to connect to remote pyro server %s' % proxy) def _wrap_remote_call(proxy, func, *args): all_args = list(args) @exceptions.map_vcs_exceptions def caller(*args, **kwargs): all_args.extend(args) try: return func(*all_args, **kwargs) except ConnectionClosedError: log.debug('Connection to VCSServer closed, trying to reconnect.') proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES) return func(*all_args, **kwargs) return caller class RefsWrapper(object): def __init__(self, repo): self._repo = weakref.proxy(repo) def __setitem__(self, key, value): self._repo._assign_ref(key, value) class FunctionWrapper(object): def __init__(self, func, wire): self._func = func self._wire = wire @exceptions.map_vcs_exceptions def __call__(self, *args, **kwargs): return self._func(self._wire, *args, **kwargs)