# RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2020 RhodeCode GmbH # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import os import tempfile from svn import client from svn import core from svn import ra from mercurial import error from vcsserver.str_utils import safe_bytes core.svn_config_ensure(None) svn_config = core.svn_config_get_config(None) class RaCallbacks(ra.Callbacks): @staticmethod def open_tmp_file(pool): # pragma: no cover (fd, fn) = tempfile.mkstemp() os.close(fd) return fn @staticmethod def get_client_string(pool): return b'RhodeCode-subversion-url-checker' class SubversionException(Exception): pass class SubversionConnectionException(SubversionException): """Exception raised when a generic error occurs when connecting to a repository.""" def normalize_url(url): if not url: return url if url.startswith(b'svn+http://') or url.startswith(b'svn+https://'): url = url[4:] url = url.rstrip(b'/') return url def _create_auth_baton(pool): """Create a Subversion authentication baton. """ # Give the client context baton a suite of authentication # providers.h platform_specific = [ 'svn_auth_get_gnome_keyring_simple_provider', 'svn_auth_get_gnome_keyring_ssl_client_cert_pw_provider', 'svn_auth_get_keychain_simple_provider', 'svn_auth_get_keychain_ssl_client_cert_pw_provider', 'svn_auth_get_kwallet_simple_provider', 'svn_auth_get_kwallet_ssl_client_cert_pw_provider', 'svn_auth_get_ssl_client_cert_file_provider', 'svn_auth_get_windows_simple_provider', 'svn_auth_get_windows_ssl_server_trust_provider', ] providers = [] for p in platform_specific: if getattr(core, p, None) is not None: try: providers.append(getattr(core, p)()) except RuntimeError: pass providers += [ client.get_simple_provider(), client.get_username_provider(), client.get_ssl_client_cert_file_provider(), client.get_ssl_client_cert_pw_file_provider(), client.get_ssl_server_trust_file_provider(), ] return core.svn_auth_open(providers, pool) class SubversionRepo(object): """Wrapper for a Subversion repository. It uses the SWIG Python bindings, see above for requirements. """ def __init__(self, svn_url: bytes = b'', username: bytes = b'', password: bytes = b''): self.username = username self.password = password self.svn_url = core.svn_path_canonicalize(svn_url) self.auth_baton_pool = core.Pool() self.auth_baton = _create_auth_baton(self.auth_baton_pool) # self.init_ra_and_client() assumes that a pool already exists self.pool = core.Pool() self.ra = self.init_ra_and_client() self.uuid = ra.get_uuid(self.ra, self.pool) def init_ra_and_client(self): """Initializes the RA and client layers, because sometimes getting unified diffs runs the remote server out of open files. """ if self.username: core.svn_auth_set_parameter(self.auth_baton, core.SVN_AUTH_PARAM_DEFAULT_USERNAME, self.username) if self.password: core.svn_auth_set_parameter(self.auth_baton, core.SVN_AUTH_PARAM_DEFAULT_PASSWORD, self.password) callbacks = RaCallbacks() callbacks.auth_baton = self.auth_baton try: return ra.open2(self.svn_url, callbacks, svn_config, self.pool) except SubversionException as e: # e.child contains a detailed error messages msglist = [] svn_exc = e while svn_exc: if svn_exc.args[0]: msglist.append(svn_exc.args[0]) svn_exc = svn_exc.child msg = '\n'.join(msglist) raise SubversionConnectionException(msg) class svnremoterepo(object): """ the dumb wrapper for actual Subversion repositories """ def __init__(self, username: bytes = b'', password: bytes = b'', svn_url: bytes = b''): self.username = username or b'' self.password = password or b'' self.path = normalize_url(svn_url) def svn(self): try: return SubversionRepo(self.path, self.username, self.password) except SubversionConnectionException as e: raise error.Abort(safe_bytes(e))