##// END OF EJS Templates
chore(deps): bumped waitress==3.0.0
chore(deps): bumped waitress==3.0.0

File last commit:

r1152:a0c49580 default
r1201:9cc5af90 default
Show More
svnremoterepo.py
160 lines | 5.2 KiB | text/x-python | PythonLexer
svn: fixed use of hgsubversion in the code
r1046 # RhodeCode VCSServer provides access to different vcs backends via network.
source-code: updated copyrights to 2023
r1126 # Copyright (C) 2014-2023 RhodeCode GmbH
svn: fixed use of hgsubversion in the code
r1046 #
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import os
import tempfile
from svn import client
from svn import core
from svn import ra
from mercurial import error
packages: move the str utils to it's own module
r1060 from vcsserver.str_utils import safe_bytes
svn: fixed use of hgsubversion in the code
r1046
# Module-level Subversion configuration, loaded once at import time.
# `svn_config` is handed to `ra.open2()` in SubversionRepo.init_ra_and_client.
# NOTE(review): passing None presumably selects the default per-user
# configuration directory -- confirm against the Subversion bindings docs.
core.svn_config_ensure(None)
svn_config = core.svn_config_get_config(None)
class RaCallbacks(ra.Callbacks):
    """Callbacks object handed to ``ra.open2`` when opening a session."""

    @staticmethod
    def open_tmp_file(pool):  # pragma: no cover
        """Create an empty temporary file and return its path.

        The descriptor returned by ``mkstemp`` is closed right away; only the
        file name is handed back. ``pool`` is accepted but not used.
        """
        handle, tmp_path = tempfile.mkstemp()
        os.close(handle)
        return tmp_path

    @staticmethod
    def get_client_string(pool):
        """Return the fixed client identification string (bytes).

        ``pool`` is accepted but not used.
        """
        return b'RhodeCode-subversion-url-checker'
class SubversionException(Exception):
    """Base class for the Subversion errors defined in this module."""
class SubversionConnectionException(SubversionException):
    """Raised when connecting to a repository fails with a generic error."""
def normalize_url(url):
    """Normalize a Subversion URL given as bytes.

    A leading ``svn+`` is dropped from ``svn+http://`` and ``svn+https://``
    URLs, and any trailing slashes are removed. Falsy input (``None``,
    ``b''``) is returned unchanged.
    """
    if not url:
        return url

    # str/bytes.startswith accepts a tuple of candidate prefixes.
    has_svn_prefix = url.startswith((b'svn+http://', b'svn+https://'))
    # len(b'svn+') == 4, so slicing at 4 leaves the plain http(s) URL.
    trimmed = url[4:] if has_svn_prefix else url
    return trimmed.rstrip(b'/')
def _create_auth_baton(pool):
    """Create a Subversion authentication baton.

    Collects every platform-specific provider that the ``core`` module
    exposes and that can be instantiated, appends the generic file-based
    providers from ``client``, and opens the baton with
    ``core.svn_auth_open(providers, pool)``.
    """
    # Give the client context baton a suite of authentication
    # providers.
    platform_specific = [
        'svn_auth_get_gnome_keyring_simple_provider',
        'svn_auth_get_gnome_keyring_ssl_client_cert_pw_provider',
        'svn_auth_get_keychain_simple_provider',
        'svn_auth_get_keychain_ssl_client_cert_pw_provider',
        'svn_auth_get_kwallet_simple_provider',
        'svn_auth_get_kwallet_ssl_client_cert_pw_provider',
        'svn_auth_get_ssl_client_cert_file_provider',
        'svn_auth_get_windows_simple_provider',
        'svn_auth_get_windows_ssl_server_trust_provider',
    ]

    providers = []
    for provider_name in platform_specific:
        factory = getattr(core, provider_name, None)
        if factory is None:
            # Not exposed by this build of the bindings.
            continue
        try:
            provider = factory()
        except RuntimeError:
            # Best effort: a provider that cannot be created is skipped.
            continue
        providers.append(provider)

    providers.extend([
        client.get_simple_provider(),
        client.get_username_provider(),
        client.get_ssl_client_cert_file_provider(),
        client.get_ssl_client_cert_pw_file_provider(),
        client.get_ssl_server_trust_file_provider(),
    ])

    return core.svn_auth_open(providers, pool)
lint: auto-fixes
class SubversionRepo:
    """Wrapper for a Subversion repository.

    It uses the SWIG Python bindings, see above for requirements.

    Constructing an instance opens an RA session against ``svn_url`` and
    stores the repository UUID in ``self.uuid``.
    """

    def __init__(self, svn_url: bytes = b'', username: bytes = b'', password: bytes = b''):
        """Open a session to ``svn_url`` using the optional credentials.

        :param svn_url: repository URL (bytes); canonicalized before use.
        :param username: default username for authentication, if any.
        :param password: default password for authentication, if any.
        :raises SubversionConnectionException: when the session cannot be
            opened (see :meth:`init_ra_and_client`).
        """
        self.username = username
        self.password = password
        self.svn_url = core.svn_path_canonicalize(svn_url)

        self.auth_baton_pool = core.Pool()
        self.auth_baton = _create_auth_baton(self.auth_baton_pool)
        # self.init_ra_and_client() assumes that a pool already exists
        self.pool = core.Pool()

        self.ra = self.init_ra_and_client()
        self.uuid = ra.get_uuid(self.ra, self.pool)

    def init_ra_and_client(self):
        """Initializes the RA and client layers, because sometimes getting
        unified diffs runs the remote server out of open files.

        :returns: the session object returned by ``ra.open2``.
        :raises SubversionConnectionException: when opening the session
            fails; the message joins the messages of the whole error chain.
        """
        if self.username:
            core.svn_auth_set_parameter(
                self.auth_baton,
                core.SVN_AUTH_PARAM_DEFAULT_USERNAME,
                self.username)

        if self.password:
            core.svn_auth_set_parameter(
                self.auth_baton,
                core.SVN_AUTH_PARAM_DEFAULT_PASSWORD,
                self.password)

        callbacks = RaCallbacks()
        callbacks.auth_baton = self.auth_baton

        try:
            return ra.open2(self.svn_url, callbacks, svn_config, self.pool)
        except (SubversionException, core.SubversionException) as e:
            # The bindings raise ``svn.core.SubversionException``; the
            # module-local ``SubversionException`` is a different class, so
            # both are caught here to make the conversion below reachable.
            # e.child contains a detailed error messages
            msglist = []
            svn_exc = e
            while svn_exc is not None:
                # ``args`` may be empty when no message was supplied.
                text = svn_exc.args[0] if svn_exc.args else None
                if text:
                    if isinstance(text, bytes):
                        text = text.decode('utf-8', 'replace')
                    msglist.append(str(text))
                # Only the bindings' exception carries a ``child`` link.
                svn_exc = getattr(svn_exc, 'child', None)
            msg = '\n'.join(msglist)
            raise SubversionConnectionException(msg) from e
lint: auto-fixes
class svnremoterepo:
    """ the dumb wrapper for actual Subversion repositories """

    def __init__(self, username: bytes = b'', password: bytes = b'', svn_url: bytes = b''):
        """Store credentials and the normalized repository URL.

        Falsy ``username``/``password`` values are stored as ``b''``;
        ``svn_url`` is passed through ``normalize_url``.
        """
        self.username = username or b''
        self.password = password or b''
        self.path = normalize_url(svn_url)

    def svn(self):
        """Open and return a :class:`SubversionRepo` for this URL.

        :raises error.Abort: when the connection fails; the abort message is
            the text of the underlying ``SubversionConnectionException``.
        """
        try:
            return SubversionRepo(self.path, self.username, self.password)
        except SubversionConnectionException as e:
            # Convert the exception to text before encoding: ``safe_bytes``
            # comes from a string-utilities module and presumably expects
            # str/bytes rather than an exception instance -- verify against
            # vcsserver.str_utils.
            raise error.Abort(safe_bytes(str(e))) from e