# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
SVN commit module
"""

import logging

import dateutil.parser
from zope.cachedescriptors.property import Lazy as LazyProperty

from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.vcs import nodes, path as vcspath
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import CommitError
from vcsserver.lib.vcs_common import NodeKind, FILEMODE_EXECUTABLE, FILEMODE_DEFAULT, FILEMODE_LINK

# Value Subversion stores for boolean-like properties that are "set".
_SVN_PROP_TRUE = "*"

log = logging.getLogger(__name__)


class SubversionCommit(base.BaseCommit):
    """
    Subversion specific implementation of commits

    .. attribute:: branch

        The Subversion backend does not support assigning branches to
        specific commits. This attribute always has the value `None`.
    """

    def __init__(self, repository, commit_id):
        """
        Bind the commit to ``repository`` and set up its per-commit caches.

        :param repository: repository object; must provide
            ``_get_commit_idx``, ``_remote``, ``get_commit`` and
            ``commit_ids``.
        :param commit_id: commit id as understood by
            ``repository._get_commit_idx``.
        """
        self.repository = repository
        self.idx = self.repository._get_commit_idx(commit_id)
        # The revision number sent to the remote is the commit index + 1.
        self._svn_rev = self.idx + 1
        self._remote = repository._remote
        # TODO: handling of raw_id should be a method on repository itself,
        # which knows how to translate commit index and commit id
        self.raw_id = commit_id
        self.short_id = commit_id
        self.id = f"r{commit_id}"

        self.nodes = {}
        self._path_mode_cache = {}  # path stats cache, e.g filemode etc
        self._path_type_cache = {}  # path type dir/file/link etc cache
        self.tags = []

    @property
    def author(self):
        """Value of the ``svn:author`` revision property, passed through ``safe_str``."""
        return safe_str(self._properties.get("svn:author"))

    @property
    def date(self):
        """Commit date parsed from the ``svn:date`` revision property (naive datetime)."""
        return _date_from_svn_properties(self._properties)

    @property
    def message(self):
        """Value of the ``svn:log`` revision property, passed through ``safe_str``."""
        return safe_str(self._properties.get("svn:log"))

    @LazyProperty
    def _properties(self):
        """Revision properties of this commit, fetched from the remote on first access."""
        return self._remote.revision_properties(self._svn_rev)

    @LazyProperty
    def parents(self):
        """List holding the commit at ``idx - 1``, or an empty list for the first commit."""
        parent_idx = self.idx - 1
        if parent_idx < 0:
            return []
        return [self.repository.get_commit(commit_idx=parent_idx)]

    @LazyProperty
    def children(self):
        """List holding the commit at ``idx + 1``, or an empty list for the last commit."""
        child_idx = self.idx + 1
        if child_idx >= len(self.repository.commit_ids):
            return []
        return [self.repository.get_commit(commit_idx=child_idx)]

    def _calculate_file_mode(self, path: bytes):
        """
        Return ``FILEMODE_EXECUTABLE`` or ``FILEMODE_DEFAULT`` for ``path``.
        """
        # Note: Subversion flags files which are executable with a special
        # property `svn:executable` which is set to the value ``"*"``.
        if self._get_file_property(path, "svn:executable") == _SVN_PROP_TRUE:
            return FILEMODE_EXECUTABLE
        return FILEMODE_DEFAULT

    def get_file_mode(self, path: bytes):
        """
        Return the file mode for ``path``, computing it at most once per
        normalized path (results are kept in ``_path_mode_cache``).
        """
        path = self._fix_path(path)
        if path not in self._path_mode_cache:
            self._path_mode_cache[path] = self._calculate_file_mode(path)
        return self._path_mode_cache[path]

    def _get_path_type(self, path: bytes):
        """
        Return the node kind of ``path`` in this revision, caching the result.

        The empty path is always reported as ``NodeKind.DIR`` without asking
        the remote.

        :raises: the error built by ``no_node_at_path`` when the remote
            reports no type for ``path``.
        """
        if path in self._path_type_cache:
            return self._path_type_cache[path]

        if path == b"":
            self._path_type_cache[b""] = NodeKind.DIR
            return NodeKind.DIR

        path_type = self._remote.get_node_type(self._svn_rev, path)
        if not path_type:
            raise self.no_node_at_path(path)

        self._path_type_cache[path] = path_type
        return path_type

    def is_link(self, path: bytes):
        """
        Return ``True`` when ``path`` has ``svn:special`` set and its content
        starts with ``b"link"``; ``False`` otherwise.
        """
        # Note: Subversion has a flag for special files, the content of the
        # file contains the type of that file.
        if self._get_file_property(path, "svn:special") == _SVN_PROP_TRUE:
            return self.get_file_content(path).startswith(b"link")
        return False

    def is_node_binary(self, path):
        """Return the remote's ``is_binary`` answer for ``path`` in this revision."""
        path = self._fix_path(path)
        return self._remote.is_binary(self._svn_rev, safe_str(path))

    def node_md5_hash(self, path):
        """Return the remote's ``md5_hash`` result for ``path`` in this revision."""
        path = self._fix_path(path)
        return self._remote.md5_hash(self._svn_rev, safe_str(path))

    def _get_file_property(self, path, name):
        """
        Return the node property ``name`` of ``path`` in this revision, or
        ``None`` when the property is not present.
        """
        file_properties = self._remote.node_properties(safe_str(path), self._svn_rev)
        return file_properties.get(name)

    def get_file_content(self, path):
        """Return the content of ``path`` in this revision as provided by the remote."""
        path = self._fix_path(path)
        return self._remote.get_file_content(self._svn_rev, safe_str(path))

    def get_file_content_streamed(self, path):
        """
        Return the result of the remote's ``stream:get_file_content`` call for
        ``path`` in this revision.
        """
        path = self._fix_path(path)
        # The remote method name contains a colon, so it cannot be written as
        # a plain attribute access.
        stream_method = getattr(self._remote, "stream:get_file_content")
        return stream_method(self._svn_rev, safe_str(path))

    def get_file_size(self, path):
        """Return the remote's ``get_file_size`` result for ``path`` in this revision."""
        path = self._fix_path(path)
        return self._remote.get_file_size(self._svn_rev, safe_str(path))

    def get_path_history(self, path, limit=None, pre_load=None):
        """
        Return the commits reported by the remote's ``node_history`` for
        ``path``, starting from this revision.

        :param limit: forwarded unchanged to ``node_history``.
        :param pre_load: accepted for interface compatibility; not used here.
        """
        path = self._fix_path(path)
        history = self._remote.node_history(self._svn_rev, safe_str(path), limit)
        return [self.repository.get_commit(commit_id=str(svn_rev)) for svn_rev in history]

    def get_file_annotate(self, path, pre_load=None):
        """
        Yield one ``(line_no, commit_id, commit_loader, content)`` tuple per
        annotated line of ``path``.

        ``line_no`` is 1-based, ``commit_id`` is the revision as a string and
        ``commit_loader`` is a zero-argument callable that loads that commit
        from the repository on demand.

        :param pre_load: accepted for interface compatibility; not used here.
        """
        result = self._remote.file_annotate(safe_str(path), self._svn_rev)

        for zero_based_line_no, svn_rev, content in result:
            commit_id = str(svn_rev)
            line_no = zero_based_line_no + 1
            # Bind ``commit_id`` as a default argument: a plain closure would
            # look the name up when called, so loaders collected before being
            # invoked would all resolve to the last row's commit.
            yield (
                line_no,
                commit_id,
                lambda commit_id=commit_id: self.repository.get_commit(commit_id=commit_id),
                content,
            )

    def get_node(self, path: bytes, pre_load=None):
        """
        Return the node object for ``path``, reusing a cached instance if any.

        :param pre_load: forwarded to ``nodes.FileNode`` for file nodes.
        :raises: the error built by ``no_node_at_path`` when ``path`` is
            neither a directory nor a file in this revision.
        """
        path = self._fix_path(path)

        # use cached, if we have one
        if path in self.nodes:
            return self.nodes[path]

        # Resolved before the root check so that a missing path raises early.
        path_type = self._get_path_type(path)
        if path == b"":
            node = nodes.RootNode(commit=self)
        elif path_type == NodeKind.DIR:
            node = nodes.DirNode(safe_bytes(path), commit=self)
        elif path_type == NodeKind.FILE:
            node = nodes.FileNode(safe_bytes(path), commit=self, pre_load=pre_load)
            self._path_mode_cache[path] = node.mode
        else:
            raise self.no_node_at_path(path)

        self.nodes[path] = node
        return node

    def get_nodes(self, path: bytes, pre_load=None):
        """
        Return the sorted list of directory and file nodes directly under
        ``path``.

        Entries whose kind is neither ``NodeKind.DIR`` nor ``NodeKind.FILE``
        are left out of the result.

        :param pre_load: forwarded to ``nodes.FileNode`` for file nodes.
        :raises CommitError: when ``path`` is not a directory in this commit,
            or when the remote returns an entry without a node kind.
        """
        if self._get_kind(path) != nodes.NodeKind.DIR:
            raise CommitError(f"Directory does not exist for commit {self.raw_id} at '{path}'")
        path = self._fix_path(path)

        path_nodes = []
        for name, node_kind in self._remote.get_nodes(self._svn_rev, path):
            obj_path = vcspath.join(path, name)

            if node_kind is None:
                raise CommitError(f"Requested object type={node_kind} cannot be determined")

            # TODO: file mode is not provided by the listing; it stays unset
            # here and is not written to `_path_mode_cache`.
            stat_ = None

            # cache type; the cache is keyed by path, not by node kind
            if obj_path not in self._path_type_cache:
                self._path_type_cache[obj_path] = node_kind

            entry = None
            if obj_path in self.nodes:
                entry = self.nodes[obj_path]
            elif node_kind == NodeKind.DIR:
                entry = nodes.DirNode(safe_bytes(obj_path), commit=self)
            elif node_kind == NodeKind.FILE:
                entry = nodes.FileNode(safe_bytes(obj_path), commit=self, mode=stat_, pre_load=pre_load)

            if entry:
                self.nodes[obj_path] = entry
                path_nodes.append(entry)

        path_nodes.sort()
        return path_nodes

    def _get_kind(self, path):
        """Return the node kind of ``path`` after normalizing it with ``_fix_path``."""
        path = self._fix_path(path)
        return self._get_path_type(path)

    @LazyProperty
    def _changes_cache(self):
        """
        Changes of this revision as returned by the remote's
        ``revision_changes``; read below as a mapping with the keys
        ``"added"``, ``"changed"`` and ``"removed"``.
        """
        return self._remote.revision_changes(self._svn_rev)

    @LazyProperty
    def affected_files(self) -> list[bytes]:
        """De-duplicated list of every path found in any change category (unordered)."""
        changed_files = set()
        for files in self._changes_cache.values():
            changed_files.update(files)
        return list(changed_files)

    # NOTE(review): `__init__` also assigns `self.id` on the instance, which
    # presumably takes precedence over this descriptor - confirm which value
    # callers are expected to see.
    @LazyProperty
    def id(self):
        """Return ``raw_id``."""
        return self.raw_id

    @LazyProperty
    def added_paths(self):
        """List of the paths under the ``"added"`` key of the revision changes."""
        return list(self._changes_cache["added"])

    @LazyProperty
    def changed_paths(self):
        """List of the paths under the ``"changed"`` key of the revision changes."""
        return list(self._changes_cache["changed"])

    @LazyProperty
    def removed_paths(self):
        """List of the paths under the ``"removed"`` key of the revision changes."""
        return list(self._changes_cache["removed"])


def _date_from_svn_properties(properties):
    """
    Parses the date out of given svn properties.

    :param properties: mapping that provides the ``svn:date`` entry.
    :return: :class:`datetime.datetime` instance. The object is naive.
    """
    aware_date = dateutil.parser.parse(properties.get("svn:date"))
    # The timezone info is dropped as-is; the value is not converted to the
    # local timezone first.
    return aware_date.replace(tzinfo=None)