# -*- coding: utf-8 -*- # Copyright (C) 2014-2017 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Provides the implementation of various client utilities to reach the vcsserver. """ import copy import logging import uuid import weakref import Pyro4 from pyramid.threadlocal import get_current_request from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError from rhodecode.lib.vcs import exceptions from rhodecode.lib.vcs.conf import settings log = logging.getLogger(__name__) class RepoMaker(object): def __init__(self, proxy_factory): self._proxy_factory = proxy_factory def __call__(self, path, config, with_wire=None): log.debug('RepoMaker call on %s', path) return RemoteRepo( path, config, remote_proxy=self._proxy_factory(), with_wire=with_wire) def __getattr__(self, name): remote_proxy = self._proxy_factory() func = _get_proxy_method(remote_proxy, name) return _wrap_remote_call(remote_proxy, func) class RequestScopeProxyFactory(object): """ This factory returns pyro proxy instances based on a per request scope. It returns the same instance if called from within the same request and different instances if called from different requests. """ def __init__(self, remote_uri): self._remote_uri = remote_uri self._proxy_pool = [] self._borrowed_proxies = {} def __call__(self, request=None): """ Wrapper around `getProxy`. """ request = request or get_current_request() return self.getProxy(request) def getProxy(self, request): """ Call this to get the pyro proxy instance for the request. """ # If called without a request context we return new proxy instances # on every call. This allows to run e.g. invoke tasks. if request is None: log.info('Creating pyro proxy without request context for ' 'remote_uri=%s', self._remote_uri) return Pyro4.Proxy(self._remote_uri) # If there is an already borrowed proxy for the request context we # return that instance instead of creating a new one. if request in self._borrowed_proxies: return self._borrowed_proxies[request] # Get proxy from pool or create new instance. try: proxy = self._proxy_pool.pop() except IndexError: log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri) proxy = Pyro4.Proxy(self._remote_uri) # Mark proxy as borrowed for the request context and add a callback # that returns it when the request processing is finished. self._borrowed_proxies[request] = proxy request.add_finished_callback(self._returnProxy) return proxy def _returnProxy(self, request): """ Callback that gets called by pyramid when the request is finished. It puts the proxy back into the pool. """ if request in self._borrowed_proxies: proxy = self._borrowed_proxies.pop(request) self._proxy_pool.append(proxy) else: log.warn('Return proxy for remote_uri=%s but no proxy borrowed ' 'for this request.', self._remote_uri) class RemoteRepo(object): def __init__(self, path, config, remote_proxy, with_wire=None): self._wire = { "path": path, "config": config, "context": self._create_vcs_cache_context(), } if with_wire: self._wire.update(with_wire) self._remote_proxy = remote_proxy self.refs = RefsWrapper(self) def __getattr__(self, name): log.debug('Calling %s@%s', self._remote_proxy, name) # TODO: oliver: This is currently necessary pre-call since the # config object is being changed for hooking scenarios wire = copy.deepcopy(self._wire) wire["config"] = wire["config"].serialize() try: func = _get_proxy_method(self._remote_proxy, name) except DaemonError as e: if e.message == 'unknown object': raise exceptions.VCSBackendNotSupportedError else: raise return _wrap_remote_call(self._remote_proxy, func, wire) def __getitem__(self, key): return self.revision(key) def _create_vcs_cache_context(self): """ Creates a unique string which is passed to the VCSServer on every remote call. It is used as cache key in the VCSServer. """ return str(uuid.uuid4()) def invalidate_vcs_cache(self): """ This is a no-op method for the pyro4 backend but we want to have the same API for client.RemoteRepo and client_http.RemoteRepo classes. """ def _get_proxy_method(proxy, name): try: return getattr(proxy, name) except CommunicationError: raise exceptions.PyroVCSCommunicationError( 'Unable to connect to remote pyro server %s' % proxy) def _wrap_remote_call(proxy, func, *args): all_args = list(args) @exceptions.map_vcs_exceptions def caller(*args, **kwargs): all_args.extend(args) try: return func(*all_args, **kwargs) except ConnectionClosedError: log.debug('Connection to VCSServer closed, trying to reconnect.') proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES) return func(*all_args, **kwargs) return caller class RefsWrapper(object): def __init__(self, repo): self._repo = weakref.proxy(repo) def __setitem__(self, key, value): self._repo._assign_ref(key, value) class FunctionWrapper(object): def __init__(self, func, wire): self._func = func self._wire = wire @exceptions.map_vcs_exceptions def __call__(self, *args, **kwargs): return self._func(self._wire, *args, **kwargs)