commit.py
278 lines
| 9.5 KiB
| text/x-python
|
PythonLexer
# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
SVN commit module
"""
r5647 | import logging | |||
r1 | import dateutil.parser | |||
from zope.cachedescriptors.property import Lazy as LazyProperty | ||||
r5074 | from rhodecode.lib.str_utils import safe_bytes, safe_str | |||
r1 | from rhodecode.lib.vcs import nodes, path as vcspath | |||
from rhodecode.lib.vcs.backends import base | ||||
r5074 | from rhodecode.lib.vcs.exceptions import CommitError | |||
r5647 | from vcsserver.lib.vcs_common import NodeKind, FILEMODE_EXECUTABLE, FILEMODE_DEFAULT, FILEMODE_LINK | |||
# Value that marks a Subversion property as "set"; the commit class compares
# the `svn:executable` and `svn:special` node properties against it.
_SVN_PROP_TRUE = "*"

# Module-level logger named after this module.
log = logging.getLogger(__name__)
r1 | ||||
class SubversionCommit(base.BaseCommit):
    """
    Subversion specific implementation of commits

    .. attribute:: branch

       The Subversion backend does not support to assign branches to
       specific commits. This attribute has always the value `None`.
    """

    def __init__(self, repository, commit_id):
        """
        :param repository: repository this commit belongs to; it has to
            provide ``_get_commit_idx`` and ``_remote``.
        :param commit_id: id of the commit, stored unchanged as ``raw_id``
            and ``short_id``.
        """
        self.repository = repository
        self.idx = self.repository._get_commit_idx(commit_id)
        # the revision passed to the remote is always the commit index + 1
        self._svn_rev = self.idx + 1
        self._remote = repository._remote
        # TODO: handling of raw_id should be a method on repository itself,
        # which knows how to translate commit index and commit id
        self.raw_id = commit_id
        self.short_id = commit_id
        self.id = f"r{commit_id}"

        self.nodes = {}
        self._path_mode_cache = {}  # path stats cache, e.g filemode etc
        self._path_type_cache = {}  # path type dir/file/link etc cache
        self.tags = []

    @property
    def author(self):
        """Value of the ``svn:author`` revision property, via ``safe_str``."""
        return safe_str(self._properties.get("svn:author"))

    @property
    def date(self):
        """Naive datetime parsed from the ``svn:date`` revision property."""
        return _date_from_svn_properties(self._properties)

    @property
    def message(self):
        """Value of the ``svn:log`` revision property, via ``safe_str``."""
        return safe_str(self._properties.get("svn:log"))

    @LazyProperty
    def _properties(self):
        """Revision properties of this revision, fetched from the remote."""
        return self._remote.revision_properties(self._svn_rev)

    @LazyProperty
    def parents(self):
        """
        List holding the commit at the previous index, or an empty list
        when this commit is at index 0.
        """
        parent_idx = self.idx - 1
        if parent_idx >= 0:
            parent = self.repository.get_commit(commit_idx=parent_idx)
            return [parent]
        return []

    @LazyProperty
    def children(self):
        """
        List holding the commit at the next index, or an empty list when
        this commit is the last one in ``repository.commit_ids``.
        """
        child_idx = self.idx + 1
        if child_idx < len(self.repository.commit_ids):
            child = self.repository.get_commit(commit_idx=child_idx)
            return [child]
        return []

    def _calculate_file_mode(self, path: bytes):
        """
        Return ``FILEMODE_EXECUTABLE`` when `path` carries the
        ``svn:executable`` property, ``FILEMODE_DEFAULT`` otherwise.
        """
        # Note: Subversion flags files which are executable with a special
        # property `svn:executable` which is set to the value ``"*"``.
        if self._get_file_property(path, "svn:executable") == _SVN_PROP_TRUE:
            return FILEMODE_EXECUTABLE
        else:
            return FILEMODE_DEFAULT

    def get_file_mode(self, path: bytes):
        """
        Return the file mode of `path`; the result is computed once per
        path and kept in ``_path_mode_cache``.
        """
        path = self._fix_path(path)
        if path not in self._path_mode_cache:
            self._path_mode_cache[path] = self._calculate_file_mode(path)
        return self._path_mode_cache[path]

    def _get_path_type(self, path: bytes):
        """
        Return the node kind of `path`, consulting ``_path_type_cache``
        before asking the remote.

        The empty path is the repository root and is always a directory.

        :raises: the error built by ``no_node_at_path`` when the remote
            reports no type for `path`.
        """
        if path in self._path_type_cache:
            return self._path_type_cache[path]

        if path == b"":
            self._path_type_cache[b""] = NodeKind.DIR
            return NodeKind.DIR

        path_type = self._remote.get_node_type(self._svn_rev, path)
        if not path_type:
            raise self.no_node_at_path(path)

        self._path_type_cache[path] = path_type
        return self._path_type_cache[path]

    def is_link(self, path: bytes):
        """
        Return True when `path` has the ``svn:special`` property set and
        its content starts with ``b"link"``.
        """
        # Note: Subversion has a flag for special files, the content of the
        # file contains the type of that file.
        if self._get_file_property(path, "svn:special") == _SVN_PROP_TRUE:
            return self.get_file_content(path).startswith(b"link")
        return False

    def is_node_binary(self, path):
        """Ask the remote whether the node at `path` is binary."""
        path = self._fix_path(path)
        return self._remote.is_binary(self._svn_rev, safe_str(path))

    def node_md5_hash(self, path):
        """Return the md5 hash the remote reports for the node at `path`."""
        path = self._fix_path(path)
        return self._remote.md5_hash(self._svn_rev, safe_str(path))

    def _get_file_property(self, path, name):
        """
        Return the node property `name` of `path` at this revision, or
        None when the property is not present.
        """
        file_properties = self._remote.node_properties(safe_str(path), self._svn_rev)
        return file_properties.get(name)

    def get_file_content(self, path):
        """Return the content of the file at `path` as given by the remote."""
        path = self._fix_path(path)
        return self._remote.get_file_content(self._svn_rev, safe_str(path))

    def get_file_content_streamed(self, path):
        """Return the streamed variant of the file content for `path`."""
        path = self._fix_path(path)

        # the remote attribute name contains a colon, so it can only be
        # reached through getattr
        stream_method = getattr(self._remote, "stream:get_file_content")
        return stream_method(self._svn_rev, safe_str(path))

    def get_file_size(self, path):
        """Return the size the remote reports for the file at `path`."""
        path = self._fix_path(path)
        return self._remote.get_file_size(self._svn_rev, safe_str(path))

    def get_path_history(self, path, limit=None, pre_load=None):
        """
        Return the commits in which `path` appears in the node history.

        :param limit: forwarded to the remote ``node_history`` call.
        :param pre_load: accepted for interface compatibility, not used here.
        """
        path = self._fix_path(path)
        history = self._remote.node_history(self._svn_rev, safe_str(path), limit)
        return [self.repository.get_commit(commit_id=str(svn_rev)) for svn_rev in history]

    def get_file_annotate(self, path, pre_load=None):
        """
        Generate the annotation of the file at `path`.

        Yields tuples of ``(line_no, commit_id, commit_loader, content)``
        where ``line_no`` starts at 1 and ``commit_loader`` is a callable
        without required arguments returning the commit for that line.

        :param pre_load: accepted for interface compatibility, not used here.
        """
        result = self._remote.file_annotate(safe_str(path), self._svn_rev)
        for zero_based_line_no, svn_rev, content in result:
            commit_id = str(svn_rev)
            line_no = zero_based_line_no + 1
            # Bind commit_id as a default so every loader keeps the id of its
            # own line; a plain closure would see whatever value the loop
            # variable holds at the time the loader is finally called.
            yield (
                line_no,
                commit_id,
                lambda commit_id=commit_id: self.repository.get_commit(commit_id=commit_id),
                content,
            )

    def get_node(self, path: bytes, pre_load=None):
        """
        Return the node at `path`, creating and caching it in ``self.nodes``
        on first access.

        :raises: the error built by ``no_node_at_path`` when the path has no
            type at this revision or its type is neither file nor directory.
        """
        path = self._fix_path(path)

        # use cached, if we have one
        if path in self.nodes:
            return self.nodes[path]

        path_type = self._get_path_type(path)
        if path == b"":
            node = nodes.RootNode(commit=self)
        else:
            if path_type == NodeKind.DIR:
                node = nodes.DirNode(safe_bytes(path), commit=self)
            elif path_type == NodeKind.FILE:
                node = nodes.FileNode(safe_bytes(path), commit=self, pre_load=pre_load)
                self._path_mode_cache[path] = node.mode
            else:
                raise self.no_node_at_path(path)

        self.nodes[path] = node
        return self.nodes[path]

    def get_nodes(self, path: bytes, pre_load=None):
        """
        Return the sorted list of nodes directly reported by the remote for
        the directory `path`.

        Entries whose kind is neither file nor directory are left out.

        :raises CommitError: when `path` is not a directory, or when the
            remote returns an entry without a node kind.
        """
        if self._get_kind(path) != nodes.NodeKind.DIR:
            raise CommitError(f"Directory does not exist for commit {self.raw_id} at '{path}'")
        path = self._fix_path(path)

        path_nodes = []

        for obj_path, node_kind, pre_load_data in self._remote.get_nodes(self._svn_rev, path, pre_load):

            if node_kind is None:
                raise CommitError(f"Requested object type={node_kind} cannot be determined")

            # TODO: implement it ??
            # no file mode comes with the listing, file nodes get mode=None
            stat_ = None

            # cache type; the cache is keyed by path, so the lookup has to
            # use the entry path and not its kind
            if obj_path not in self._path_type_cache:
                self._path_type_cache[obj_path] = node_kind

            entry = None
            if obj_path in self.nodes:
                entry = self.nodes[obj_path]
            else:
                if node_kind == NodeKind.DIR:
                    entry = nodes.DirNode(safe_bytes(obj_path), commit=self)
                elif node_kind == NodeKind.FILE:
                    entry = nodes.FileNode(
                        safe_bytes(obj_path), commit=self, mode=stat_, pre_load=pre_load, pre_load_data=pre_load_data
                    )

            if entry:
                self.nodes[obj_path] = entry
                path_nodes.append(entry)

        path_nodes.sort()
        return path_nodes

    def _get_kind(self, path):
        """Return the node kind of `path` after normalising it with ``_fix_path``."""
        path = self._fix_path(path)
        path_type = self._get_path_type(path)
        return path_type

    @LazyProperty
    def _changes_cache(self):
        """
        Changes of this revision as returned by the remote; read below as a
        mapping with the keys ``"added"``, ``"changed"`` and ``"removed"``.
        """
        return self._remote.revision_changes(self._svn_rev)

    @LazyProperty
    def affected_files(self) -> list[bytes]:
        """All paths touched by this revision, de-duplicated, in no particular order."""
        changed_files = set()
        for files in self._changes_cache.values():
            changed_files.update(files)
        return list(changed_files)

    # NOTE(review): `__init__` assigns `self.id` directly, so this lazy
    # fallback looks shadowed on every instance built through `__init__` --
    # confirm before relying on it or removing it.
    @LazyProperty
    def id(self):
        return self.raw_id

    @LazyProperty
    def added_paths(self):
        """Paths listed under ``"added"`` in the revision changes."""
        return list(self._changes_cache["added"])

    @LazyProperty
    def changed_paths(self):
        """Paths listed under ``"changed"`` in the revision changes."""
        return list(self._changes_cache["changed"])

    @LazyProperty
    def removed_paths(self):
        """Paths listed under ``"removed"`` in the revision changes."""
        return list(self._changes_cache["removed"])
r1 | ||||
def _date_from_svn_properties(properties):
    """
    Build a datetime from the ``svn:date`` entry of the given svn properties.

    :param properties: mapping of revision properties; the value stored
        under ``"svn:date"`` is handed to ``dateutil.parser.parse``.
    :return: :class:`datetime.datetime` instance. The object is naive: the
        tzinfo of the parsed value is dropped without any conversion.
    """
    raw_date = properties.get("svn:date")
    parsed = dateutil.parser.parse(raw_date)
    return parsed.replace(tzinfo=None)