|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright (C) 2014-2016 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Provides the implementation of various client utilities to reach the vcsserver.
|
|
|
"""
|
|
|
|
|
|
|
|
|
import copy
|
|
|
import logging
|
|
|
import threading
|
|
|
import urlparse
|
|
|
import uuid
|
|
|
import weakref
|
|
|
from urllib2 import URLError
|
|
|
|
|
|
import msgpack
|
|
|
import Pyro4
|
|
|
import requests
|
|
|
from pyramid.threadlocal import get_current_request
|
|
|
from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
|
|
|
|
|
|
from rhodecode.lib.vcs import exceptions
|
|
|
from rhodecode.lib.vcs.conf import settings
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
# TODO: mikhail: Keep it in sync with vcsserver's
|
|
|
# HTTPApplication.ALLOWED_EXCEPTIONS
|
|
|
EXCEPTIONS_MAP = {
|
|
|
'KeyError': KeyError,
|
|
|
'URLError': URLError,
|
|
|
}
|
|
|
|
|
|
|
|
|
class HTTPRepoMaker(object):
|
|
|
def __init__(self, server_and_port, backend_endpoint):
|
|
|
self.url = urlparse.urljoin(
|
|
|
'http://%s' % server_and_port, backend_endpoint)
|
|
|
|
|
|
def __call__(self, path, config, with_wire=None):
|
|
|
log.debug('HTTPRepoMaker call on %s', path)
|
|
|
return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
def f(*args, **kwargs):
|
|
|
return self._call(name, *args, **kwargs)
|
|
|
return f
|
|
|
|
|
|
@exceptions.map_vcs_exceptions
|
|
|
def _call(self, name, *args, **kwargs):
|
|
|
payload = {
|
|
|
'id': str(uuid.uuid4()),
|
|
|
'method': name,
|
|
|
'params': {'args': args, 'kwargs': kwargs}
|
|
|
}
|
|
|
return _remote_call(self.url, payload, EXCEPTIONS_MAP)
|
|
|
|
|
|
|
|
|
class VcsHttpProxy(object):
|
|
|
|
|
|
CHUNK_SIZE = 16384
|
|
|
|
|
|
def __init__(self, server_and_port, backend_endpoint):
|
|
|
adapter = requests.adapters.HTTPAdapter(max_retries=5)
|
|
|
self.base_url = urlparse.urljoin(
|
|
|
'http://%s' % server_and_port, backend_endpoint)
|
|
|
self.session = requests.Session()
|
|
|
self.session.mount('http://', adapter)
|
|
|
|
|
|
def handle(self, environment, input_data, *args, **kwargs):
|
|
|
data = {
|
|
|
'environment': environment,
|
|
|
'input_data': input_data,
|
|
|
'args': args,
|
|
|
'kwargs': kwargs
|
|
|
}
|
|
|
result = self.session.post(
|
|
|
self.base_url, msgpack.packb(data), stream=True)
|
|
|
return self._get_result(result)
|
|
|
|
|
|
def _deserialize_and_raise(self, error):
|
|
|
exception = Exception(error['message'])
|
|
|
try:
|
|
|
exception._vcs_kind = error['_vcs_kind']
|
|
|
except KeyError:
|
|
|
pass
|
|
|
raise exception
|
|
|
|
|
|
def _iterate(self, result):
|
|
|
unpacker = msgpack.Unpacker()
|
|
|
for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
|
|
|
unpacker.feed(line)
|
|
|
for chunk in unpacker:
|
|
|
yield chunk
|
|
|
|
|
|
def _get_result(self, result):
|
|
|
iterator = self._iterate(result)
|
|
|
error = iterator.next()
|
|
|
if error:
|
|
|
self._deserialize_and_raise(error)
|
|
|
|
|
|
status = iterator.next()
|
|
|
headers = iterator.next()
|
|
|
|
|
|
return iterator, status, headers
|
|
|
|
|
|
|
|
|
class HTTPRemoteRepo(object):
|
|
|
def __init__(self, path, config, url, with_wire=None):
|
|
|
self.url = url
|
|
|
self._wire = {
|
|
|
"path": path,
|
|
|
"config": config,
|
|
|
"context": str(uuid.uuid4()),
|
|
|
}
|
|
|
if with_wire:
|
|
|
self._wire.update(with_wire)
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
def f(*args, **kwargs):
|
|
|
return self._call(name, *args, **kwargs)
|
|
|
return f
|
|
|
|
|
|
@exceptions.map_vcs_exceptions
|
|
|
def _call(self, name, *args, **kwargs):
|
|
|
log.debug('Calling %s@%s', self.url, name)
|
|
|
# TODO: oliver: This is currently necessary pre-call since the
|
|
|
# config object is being changed for hooking scenarios
|
|
|
wire = copy.deepcopy(self._wire)
|
|
|
wire["config"] = wire["config"].serialize()
|
|
|
payload = {
|
|
|
'id': str(uuid.uuid4()),
|
|
|
'method': name,
|
|
|
'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
|
|
|
}
|
|
|
return _remote_call(self.url, payload, EXCEPTIONS_MAP)
|
|
|
|
|
|
def __getitem__(self, key):
|
|
|
return self.revision(key)
|
|
|
|
|
|
|
|
|
def _remote_call(url, payload, exceptions_map):
|
|
|
response = requests.post(url, data=msgpack.packb(payload))
|
|
|
response = msgpack.unpackb(response.content)
|
|
|
error = response.get('error')
|
|
|
if error:
|
|
|
type_ = error.get('type', 'Exception')
|
|
|
exc = exceptions_map.get(type_, Exception)
|
|
|
exc = exc(error.get('message'))
|
|
|
try:
|
|
|
exc._vcs_kind = error['_vcs_kind']
|
|
|
except KeyError:
|
|
|
pass
|
|
|
raise exc
|
|
|
return response.get('result')
|
|
|
|
|
|
|
|
|
class RepoMaker(object):
|
|
|
|
|
|
def __init__(self, proxy_factory):
|
|
|
self._proxy_factory = proxy_factory
|
|
|
|
|
|
def __call__(self, path, config, with_wire=None):
|
|
|
log.debug('RepoMaker call on %s', path)
|
|
|
return RemoteRepo(
|
|
|
path, config, remote_proxy=self._proxy_factory(),
|
|
|
with_wire=with_wire)
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
remote_proxy = self._proxy_factory()
|
|
|
func = _get_proxy_method(remote_proxy, name)
|
|
|
return _wrap_remote_call(remote_proxy, func)
|
|
|
|
|
|
|
|
|
class RequestScopeProxyFactory(object):
|
|
|
"""
|
|
|
This factory returns pyro proxy instances based on a per request scope.
|
|
|
It returns the same instance if called from within the same request and
|
|
|
different instances if called from different requests.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, remote_uri):
|
|
|
self._remote_uri = remote_uri
|
|
|
self._proxy_pool = []
|
|
|
self._borrowed_proxies = {}
|
|
|
|
|
|
def __call__(self, request=None):
|
|
|
"""
|
|
|
Wrapper around `getProxy`.
|
|
|
"""
|
|
|
request = request or get_current_request()
|
|
|
return self.getProxy(request)
|
|
|
|
|
|
def getProxy(self, request):
|
|
|
"""
|
|
|
Call this to get the pyro proxy instance for the request.
|
|
|
"""
|
|
|
|
|
|
# If called without a request context we return new proxy instances
|
|
|
# on every call. This allows to run e.g. invoke tasks.
|
|
|
if request is None:
|
|
|
log.info('Creating pyro proxy without request context for '
|
|
|
'remote_uri=%s', self._remote_uri)
|
|
|
return Pyro4.Proxy(self._remote_uri)
|
|
|
|
|
|
# If there is an already borrowed proxy for the request context we
|
|
|
# return that instance instead of creating a new one.
|
|
|
if request in self._borrowed_proxies:
|
|
|
return self._borrowed_proxies[request]
|
|
|
|
|
|
# Get proxy from pool or create new instance.
|
|
|
try:
|
|
|
proxy = self._proxy_pool.pop()
|
|
|
except IndexError:
|
|
|
log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
|
|
|
proxy = Pyro4.Proxy(self._remote_uri)
|
|
|
|
|
|
# Mark proxy as borrowed for the request context and add a callback
|
|
|
# that returns it when the request processing is finished.
|
|
|
self._borrowed_proxies[request] = proxy
|
|
|
request.add_finished_callback(self._returnProxy)
|
|
|
|
|
|
return proxy
|
|
|
|
|
|
def _returnProxy(self, request):
|
|
|
"""
|
|
|
Callback that gets called by pyramid when the request is finished.
|
|
|
It puts the proxy back into the pool.
|
|
|
"""
|
|
|
if request in self._borrowed_proxies:
|
|
|
proxy = self._borrowed_proxies.pop(request)
|
|
|
self._proxy_pool.append(proxy)
|
|
|
else:
|
|
|
log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
|
|
|
'for this request.', self._remote_uri)
|
|
|
|
|
|
|
|
|
class RemoteRepo(object):
|
|
|
|
|
|
def __init__(self, path, config, remote_proxy, with_wire=None):
|
|
|
self._wire = {
|
|
|
"path": path,
|
|
|
"config": config,
|
|
|
"context": self._create_vcs_cache_context(),
|
|
|
}
|
|
|
if with_wire:
|
|
|
self._wire.update(with_wire)
|
|
|
self._remote_proxy = remote_proxy
|
|
|
self.refs = RefsWrapper(self)
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
log.debug('Calling %s@%s', self._remote_proxy, name)
|
|
|
# TODO: oliver: This is currently necessary pre-call since the
|
|
|
# config object is being changed for hooking scenarios
|
|
|
wire = copy.deepcopy(self._wire)
|
|
|
wire["config"] = wire["config"].serialize()
|
|
|
|
|
|
try:
|
|
|
func = _get_proxy_method(self._remote_proxy, name)
|
|
|
except DaemonError as e:
|
|
|
if e.message == 'unknown object':
|
|
|
raise exceptions.VCSBackendNotSupportedError
|
|
|
else:
|
|
|
raise
|
|
|
|
|
|
return _wrap_remote_call(self._remote_proxy, func, wire)
|
|
|
|
|
|
def __getitem__(self, key):
|
|
|
return self.revision(key)
|
|
|
|
|
|
def _create_vcs_cache_context(self):
|
|
|
"""
|
|
|
Creates a unique string which is passed to the VCSServer on every
|
|
|
remote call. It is used as cache key in the VCSServer.
|
|
|
"""
|
|
|
return str(uuid.uuid4())
|
|
|
|
|
|
def invalidate_vcs_cache(self):
|
|
|
"""
|
|
|
This is a no-op method for the pyro4 backend but we want to have the
|
|
|
same API for client.RemoteRepo and client_http.RemoteRepo classes.
|
|
|
"""
|
|
|
|
|
|
|
|
|
def _get_proxy_method(proxy, name):
|
|
|
try:
|
|
|
return getattr(proxy, name)
|
|
|
except CommunicationError:
|
|
|
raise CommunicationError(
|
|
|
'Unable to connect to remote pyro server %s' % proxy)
|
|
|
|
|
|
|
|
|
def _wrap_remote_call(proxy, func, *args):
|
|
|
all_args = list(args)
|
|
|
|
|
|
@exceptions.map_vcs_exceptions
|
|
|
def caller(*args, **kwargs):
|
|
|
all_args.extend(args)
|
|
|
try:
|
|
|
return func(*all_args, **kwargs)
|
|
|
except ConnectionClosedError:
|
|
|
log.debug('Connection to VCSServer closed, trying to reconnect.')
|
|
|
proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
|
|
|
|
|
|
return func(*all_args, **kwargs)
|
|
|
|
|
|
return caller
|
|
|
|
|
|
|
|
|
class RefsWrapper(object):
|
|
|
|
|
|
def __init__(self, repo):
|
|
|
self._repo = weakref.proxy(repo)
|
|
|
|
|
|
def __setitem__(self, key, value):
|
|
|
self._repo._assign_ref(key, value)
|
|
|
|
|
|
|
|
|
class FunctionWrapper(object):
|
|
|
|
|
|
def __init__(self, func, wire):
|
|
|
self._func = func
|
|
|
self._wire = wire
|
|
|
|
|
|
@exceptions.map_vcs_exceptions
|
|
|
def __call__(self, *args, **kwargs):
|
|
|
return self._func(self._wire, *args, **kwargs)
|
|
|
|