svnremoterepo.py
160 lines
| 5.2 KiB
| text/x-python
|
PythonLexer
r1046 | # RhodeCode VCSServer provides access to different vcs backends via network. | |||
r1126 | # Copyright (C) 2014-2023 RhodeCode GmbH | |||
r1046 | # | |||
# This program is free software; you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation; either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program; if not, write to the Free Software Foundation, | ||||
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||||
import os | ||||
import tempfile | ||||
from svn import client | ||||
from svn import core | ||||
from svn import ra | ||||
from mercurial import error | ||||
r1060 | from vcsserver.str_utils import safe_bytes | |||
r1046 | ||||
# Make sure a Subversion configuration area exists before reading it.
# NOTE(review): per the Subversion API docs, passing None selects the default
# per-user location -- confirm for the bindings version in use.
core.svn_config_ensure(None)
# Load the configuration once at import time; this module-level object is the
# one handed to ra.open2() when a session is opened.
svn_config = core.svn_config_get_config(None)
class RaCallbacks(ra.Callbacks):
    """Callback set handed to the RA layer when a session is opened."""

    @staticmethod
    def open_tmp_file(pool):  # pragma: no cover
        """Create an empty temporary file and return its path.

        The descriptor returned by ``mkstemp`` is closed right away; only the
        file name is handed back. ``pool`` is accepted but not used.
        """
        handle, tmp_path = tempfile.mkstemp()
        os.close(handle)
        return tmp_path

    @staticmethod
    def get_client_string(pool):
        """Return the fixed client identification string (as bytes)."""
        return b'RhodeCode-subversion-url-checker'
class SubversionException(Exception):
    """Base class for the Subversion errors defined in this module."""
class SubversionConnectionException(SubversionException):
    """Raised when connecting to a repository fails with a generic error."""
def normalize_url(url):
    """Return ``url`` without the ``svn+`` wrapper and without trailing slashes.

    A falsy ``url`` (``None`` or empty) is returned unchanged. Otherwise a
    leading ``svn+`` is removed from ``svn+http://`` / ``svn+https://`` URLs
    (other schemes are left alone) and every trailing ``/`` is stripped.
    The comparisons use bytes literals, so ``url`` is expected to be bytes.
    """
    if url:
        if url.startswith((b'svn+http://', b'svn+https://')):
            # Drop only the "svn+" marker, keeping the real http(s) scheme.
            url = url[len(b'svn+'):]
        url = url.rstrip(b'/')
    return url
def _create_auth_baton(pool):
    """Create a Subversion authentication baton.

    Optional providers are looked up by name on ``svn.core`` and used only
    when the bindings expose them; the generic client providers are always
    appended afterwards. The combined list is opened with
    ``core.svn_auth_open`` using ``pool``.
    """
    # Provider factories that may or may not exist in the installed bindings.
    optional_provider_names = (
        'svn_auth_get_gnome_keyring_simple_provider',
        'svn_auth_get_gnome_keyring_ssl_client_cert_pw_provider',
        'svn_auth_get_keychain_simple_provider',
        'svn_auth_get_keychain_ssl_client_cert_pw_provider',
        'svn_auth_get_kwallet_simple_provider',
        'svn_auth_get_kwallet_ssl_client_cert_pw_provider',
        'svn_auth_get_ssl_client_cert_file_provider',
        'svn_auth_get_windows_simple_provider',
        'svn_auth_get_windows_ssl_server_trust_provider',
    )

    auth_providers = []
    for name in optional_provider_names:
        factory = getattr(core, name, None)
        if factory is None:
            continue
        try:
            auth_providers.append(factory())
        except RuntimeError:
            # Best effort: a provider that cannot be created is skipped.
            continue

    # Give the baton the standard suite of authentication providers.
    auth_providers.extend([
        client.get_simple_provider(),
        client.get_username_provider(),
        client.get_ssl_client_cert_file_provider(),
        client.get_ssl_client_cert_pw_file_provider(),
        client.get_ssl_server_trust_file_provider(),
    ])

    return core.svn_auth_open(auth_providers, pool)
class SubversionRepo(object):
    """Wrapper for a Subversion repository.

    It uses the Subversion SWIG Python bindings (``svn.core`` / ``svn.ra``).
    Creating an instance opens an RA session to ``svn_url`` and reads the
    repository UUID into ``self.uuid``.
    """

    def __init__(self, svn_url: bytes = b'', username: bytes = b'', password: bytes = b''):
        """Open an RA session and record the repository UUID.

        :param svn_url: repository URL; canonicalized before use.
        :param username: default username set on the auth baton when truthy.
        :param password: default password set on the auth baton when truthy.
        :raises SubversionConnectionException: if opening the RA session fails.
        """
        self.username = username
        self.password = password
        self.svn_url = core.svn_path_canonicalize(svn_url)
        self.auth_baton_pool = core.Pool()
        self.auth_baton = _create_auth_baton(self.auth_baton_pool)
        # self.init_ra_and_client() assumes that a pool already exists
        self.pool = core.Pool()
        self.ra = self.init_ra_and_client()
        self.uuid = ra.get_uuid(self.ra, self.pool)

    def init_ra_and_client(self):
        """Initialize the RA layer and return the opened session.

        Original author's note: the RA and client layers are initialized here
        because sometimes getting unified diffs runs the remote server out of
        open files.

        :returns: the session object returned by ``ra.open2``.
        :raises SubversionConnectionException: if ``ra.open2`` fails; the
            message joins the messages of the whole error chain, one per line.
        """
        if self.username:
            core.svn_auth_set_parameter(self.auth_baton,
                                        core.SVN_AUTH_PARAM_DEFAULT_USERNAME,
                                        self.username)
        if self.password:
            core.svn_auth_set_parameter(self.auth_baton,
                                        core.SVN_AUTH_PARAM_DEFAULT_PASSWORD,
                                        self.password)

        callbacks = RaCallbacks()
        callbacks.auth_baton = self.auth_baton

        try:
            return ra.open2(self.svn_url, callbacks, svn_config, self.pool)
        except (SubversionException, core.SubversionException) as e:
            # The bindings raise svn.core.SubversionException, which is NOT a
            # subclass of this module's SubversionException; catching only the
            # local class left this handler unreachable for real SVN errors.
            msglist = []
            svn_exc = e
            while svn_exc:
                # e.child links to the next, more detailed, error in the chain.
                text = svn_exc.args[0] if svn_exc.args else None
                if text:
                    if isinstance(text, bytes):
                        text = text.decode('utf-8', errors='replace')
                    msglist.append(str(text))
                # The local exception class has no ``child`` attribute.
                svn_exc = getattr(svn_exc, 'child', None)
            msg = '\n'.join(msglist)
            raise SubversionConnectionException(msg) from e
class svnremoterepo(object):
    """The dumb wrapper for actual Subversion repositories.

    Stores the credentials and the normalized URL; no connection is made
    until :meth:`svn` is called.
    """

    def __init__(self, username: bytes = b'', password: bytes = b'', svn_url: bytes = b''):
        """Remember credentials and the normalized repository URL.

        :param username: user name; a falsy value is stored as ``b''``.
        :param password: password; a falsy value is stored as ``b''``.
        :param svn_url: repository URL, passed through ``normalize_url``.
        """
        self.username = username or b''
        self.password = password or b''
        self.path = normalize_url(svn_url)

    def svn(self):
        """Connect and return a :class:`SubversionRepo` for this repository.

        :raises error.Abort: if the connection fails; the abort message is
            the text of the underlying ``SubversionConnectionException``.
        """
        try:
            return SubversionRepo(self.path, self.username, self.password)
        except SubversionConnectionException as e:
            # Convert the message text, not the exception object itself.
            # NOTE(review): safe_bytes is believed to reject non-str/bytes
            # input -- confirm against vcsserver.str_utils.
            raise error.Abort(safe_bytes(str(e))) from e