##// END OF EJS Templates
deps: bumped rhodecode-tools to 0.10.0
deps: bumped rhodecode-tools to 0.10.0

File last commit:

r404:94c8766a default
r642:2eb87d39 default
Show More
client.py
346 lines | 10.7 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
# Copyright (C) 2014-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Provides the implementation of various client utilities to reach the vcsserver.
"""
import copy
import logging
import threading
import urlparse
import uuid
import weakref
from urllib2 import URLError
import msgpack
import Pyro4
import requests
Martin Bornhold
pyro4: Create pyro proxy factory which returns instances scoped by the request.
r341 from pyramid.threadlocal import get_current_request
project: added all source files and assets
r1 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
from rhodecode.lib.vcs import exceptions
from rhodecode.lib.vcs.conf import settings
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# TODO: mikhail: Keep it in sync with vcsserver's
# HTTPApplication.ALLOWED_EXCEPTIONS
#
# Maps the exception type name carried in a remote error response to the
# local exception class that gets raised for it. `_remote_call` falls back to
# a plain `Exception` for type names that are not listed here.
EXCEPTIONS_MAP = {
    'KeyError': KeyError,
    'URLError': URLError,
}
class HTTPRepoMaker(object):
    """
    Factory for `HTTPRemoteRepo` objects bound to one backend endpoint.

    Calling the instance creates a remote repository object. Any other
    attribute that is not found through normal lookup is exposed as a
    callable which invokes the remote method of the same name.
    """

    def __init__(self, server_and_port, backend_endpoint):
        base_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(base_url, backend_endpoint)

    def __call__(self, path, config, with_wire=None):
        """
        Return an `HTTPRemoteRepo` for `path` which talks to `self.url`.
        """
        log.debug('HTTPRepoMaker call on %s', path)
        return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)

    def __getattr__(self, name):
        """
        Return a callable which forwards its arguments to the remote
        method `name`.
        """
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Invoke the remote method `name`; no repository wire data is sent.
        """
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params,
        }
        return _remote_call(self.url, payload, EXCEPTIONS_MAP)
class VcsHttpProxy(object):
    """
    Client which posts a msgpack encoded request to a vcsserver HTTP endpoint
    and streams the msgpack encoded response back.
    """

    # Number of bytes requested from the response stream per iteration.
    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        adapter = requests.adapters.HTTPAdapter(max_retries=5)
        self.base_url = urlparse.urljoin(
            'http://%s' % server_and_port, backend_endpoint)
        # All 'http://' requests of this session go through the adapter
        # configured with `max_retries=5`.
        self.session = requests.Session()
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        Send the call to the backend and return the streamed result.

        :returns: Tuple `(iterator, status, headers)` as produced by
            `_get_result`.
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """
        Raise an `Exception` built from the `error` mapping.

        The `_vcs_kind` marker is copied onto the exception when the mapping
        contains it.
        """
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """
        Yield every msgpack object decoded from the streamed response body.
        """
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Split the response stream into its leading fields and the remainder.

        The first three decoded objects are read as error, status and
        headers. A truthy error is raised via `_deserialize_and_raise`.
        `StopIteration` propagates if the stream ends before all three
        objects were read.

        :returns: Tuple `(iterator, status, headers)` where `iterator`
            yields the objects that follow the headers.
        """
        iterator = self._iterate(result)
        # The builtin `next()` works on Python 2.6+ and on Python 3, the
        # `.next()` method exists on Python 2 iterators only.
        error = next(iterator)
        if error:
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
class HTTPRemoteRepo(object):
    """
    Remote repository which is reached via HTTP.

    Attributes not found through normal lookup are exposed as callables
    which invoke the remote method of the same name and send the wire data
    of this repository along.
    """

    def __init__(self, path, config, url, with_wire=None):
        self.url = url
        wire = {
            "path": path,
            "config": config,
            "context": str(uuid.uuid4()),
        }
        # Entries given by the caller override the defaults above.
        if with_wire:
            wire.update(with_wire)
        self._wire = wire

    def __getattr__(self, name):
        """
        Return a callable which forwards its arguments to the remote
        method `name`.
        """
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Invoke the remote method `name` with a serialized copy of the wire.
        """
        log.debug('Calling %s@%s', self.url, name)
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire_copy = copy.deepcopy(self._wire)
        wire_copy["config"] = wire_copy["config"].serialize()
        params = {'wire': wire_copy, 'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params,
        }
        return _remote_call(self.url, payload, EXCEPTIONS_MAP)

    def __getitem__(self, key):
        """
        Shortcut for `self.revision(key)`, which is resolved as remote call.
        """
        return self.revision(key)
def _remote_call(url, payload, exceptions_map):
    """
    Post the msgpack encoded `payload` to `url` and return the result.

    :param exceptions_map: Maps the error type name found in the response
        to the exception class to raise. Unknown names result in a plain
        `Exception`.
    :returns: The value stored under `result` in the decoded response.
    :raises: The mapped exception if the response contains an `error`
        entry. The `_vcs_kind` marker is copied onto it when present.
    """
    http_response = requests.post(url, data=msgpack.packb(payload))
    decoded = msgpack.unpackb(http_response.content)

    error = decoded.get('error')
    if not error:
        return decoded.get('result')

    exc_class = exceptions_map.get(error.get('type', 'Exception'), Exception)
    exc = exc_class(error.get('message'))
    if '_vcs_kind' in error:
        exc._vcs_kind = error['_vcs_kind']
    raise exc
class RepoMaker(object):
    """
    Factory for `RemoteRepo` objects which obtains its remote proxy from
    the given `proxy_factory`.
    """

    def __init__(self, proxy_factory):
        self._proxy_factory = proxy_factory

    def __call__(self, path, config, with_wire=None):
        """
        Return a `RemoteRepo` for `path` using a proxy from the factory.
        """
        log.debug('RepoMaker call on %s', path)
        proxy = self._proxy_factory()
        return RemoteRepo(
            path, config, remote_proxy=proxy, with_wire=with_wire)

    def __getattr__(self, name):
        """
        Return the wrapped remote method `name`; no wire data is bound.
        """
        proxy = self._proxy_factory()
        method = _get_proxy_method(proxy, name)
        return _wrap_remote_call(proxy, method)
Martin Bornhold
pyro4: Create pyro proxy factory which returns instances scoped by the request.
class RequestScopeProxyFactory(object):
    """
    This factory returns pyro proxy instances based on a per request scope.
    It returns the same instance if called from within the same request and
    different instances if called from different requests.

    A proxy handed out for a request is tracked until the request is
    finished and is then put into a pool for reuse by later requests.
    """

    def __init__(self, remote_uri):
        self._remote_uri = remote_uri
        # Idle proxies which can be handed out again.
        self._proxy_pool = []
        # Maps a request object to the proxy currently borrowed by it.
        self._borrowed_proxies = {}

    def __call__(self, request=None):
        """
        Wrapper around `getProxy`.

        Uses the result of `get_current_request()` if no `request` is given.
        """
        request = request or get_current_request()
        return self.getProxy(request)

    def getProxy(self, request):
        """
        Call this to get the pyro proxy instance for the request.

        :param request: The request to scope the proxy to, or `None` to get
            a new proxy which is not tracked by this factory.
        """
        # If called without a request context we return new proxy instances
        # on every call. This allows to run e.g. invoke tasks.
        if request is None:
            log.info('Creating pyro proxy without request context for '
                     'remote_uri=%s', self._remote_uri)
            return Pyro4.Proxy(self._remote_uri)

        # If there is an already borrowed proxy for the request context we
        # return that instance instead of creating a new one.
        if request in self._borrowed_proxies:
            return self._borrowed_proxies[request]

        # Get proxy from pool or create new instance.
        try:
            proxy = self._proxy_pool.pop()
        except IndexError:
            log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
            proxy = Pyro4.Proxy(self._remote_uri)

        # Mark proxy as borrowed for the request context and add a callback
        # that returns it when the request processing is finished.
        self._borrowed_proxies[request] = proxy
        request.add_finished_callback(self._returnProxy)
        return proxy

    def _returnProxy(self, request):
        """
        Callback that gets called by pyramid when the request is finished.
        It puts the proxy back into the pool.
        """
        try:
            proxy = self._borrowed_proxies.pop(request)
        except KeyError:
            # `Logger.warn` is a deprecated alias of `Logger.warning`.
            log.warning('Return proxy for remote_uri=%s but no proxy borrowed '
                        'for this request.', self._remote_uri)
        else:
            self._proxy_pool.append(proxy)
project: added all source files and assets
r1
class RemoteRepo(object):
    """
    Remote repository which is reached via a pyro proxy.

    Attributes not found through normal lookup are resolved as methods of
    the remote proxy and are wrapped so that the wire data of this
    repository is passed as first argument.
    """

    def __init__(self, path, config, remote_proxy, with_wire=None):
        self._wire = {
            "path": path,
            "config": config,
            "context": self._create_vcs_cache_context(),
        }
        # Entries given by the caller override the defaults above.
        if with_wire:
            self._wire.update(with_wire)
        self._remote_proxy = remote_proxy
        self.refs = RefsWrapper(self)

    def __getattr__(self, name):
        """
        Return the wrapped remote method `name` with the wire data bound.

        :raises exceptions.VCSBackendNotSupportedError: If looking up the
            method fails with a `DaemonError` for an 'unknown object'.
        """
        log.debug('Calling %s@%s', self._remote_proxy, name)
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()

        try:
            func = _get_proxy_method(self._remote_proxy, name)
        except DaemonError as e:
            # `e.message` is deprecated since Python 2.6 and does not exist
            # on Python 3. It holds the single constructor argument, so
            # comparing `args` is the equivalent check.
            if e.args == ('unknown object',):
                raise exceptions.VCSBackendNotSupportedError
            raise

        return _wrap_remote_call(self._remote_proxy, func, wire)

    def __getitem__(self, key):
        """
        Shortcut for `self.revision(key)`, which is resolved as remote call.
        """
        return self.revision(key)

    def _create_vcs_cache_context(self):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        return str(uuid.uuid4())

    def invalidate_vcs_cache(self):
        """
        This is a no-op method for the pyro4 backend but we want to have the
        same API for client.RemoteRepo and client_http.RemoteRepo classes.
        """
project: added all source files and assets
r1
def _get_proxy_method(proxy, name):
try:
return getattr(proxy, name)
except CommunicationError:
raise CommunicationError(
'Unable to connect to remote pyro server %s' % proxy)
def _wrap_remote_call(proxy, func, *args):
all_args = list(args)
@exceptions.map_vcs_exceptions
def caller(*args, **kwargs):
all_args.extend(args)
try:
return func(*all_args, **kwargs)
except ConnectionClosedError:
log.debug('Connection to VCSServer closed, trying to reconnect.')
proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
return func(*all_args, **kwargs)
return caller
class RefsWrapper(object):
    """
    Allows to assign references of a repository via item assignment.
    """

    def __init__(self, repo):
        # Only a weak proxy to the repository is kept; presumably to avoid
        # a reference cycle with the repository holding this wrapper.
        self._repo = weakref.proxy(repo)

    def __setitem__(self, key, value):
        """
        Delegate `refs[key] = value` to the `_assign_ref` of the repository.
        """
        assign_ref = self._repo._assign_ref
        assign_ref(key, value)
class FunctionWrapper(object):
    """
    Callable which passes a fixed `wire` as first argument to `func`.
    """

    def __init__(self, func, wire):
        self._func = func
        self._wire = wire

    @exceptions.map_vcs_exceptions
    def __call__(self, *args, **kwargs):
        """
        Call the wrapped function with the wire followed by the arguments.
        """
        call_args = (self._wire,) + args
        return self._func(*call_args, **kwargs)