# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
GIT commit module
"""
import io
import configparser
import logging
from itertools import chain

from zope.cachedescriptors.property import Lazy as LazyProperty

from rhodecode.lib.datelib import utcdate_fromtimestamp
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import CommitError, NodeDoesNotExistError
from rhodecode.lib.vcs.nodes import (
    FileNode,
    DirNode,
    NodeKind,
    RootNode,
    SubModuleNode,
    LargeFileNode,
)
from rhodecode.lib.vcs_common import FILEMODE_LINK

log = logging.getLogger(__name__)


class GitCommit(base.BaseCommit):
    """
    Represents state of the repository at single commit id.

    Most attributes are resolved lazily through ``self._remote`` and are
    memoized on first access (``LazyProperty``). Per-path lookups are cached
    in ``nodes``, ``_path_mode_cache`` and ``_path_type_cache``.
    """

    # properties that are never fetched via the bulk request in
    # `_set_bulk_properties`, even when listed in `pre_load`
    _filter_pre_load = [
        # done through a more complex tree walk on parents
        "affected_files",
        # done through subprocess not remote call
        "children",
        # done through a more complex tree walk on parents
        "status",
        # mercurial specific property not supported here
        "obsolete",
        # mercurial specific property not supported here
        "phase",
        # mercurial specific property not supported here
        "hidden",
    ]

    def __init__(self, repository, raw_id, idx, pre_load=None):
        """
        :param repository: repository object this commit belongs to; its
            ``_remote`` attribute is used for all remote calls
        :param raw_id: commit id, normalized with ``safe_str``
        :param idx: index of the commit, stored as-is
        :param pre_load: optional list of attribute names to fetch in one
            bulk request
        """
        self.repository = repository
        self._remote = repository._remote
        # TODO: johbo: Tweak of raw_id should not be necessary
        self.raw_id = safe_str(raw_id)
        self.idx = idx

        self._set_bulk_properties(pre_load)

        # caches
        self.nodes = {}
        self._path_mode_cache = {}  # path stats cache, e.g filemode etc
        self._path_type_cache = {}  # path type dir/file/link etc cache

        self._submodules = None

    def _set_bulk_properties(self, pre_load):
        """
        Fetch the requested attributes with a single ``bulk_request`` call and
        store them in the instance ``__dict__``.

        Entries listed in ``_filter_pre_load`` are dropped first; nothing is
        requested when the remaining list is empty.
        """
        if not pre_load:
            return
        pre_load = [entry for entry in pre_load if entry not in self._filter_pre_load]
        if not pre_load:
            return

        result = self._remote.bulk_request(self.raw_id, pre_load)
        for attr, value in result.items():
            if attr in ["author", "message"]:
                if value:
                    value = safe_str(value)
            elif attr == "date":
                value = utcdate_fromtimestamp(*value)
            elif attr == "parents":
                value = self._make_commits(value)
            elif attr == "branch":
                value = self._set_branch(value)
            # written straight into __dict__ under the attribute name
            self.__dict__[attr] = value

    @LazyProperty
    def _commit(self):
        """Commit object as returned by the remote for ``raw_id``."""
        return self._remote[self.raw_id]

    @LazyProperty
    def _tree_id(self):
        """Id of the tree referenced by this commit."""
        return self._remote[self._commit["tree"]]["id"]

    @LazyProperty
    def id(self):
        """Same value as ``raw_id``."""
        return self.raw_id

    @LazyProperty
    def short_id(self):
        """First 12 characters of ``raw_id``."""
        return self.raw_id[:12]

    @LazyProperty
    def message(self):
        """Commit message as ``str``."""
        return safe_str(self._remote.message(self.id))

    @LazyProperty
    def committer(self):
        """Committer as ``str``; resolved through the remote ``author`` call."""
        return safe_str(self._remote.author(self.id))

    @LazyProperty
    def author(self):
        """Author as ``str``."""
        return safe_str(self._remote.author(self.id))

    @LazyProperty
    def date(self):
        """Commit date built from the remote's ``(timestamp, tz)`` pair."""
        unix_ts, tz = self._remote.date(self.raw_id)
        return utcdate_fromtimestamp(unix_ts, tz)

    @LazyProperty
    def status(self):
        """
        Returns modified, added, deleted files for current commit, as a
        three element tuple of lists.
        """
        added, modified, deleted = self._changes_cache
        return list(modified), list(added), list(deleted)

    @LazyProperty
    def tags(self):
        """Names of the repository tags whose commit id equals ``raw_id``."""
        return [safe_str(name) for name, commit_id in self.repository.tags.items() if commit_id == self.raw_id]

    @LazyProperty
    def commit_branches(self):
        """Names of the repository branches whose commit id equals ``raw_id``."""
        return [name for name, commit_id in self.repository.branches.items() if commit_id == self.raw_id]

    def _set_branch(self, branches):
        """
        Pick a single branch name out of `branches`.

        Returns the first entry as ``str``, or ``None`` when `branches` is
        empty.
        """
        if branches:
            # actually commit can have multiple branches in git
            return safe_str(branches[0])
        return None

    @LazyProperty
    def branch(self):
        """First branch reported by the remote for this commit, or ``None``."""
        branches = self._remote.branch(self.raw_id)
        return self._set_branch(branches)

    def _get_path_tree_id_and_type(self, path: bytes):
        """
        Returns a two element list ``[tree_id, type]`` for the given `path`.

        Results are cached in ``_path_type_cache``; the mode returned by the
        remote is stored in ``_path_mode_cache`` as a side effect. The empty
        path resolves to the commit's root tree without a remote lookup.

        Raises the exception built by ``no_node_at_path`` when the remote
        reports no object id for `path`.
        """
        if path in self._path_type_cache:
            return self._path_type_cache[path]

        if path == b"":
            self._path_type_cache[b""] = [self._tree_id, NodeKind.DIR]
            return self._path_type_cache[path]

        tree_id, tree_type, tree_mode = self._remote.tree_and_type_for_path(self.raw_id, path)
        if tree_id is None:
            raise self.no_node_at_path(path)

        self._path_type_cache[path] = [tree_id, tree_type]
        self._path_mode_cache[path] = tree_mode

        return self._path_type_cache[path]

    def _get_kind(self, path):
        """Returns the node kind stored for `path` (after ``_fix_path``)."""
        path = self._fix_path(path)
        _, path_type = self._get_path_tree_id_and_type(path)
        return path_type

    def _assert_is_path(self, path):
        """
        Returns the fixed `path`, raising ``CommitError`` when it does not
        point to a file.
        """
        path = self._fix_path(path)
        if self._get_kind(path) != NodeKind.FILE:
            raise CommitError(f"File at path={path} does not exist for commit {self.raw_id}")
        return path

    def _get_file_nodes(self):
        """Chains the third element of every tuple produced by ``walk()``."""
        return chain(*(t[2] for t in self.walk()))

    @LazyProperty
    def parents(self):
        """
        Returns list of parent commits.
        """
        parent_ids = self._remote.parents(self.id)
        return self._make_commits(parent_ids)

    @LazyProperty
    def children(self):
        """
        Returns list of child commits.
        """
        children = self._remote.children(self.raw_id)
        return self._make_commits(children)

    def _make_commits(self, commit_ids):
        """Resolves each id in `commit_ids` via ``repository.get_commit``."""
        return [self.repository.get_commit(commit_id=commit_id) for commit_id in commit_ids]

    def get_file_mode(self, path: bytes):
        """
        Returns stat mode of the file at the given `path`.
        """
        path = self._assert_is_path(path)

        # ensure path is traversed
        self._get_path_tree_id_and_type(path)

        return self._path_mode_cache[path]

    def is_link(self, path: bytes):
        """Returns ``True`` when the cached mode of `path` equals ``FILEMODE_LINK``."""
        path = self._assert_is_path(path)
        if path not in self._path_mode_cache:
            self._path_mode_cache[path] = self._remote.fctx_flags(self.raw_id, path)

        return self._path_mode_cache[path] == FILEMODE_LINK

    def is_node_binary(self, path):
        """Returns the remote's ``is_binary`` verdict for the object at `path`."""
        tree_id, _ = self._get_path_tree_id_and_type(path)
        return self._remote.is_binary(tree_id)

    def node_md5_hash(self, path):
        """Returns the remote's ``md5_hash`` result for the file at `path`."""
        path = self._assert_is_path(path)
        return self._remote.md5_hash(self.raw_id, path)

    def get_file_content(self, path):
        """
        Returns content of the file at given `path`.
        """
        tree_id, _ = self._get_path_tree_id_and_type(path)
        return self._remote.blob_as_pretty_string(tree_id)

    def get_file_content_streamed(self, path):
        """
        Returns the result of the remote's ``stream:blob_as_pretty_string``
        method for the object at `path`.
        """
        tree_id, _ = self._get_path_tree_id_and_type(path)
        # the attribute name contains a colon, hence the getattr lookup
        stream_method = getattr(self._remote, "stream:blob_as_pretty_string")
        return stream_method(tree_id)

    def get_file_size(self, path):
        """
        Returns size of the file at given `path`.
        """
        tree_id, _ = self._get_path_tree_id_and_type(path)
        return self._remote.blob_raw_length(tree_id)

    def get_path_history(self, path, limit=None, pre_load=None):
        """
        Returns history of file as reversed list of `GitCommit` objects for
        which file at given `path` has been modified.
        """
        path = self._assert_is_path(path)
        history = self._remote.node_history(self.raw_id, path, limit)
        return [self.repository.get_commit(commit_id=commit_id, pre_load=pre_load) for commit_id in history]

    def get_file_annotate(self, path, pre_load=None):
        """
        Returns a generator of four element tuples with
        lineno, commit_id, commit lazy loader and line
        """
        result = self._remote.node_annotate(self.raw_id, path)

        for ln_no, commit_id, content in result:
            yield (
                ln_no,
                commit_id,
                # bind commit_id as a default argument so every loader keeps
                # its own id even if called after the loop has advanced
                lambda commit_id=commit_id: self.repository.get_commit(commit_id=commit_id, pre_load=pre_load),
                content,
            )

    def get_nodes(self, path: bytes, pre_load=None):
        """
        Returns the sorted list of nodes directly under the directory `path`.

        Mode and type of every listed entry are stored in the path caches,
        and created nodes are stored in ``self.nodes``.

        Raises ``CommitError`` when `path` is not a directory or when the
        remote returns an entry without a node kind.
        """
        if self._get_kind(path) != NodeKind.DIR:
            raise CommitError(f"Directory does not exist for commit {self.raw_id} at '{path}'")
        path = self._fix_path(path)

        # call and check tree_id for this path
        tree_id, _ = self._get_path_tree_id_and_type(path)

        path_nodes = []

        for bytes_name, stat_, tree_item_id, node_kind in self._remote.tree_items(tree_id):
            if node_kind is None:
                raise CommitError(f"Requested object type={node_kind} cannot be determined")

            if path != b"":
                obj_path = b"/".join((path, bytes_name))
            else:
                obj_path = bytes_name

            # cache file mode for git, since we have it already
            if obj_path not in self._path_mode_cache:
                self._path_mode_cache[obj_path] = stat_

            # cache type; the cache is keyed by path, same as the mode cache
            if obj_path not in self._path_type_cache:
                self._path_type_cache[obj_path] = [tree_item_id, node_kind]

            entry = None
            if obj_path in self.nodes:
                entry = self.nodes[obj_path]
            else:
                if node_kind == NodeKind.SUBMODULE:
                    # _get_submodule_url strips slashes on both ends, so the
                    # joined form is used unchanged from the original lookup
                    url = self._get_submodule_url(b"/".join((path, bytes_name)))
                    entry = SubModuleNode(bytes_name, url=url, commit=tree_item_id, alias=self.repository.alias)
                elif node_kind == NodeKind.DIR:
                    entry = DirNode(safe_bytes(obj_path), commit=self)
                elif node_kind == NodeKind.FILE:
                    entry = FileNode(safe_bytes(obj_path), commit=self, mode=stat_, pre_load=pre_load)

            # entries of any other kind are skipped
            if entry:
                self.nodes[obj_path] = entry
                path_nodes.append(entry)

        path_nodes.sort()
        return path_nodes

    def get_node(self, path: bytes, pre_load=None):
        """
        Returns the node at `path`, creating and caching it on first access.

        Raises ``NodeDoesNotExistError`` when the path lookup fails with a
        ``CommitError``, and the exception built by ``no_node_at_path`` when
        the path has an unsupported kind.
        """
        path = self._fix_path(path)

        # use cached, if we have one
        if path in self.nodes:
            return self.nodes[path]

        try:
            tree_id, path_type = self._get_path_tree_id_and_type(path)
        except CommitError as exc:
            raise NodeDoesNotExistError(
                f"Cannot find one of parents' directories for a given path: {path}"
            ) from exc

        if path == b"":
            node = RootNode(commit=self)
        else:
            if path_type == NodeKind.SUBMODULE:
                url = self._get_submodule_url(path)
                node = SubModuleNode(path, url=url, commit=tree_id, alias=self.repository.alias)
            elif path_type == NodeKind.DIR:
                node = DirNode(safe_bytes(path), commit=self)
            elif path_type == NodeKind.FILE:
                node = FileNode(safe_bytes(path), commit=self, pre_load=pre_load)
                self._path_mode_cache[path] = node.mode
            else:
                raise self.no_node_at_path(path)

        # cache node
        self.nodes[path] = node
        return self.nodes[path]

    def get_largefile_node(self, path: bytes):
        """
        Returns a ``LargeFileNode`` for `path` when the remote reports a
        large file pointer for it and the object is present in the store,
        otherwise ``None``.
        """
        tree_id, _ = self._get_path_tree_id_and_type(path)
        pointer_spec = self._remote.is_large_file(tree_id)

        if not pointer_spec:
            return None

        # content of that file regular FileNode is the hash of largefile
        file_id = pointer_spec.get("oid_hash")
        if not self._remote.in_largefiles_store(file_id):
            log.warning(f'Largefile oid={file_id} not found in store')
            return None

        lf_path = self._remote.store_path(file_id)
        return LargeFileNode(safe_bytes(lf_path), commit=self, org_path=path)

    @LazyProperty
    def affected_files(self) -> list[bytes]:
        """
        Gets a fast accessible file changes for given commit
        """
        added, modified, deleted = self._changes_cache
        return list(added.union(modified).union(deleted))

    @LazyProperty
    def _changes_cache(self) -> tuple[set, set, set]:
        """
        Returns ``(added, modified, deleted)`` sets, merged over the
        ``tree_changes`` results against every parent.

        A commit without parents is compared against ``None``.
        """
        added = set()
        modified = set()
        deleted = set()

        parents = self.parents
        if not self.parents:
            parents = [base.EmptyCommit()]

        for parent in parents:
            if isinstance(parent, base.EmptyCommit):
                oid = None
            else:
                oid = parent.raw_id
            _added, _modified, _deleted = self._remote.tree_changes(oid, self.raw_id)
            added = added | set(_added)
            modified = modified | set(_modified)
            deleted = deleted | set(_deleted)

        return added, modified, deleted

    def _get_paths_for_status(self, status):
        """
        Returns sorted list of paths for given ``status``.

        :param status: one of: *added*, *modified* or *deleted*
        """
        added, modified, deleted = self._changes_cache
        return sorted({"added": added, "modified": modified, "deleted": deleted}[status])

    @LazyProperty
    def added_paths(self):
        """Sorted list of paths added by this commit."""
        return self._get_paths_for_status("added")

    @LazyProperty
    def changed_paths(self):
        """Sorted list of paths modified by this commit."""
        return self._get_paths_for_status("modified")

    @LazyProperty
    def removed_paths(self):
        """Sorted list of paths deleted by this commit."""
        return self._get_paths_for_status("deleted")

    def _get_submodule_url(self, submodule_path: bytes):
        """
        Returns the url configured in ``.gitmodules`` for `submodule_path`,
        or ``None`` when there is no such entry or no ``.gitmodules`` file.

        The file is parsed once per commit; the mapping is kept in
        ``self._submodules`` keyed by the slash-stripped path as bytes.
        """
        git_modules_path = b".gitmodules"

        if self._submodules is None:
            self._submodules = {}

            try:
                submodules_node = self.get_node(git_modules_path)
            except NodeDoesNotExistError:
                return None

            parser = configparser.RawConfigParser()
            parser.read_file(io.StringIO(submodules_node.str_content))

            for section in parser.sections():
                # fallback keeps an incomplete section from raising
                # NoOptionError; such sections are skipped by the check below
                path = parser.get(section, "path", fallback=None)
                url = parser.get(section, "url", fallback=None)
                if path and url:
                    self._submodules[safe_bytes(path).strip(b"/")] = url

        return self._submodules.get(submodule_path.strip(b"/"))