##// END OF EJS Templates
fixed import
fixed import

File last commit:

r5651:bad147da default
r5659:822bcfab default
Show More
commit.py
278 lines | 9.5 KiB | text/x-python | PythonLexer
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
SVN commit module
"""
import logging
import dateutil.parser
from zope.cachedescriptors.property import Lazy as LazyProperty
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.vcs import nodes, path as vcspath
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import CommitError
from vcsserver.lib.vcs_common import NodeKind, FILEMODE_EXECUTABLE, FILEMODE_DEFAULT, FILEMODE_LINK
# Value compared against the `svn:executable` and `svn:special` node
# properties below to decide whether the corresponding flag is set.
_SVN_PROP_TRUE = "*"
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class SubversionCommit(base.BaseCommit):
    """
    Subversion specific implementation of commits

    .. attribute:: branch

        The Subversion backend does not support assigning branches to
        specific commits. This attribute always has the value `None`.
    """

    def __init__(self, repository, commit_id):
        """
        :param repository: repository object; must provide `_get_commit_idx`,
            `_remote`, `get_commit` and `commit_ids`.
        :param commit_id: commit id, used verbatim as `raw_id` and `short_id`.
        """
        self.repository = repository
        self.idx = self.repository._get_commit_idx(commit_id)
        # The revision handed to the remote is the commit index shifted by one.
        self._svn_rev = self.idx + 1
        self._remote = repository._remote

        # TODO: handling of raw_id should be a method on repository itself,
        # which knows how to translate commit index and commit id
        self.raw_id = commit_id
        self.short_id = commit_id
        # NOTE(review): this instance attribute appears to take precedence
        # over the class-level `id` lazy property defined further down.
        self.id = f"r{commit_id}"

        self.nodes = {}
        self._path_mode_cache = {}  # path stats cache, e.g filemode etc
        self._path_type_cache = {}  # path type dir/file/link etc cache
        self.tags = []

    @property
    def author(self):
        """Value of the `svn:author` revision property, as a string."""
        return safe_str(self._properties.get("svn:author"))

    @property
    def date(self):
        """Naive datetime parsed from the `svn:date` revision property."""
        return _date_from_svn_properties(self._properties)

    @property
    def message(self):
        """Value of the `svn:log` revision property, as a string."""
        return safe_str(self._properties.get("svn:log"))

    @LazyProperty
    def _properties(self):
        """Revision properties fetched from the remote for this revision."""
        return self._remote.revision_properties(self._svn_rev)

    @LazyProperty
    def parents(self):
        """List with the commit at the previous index, or an empty list."""
        parent_idx = self.idx - 1
        if parent_idx >= 0:
            return [self.repository.get_commit(commit_idx=parent_idx)]
        return []

    @LazyProperty
    def children(self):
        """List with the commit at the next index, or an empty list."""
        child_idx = self.idx + 1
        if child_idx < len(self.repository.commit_ids):
            return [self.repository.get_commit(commit_idx=child_idx)]
        return []

    def _calculate_file_mode(self, path: bytes):
        """Return the file mode constant matching the `svn:executable` flag."""
        # Note: Subversion flags files which are executable with a special
        # property `svn:executable` which is set to the value ``"*"``.
        if self._get_file_property(path, "svn:executable") == _SVN_PROP_TRUE:
            return FILEMODE_EXECUTABLE
        return FILEMODE_DEFAULT

    def get_file_mode(self, path: bytes):
        """Return the file mode for `path`, computing it once per path."""
        path = self._fix_path(path)
        if path not in self._path_mode_cache:
            self._path_mode_cache[path] = self._calculate_file_mode(path)
        return self._path_mode_cache[path]

    def _get_path_type(self, path: bytes):
        """
        Return the node kind of `path`, cached per path.

        The empty path is always reported as a directory without asking the
        remote.

        :raises: the exception built by `no_node_at_path` when the remote
            reports no type for the path.
        """
        if path in self._path_type_cache:
            return self._path_type_cache[path]

        if path == b"":
            self._path_type_cache[b""] = NodeKind.DIR
            return NodeKind.DIR

        path_type = self._remote.get_node_type(self._svn_rev, path)
        if not path_type:
            raise self.no_node_at_path(path)

        self._path_type_cache[path] = path_type
        return path_type

    def is_link(self, path: bytes):
        """
        Return True when `path` has the `svn:special` property set and its
        content starts with ``b"link"``.
        """
        # Note: Subversion has a flag for special files, the content of the
        # file contains the type of that file.
        if self._get_file_property(path, "svn:special") == _SVN_PROP_TRUE:
            return self.get_file_content(path).startswith(b"link")
        return False

    def is_node_binary(self, path):
        """Ask the remote whether the node at `path` is binary."""
        path = self._fix_path(path)
        return self._remote.is_binary(self._svn_rev, safe_str(path))

    def node_md5_hash(self, path):
        """Return the md5 hash the remote reports for the node at `path`."""
        path = self._fix_path(path)
        return self._remote.md5_hash(self._svn_rev, safe_str(path))

    def _get_file_property(self, path, name):
        """Return node property `name` of `path`, or None when it is not set."""
        file_properties = self._remote.node_properties(safe_str(path), self._svn_rev)
        return file_properties.get(name)

    def get_file_content(self, path):
        """Return the content of the file at `path` in this revision."""
        path = self._fix_path(path)
        return self._remote.get_file_content(self._svn_rev, safe_str(path))

    def get_file_content_streamed(self, path):
        """Return the result of the remote's streaming content method."""
        path = self._fix_path(path)
        # The remote attribute name contains a colon, so it can only be
        # reached through getattr().
        stream_method = getattr(self._remote, "stream:get_file_content")
        return stream_method(self._svn_rev, safe_str(path))

    def get_file_size(self, path):
        """Return the size the remote reports for the file at `path`."""
        path = self._fix_path(path)
        return self._remote.get_file_size(self._svn_rev, safe_str(path))

    def get_path_history(self, path, limit=None, pre_load=None):
        """
        Return the commits for every revision in the remote's history of `path`.

        :param limit: forwarded to the remote `node_history` call.
        :param pre_load: accepted for interface compatibility; not used here.
        """
        path = self._fix_path(path)
        history = self._remote.node_history(self._svn_rev, safe_str(path), limit)
        return [self.repository.get_commit(commit_id=str(svn_rev)) for svn_rev in history]

    def get_file_annotate(self, path, pre_load=None):
        """
        Generate annotation tuples for the file at `path`.

        Yields ``(line_no, commit_id, commit_loader, content)`` where
        `line_no` is 1-based and `commit_loader` is a zero-argument callable
        returning the commit for that line.

        :param pre_load: accepted for interface compatibility; not used here.
        """
        result = self._remote.file_annotate(safe_str(path), self._svn_rev)

        for zero_based_line_no, svn_rev, content in result:
            commit_id = str(svn_rev)
            line_no = zero_based_line_no + 1
            # Bind commit_id as a default argument: a plain closure would see
            # whatever value the loop variable holds when the loader is
            # finally called, i.e. the last line's commit once the generator
            # has been consumed.
            yield (
                line_no,
                commit_id,
                lambda commit_id=commit_id: self.repository.get_commit(commit_id=commit_id),
                content,
            )

    def get_node(self, path: bytes, pre_load=None):
        """
        Return the node object for `path`, creating and caching it on first use.

        :raises: the exception built by `no_node_at_path` when the path is
            neither a directory nor a file.
        """
        path = self._fix_path(path)

        # use cached, if we have one
        if path in self.nodes:
            return self.nodes[path]

        path_type = self._get_path_type(path)
        if path == b"":
            node = nodes.RootNode(commit=self)
        elif path_type == NodeKind.DIR:
            node = nodes.DirNode(safe_bytes(path), commit=self)
        elif path_type == NodeKind.FILE:
            node = nodes.FileNode(safe_bytes(path), commit=self, pre_load=pre_load)
            self._path_mode_cache[path] = node.mode
        else:
            raise self.no_node_at_path(path)

        self.nodes[path] = node
        return node

    def get_nodes(self, path: bytes, pre_load=None):
        """
        Return the sorted list of nodes the remote lists under directory `path`.

        Node kinds and node objects are cached on the commit as a side effect.

        :raises CommitError: when `path` is not a directory, or when the
            remote returns an entry without a node kind.
        """
        if self._get_kind(path) != nodes.NodeKind.DIR:
            raise CommitError(f"Directory does not exist for commit {self.raw_id} at '{path}'")
        path = self._fix_path(path)

        # TODO: implement it ?? (file mode is not known at listing time)
        stat_ = None

        path_nodes = []
        for obj_path, node_kind, pre_load_data in self._remote.get_nodes(self._svn_rev, path, pre_load):
            if node_kind is None:
                raise CommitError(f"Requested object type={node_kind} cannot be determined")

            # cache type; the cache is keyed by path, so check the path
            if obj_path not in self._path_type_cache:
                self._path_type_cache[obj_path] = node_kind

            entry = self.nodes.get(obj_path)
            if entry is None:
                if node_kind == NodeKind.DIR:
                    entry = nodes.DirNode(safe_bytes(obj_path), commit=self)
                elif node_kind == NodeKind.FILE:
                    entry = nodes.FileNode(
                        safe_bytes(obj_path),
                        commit=self,
                        mode=stat_,
                        pre_load=pre_load,
                        pre_load_data=pre_load_data,
                    )

            # entries of any other kind are skipped
            if entry is not None:
                self.nodes[obj_path] = entry
                path_nodes.append(entry)

        path_nodes.sort()
        return path_nodes

    def _get_kind(self, path):
        """Return the node kind of `path` after normalising it."""
        path = self._fix_path(path)
        return self._get_path_type(path)

    @LazyProperty
    def _changes_cache(self):
        """Revision changes fetched from the remote for this revision."""
        return self._remote.revision_changes(self._svn_rev)

    @LazyProperty
    def affected_files(self) -> list[bytes]:
        """De-duplicated list of all paths found in the revision changes."""
        changed_files = set()
        for files in self._changes_cache.values():
            changed_files.update(files)
        return list(changed_files)

    @LazyProperty
    def id(self):
        # NOTE(review): `__init__` assigns `self.id` directly, so this
        # fallback looks unreachable on initialised instances - confirm.
        return self.raw_id

    @LazyProperty
    def added_paths(self):
        """Paths listed under the ``"added"`` key of the revision changes."""
        return list(self._changes_cache["added"])

    @LazyProperty
    def changed_paths(self):
        """Paths listed under the ``"changed"`` key of the revision changes."""
        return list(self._changes_cache["changed"])

    @LazyProperty
    def removed_paths(self):
        """Paths listed under the ``"removed"`` key of the revision changes."""
        return list(self._changes_cache["removed"])
def _date_from_svn_properties(properties):
    """
    Build a datetime from the ``svn:date`` entry of the given svn properties.

    The timezone information of the parsed value is dropped as-is; no
    conversion to another timezone takes place.

    :param properties: mapping of svn revision properties.
    :return: :class:`datetime.datetime` instance. The object is naive.
    """
    raw_date = properties.get("svn:date")
    parsed = dateutil.parser.parse(raw_date)
    return parsed.replace(tzinfo=None)