client.py
346 lines
| 10.7 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2014-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Provides the implementation of various client utilities to reach the vcsserver. | ||||
""" | ||||
import copy | ||||
import logging | ||||
import threading | ||||
import urlparse | ||||
import uuid | ||||
import weakref | ||||
from urllib2 import URLError | ||||
import msgpack | ||||
import Pyro4 | ||||
import requests | ||||
Martin Bornhold
|
r341 | from pyramid.threadlocal import get_current_request | ||
r1 | from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError | |||
from rhodecode.lib.vcs import exceptions | ||||
from rhodecode.lib.vcs.conf import settings | ||||
log = logging.getLogger(__name__) | ||||
# TODO: mikhail: Keep it in sync with vcsserver's | ||||
# HTTPApplication.ALLOWED_EXCEPTIONS | ||||
EXCEPTIONS_MAP = { | ||||
'KeyError': KeyError, | ||||
'URLError': URLError, | ||||
} | ||||
class HTTPRepoMaker(object): | ||||
def __init__(self, server_and_port, backend_endpoint): | ||||
self.url = urlparse.urljoin( | ||||
'http://%s' % server_and_port, backend_endpoint) | ||||
def __call__(self, path, config, with_wire=None): | ||||
log.debug('HTTPRepoMaker call on %s', path) | ||||
return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire) | ||||
def __getattr__(self, name): | ||||
def f(*args, **kwargs): | ||||
return self._call(name, *args, **kwargs) | ||||
return f | ||||
@exceptions.map_vcs_exceptions | ||||
def _call(self, name, *args, **kwargs): | ||||
payload = { | ||||
'id': str(uuid.uuid4()), | ||||
'method': name, | ||||
'params': {'args': args, 'kwargs': kwargs} | ||||
} | ||||
return _remote_call(self.url, payload, EXCEPTIONS_MAP) | ||||
class VcsHttpProxy(object): | ||||
CHUNK_SIZE = 16384 | ||||
def __init__(self, server_and_port, backend_endpoint): | ||||
adapter = requests.adapters.HTTPAdapter(max_retries=5) | ||||
self.base_url = urlparse.urljoin( | ||||
'http://%s' % server_and_port, backend_endpoint) | ||||
self.session = requests.Session() | ||||
self.session.mount('http://', adapter) | ||||
def handle(self, environment, input_data, *args, **kwargs): | ||||
data = { | ||||
'environment': environment, | ||||
'input_data': input_data, | ||||
'args': args, | ||||
'kwargs': kwargs | ||||
} | ||||
result = self.session.post( | ||||
self.base_url, msgpack.packb(data), stream=True) | ||||
return self._get_result(result) | ||||
def _deserialize_and_raise(self, error): | ||||
exception = Exception(error['message']) | ||||
try: | ||||
exception._vcs_kind = error['_vcs_kind'] | ||||
except KeyError: | ||||
pass | ||||
raise exception | ||||
def _iterate(self, result): | ||||
unpacker = msgpack.Unpacker() | ||||
for line in result.iter_content(chunk_size=self.CHUNK_SIZE): | ||||
unpacker.feed(line) | ||||
for chunk in unpacker: | ||||
yield chunk | ||||
def _get_result(self, result): | ||||
iterator = self._iterate(result) | ||||
error = iterator.next() | ||||
if error: | ||||
self._deserialize_and_raise(error) | ||||
status = iterator.next() | ||||
headers = iterator.next() | ||||
return iterator, status, headers | ||||
class HTTPRemoteRepo(object): | ||||
def __init__(self, path, config, url, with_wire=None): | ||||
self.url = url | ||||
self._wire = { | ||||
"path": path, | ||||
"config": config, | ||||
"context": str(uuid.uuid4()), | ||||
} | ||||
if with_wire: | ||||
self._wire.update(with_wire) | ||||
def __getattr__(self, name): | ||||
def f(*args, **kwargs): | ||||
return self._call(name, *args, **kwargs) | ||||
return f | ||||
@exceptions.map_vcs_exceptions | ||||
def _call(self, name, *args, **kwargs): | ||||
log.debug('Calling %s@%s', self.url, name) | ||||
# TODO: oliver: This is currently necessary pre-call since the | ||||
# config object is being changed for hooking scenarios | ||||
wire = copy.deepcopy(self._wire) | ||||
wire["config"] = wire["config"].serialize() | ||||
payload = { | ||||
'id': str(uuid.uuid4()), | ||||
'method': name, | ||||
'params': {'wire': wire, 'args': args, 'kwargs': kwargs} | ||||
} | ||||
return _remote_call(self.url, payload, EXCEPTIONS_MAP) | ||||
def __getitem__(self, key): | ||||
return self.revision(key) | ||||
def _remote_call(url, payload, exceptions_map): | ||||
response = requests.post(url, data=msgpack.packb(payload)) | ||||
response = msgpack.unpackb(response.content) | ||||
error = response.get('error') | ||||
if error: | ||||
type_ = error.get('type', 'Exception') | ||||
exc = exceptions_map.get(type_, Exception) | ||||
exc = exc(error.get('message')) | ||||
try: | ||||
exc._vcs_kind = error['_vcs_kind'] | ||||
except KeyError: | ||||
pass | ||||
raise exc | ||||
return response.get('result') | ||||
class RepoMaker(object): | ||||
def __init__(self, proxy_factory): | ||||
self._proxy_factory = proxy_factory | ||||
def __call__(self, path, config, with_wire=None): | ||||
log.debug('RepoMaker call on %s', path) | ||||
return RemoteRepo( | ||||
path, config, remote_proxy=self._proxy_factory(), | ||||
with_wire=with_wire) | ||||
def __getattr__(self, name): | ||||
remote_proxy = self._proxy_factory() | ||||
func = _get_proxy_method(remote_proxy, name) | ||||
return _wrap_remote_call(remote_proxy, func) | ||||
Martin Bornhold
|
r341 | class RequestScopeProxyFactory(object): | ||
r1 | """ | |||
Martin Bornhold
|
r341 | This factory returns pyro proxy instances based on a per request scope. | ||
It returns the same instance if called from within the same request and | ||||
different instances if called from different requests. | ||||
r1 | """ | |||
def __init__(self, remote_uri): | ||||
self._remote_uri = remote_uri | ||||
Martin Bornhold
|
r341 | self._proxy_pool = [] | ||
self._borrowed_proxies = {} | ||||
def __call__(self, request=None): | ||||
""" | ||||
Wrapper around `getProxy`. | ||||
""" | ||||
request = request or get_current_request() | ||||
return self.getProxy(request) | ||||
Martin Bornhold
|
r342 | def getProxy(self, request): | ||
Martin Bornhold
|
r341 | """ | ||
Call this to get the pyro proxy instance for the request. | ||||
""" | ||||
Martin Bornhold
|
r354 | |||
# If called without a request context we return new proxy instances | ||||
# on every call. This allows to run e.g. invoke tasks. | ||||
if request is None: | ||||
log.info('Creating pyro proxy without request context for ' | ||||
'remote_uri=%s', self._remote_uri) | ||||
return Pyro4.Proxy(self._remote_uri) | ||||
# If there is an already borrowed proxy for the request context we | ||||
# return that instance instead of creating a new one. | ||||
Martin Bornhold
|
r341 | if request in self._borrowed_proxies: | ||
return self._borrowed_proxies[request] | ||||
r1 | ||||
Martin Bornhold
|
r341 | # Get proxy from pool or create new instance. | ||
try: | ||||
proxy = self._proxy_pool.pop() | ||||
except IndexError: | ||||
Martin Bornhold
|
r355 | log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri) | ||
Martin Bornhold
|
r341 | proxy = Pyro4.Proxy(self._remote_uri) | ||
Martin Bornhold
|
r355 | # Mark proxy as borrowed for the request context and add a callback | ||
# that returns it when the request processing is finished. | ||||
Martin Bornhold
|
r341 | self._borrowed_proxies[request] = proxy | ||
request.add_finished_callback(self._returnProxy) | ||||
return proxy | ||||
Martin Bornhold
|
r342 | def _returnProxy(self, request): | ||
Martin Bornhold
|
r341 | """ | ||
Callback that gets called by pyramid when the request is finished. | ||||
It puts the proxy back into the pool. | ||||
""" | ||||
if request in self._borrowed_proxies: | ||||
proxy = self._borrowed_proxies.pop(request) | ||||
self._proxy_pool.append(proxy) | ||||
else: | ||||
log.warn('Return proxy for remote_uri=%s but no proxy borrowed ' | ||||
'for this request.', self._remote_uri) | ||||
r1 | ||||
class RemoteRepo(object): | ||||
def __init__(self, path, config, remote_proxy, with_wire=None): | ||||
self._wire = { | ||||
"path": path, | ||||
"config": config, | ||||
Martin Bornhold
|
r404 | "context": self._create_vcs_cache_context(), | ||
r1 | } | |||
if with_wire: | ||||
self._wire.update(with_wire) | ||||
self._remote_proxy = remote_proxy | ||||
self.refs = RefsWrapper(self) | ||||
def __getattr__(self, name): | ||||
log.debug('Calling %s@%s', self._remote_proxy, name) | ||||
# TODO: oliver: This is currently necessary pre-call since the | ||||
# config object is being changed for hooking scenarios | ||||
wire = copy.deepcopy(self._wire) | ||||
wire["config"] = wire["config"].serialize() | ||||
try: | ||||
func = _get_proxy_method(self._remote_proxy, name) | ||||
except DaemonError as e: | ||||
if e.message == 'unknown object': | ||||
raise exceptions.VCSBackendNotSupportedError | ||||
else: | ||||
raise | ||||
return _wrap_remote_call(self._remote_proxy, func, wire) | ||||
def __getitem__(self, key): | ||||
return self.revision(key) | ||||
Martin Bornhold
|
r404 | def _create_vcs_cache_context(self): | ||
""" | ||||
Creates a unique string which is passed to the VCSServer on every | ||||
remote call. It is used as cache key in the VCSServer. | ||||
""" | ||||
return str(uuid.uuid4()) | ||||
def invalidate_vcs_cache(self): | ||||
""" | ||||
This is a no-op method for the pyro4 backend but we want to have the | ||||
same API for client.RemoteRepo and client_http.RemoteRepo classes. | ||||
""" | ||||
r1 | ||||
def _get_proxy_method(proxy, name): | ||||
try: | ||||
return getattr(proxy, name) | ||||
except CommunicationError: | ||||
raise CommunicationError( | ||||
'Unable to connect to remote pyro server %s' % proxy) | ||||
def _wrap_remote_call(proxy, func, *args): | ||||
all_args = list(args) | ||||
@exceptions.map_vcs_exceptions | ||||
def caller(*args, **kwargs): | ||||
all_args.extend(args) | ||||
try: | ||||
return func(*all_args, **kwargs) | ||||
except ConnectionClosedError: | ||||
log.debug('Connection to VCSServer closed, trying to reconnect.') | ||||
proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES) | ||||
return func(*all_args, **kwargs) | ||||
return caller | ||||
class RefsWrapper(object): | ||||
def __init__(self, repo): | ||||
self._repo = weakref.proxy(repo) | ||||
def __setitem__(self, key, value): | ||||
self._repo._assign_ref(key, value) | ||||
class FunctionWrapper(object): | ||||
def __init__(self, func, wire): | ||||
self._func = func | ||||
self._wire = wire | ||||
@exceptions.map_vcs_exceptions | ||||
def __call__(self, *args, **kwargs): | ||||
return self._func(self._wire, *args, **kwargs) | ||||