# Copyright (C) 2014-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
HG commit module
"""

import os

from zope.cachedescriptors.property import Lazy as LazyProperty

from rhodecode.lib.datelib import utcdate_fromtimestamp
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.vcs import path as vcspath
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import CommitError
from rhodecode.lib.vcs.nodes import (
    AddedFileNodesGenerator, ChangedFileNodesGenerator,
    DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode,
    SubModuleNode, LargeFileNode)
from rhodecode.lib.vcs.utils.paths import get_dirs_for_path


class MercurialCommit(base.BaseCommit):
    """
    Represents state of the repository at the single commit.

    Most attributes are lazy: they are fetched through the repository's
    ``_remote`` object on first access.
    """

    _filter_pre_load = [
        # git specific property not supported here
        "_commit",
    ]

    def __init__(self, repository, raw_id, idx, pre_load=None):
        """
        :param repository: repository object this commit belongs to; its
            ``_remote`` attribute is used for all data lookups.
        :param raw_id: full commit id, converted with ``safe_str``.
        :param idx: index of the commit, stored as ``self.idx``.
        :param pre_load: optional list of attribute names to fetch in one
            bulk request instead of lazily one by one.
        """
        raw_id = safe_str(raw_id)

        self.repository = repository
        self._remote = repository._remote

        self.raw_id = raw_id
        self.idx = idx

        self._set_bulk_properties(pre_load)

        # caches
        self.nodes = {}
        self._stat_modes = {}  # stat info for paths

    def _set_bulk_properties(self, pre_load):
        """
        Fetch the attributes named in ``pre_load`` with a single
        ``bulk_request`` call and store the converted values on the instance.

        Names listed in ``_filter_pre_load`` are dropped first; nothing is
        requested when the remaining list is empty.
        """
        if not pre_load:
            return
        pre_load = [entry for entry in pre_load
                    if entry not in self._filter_pre_load]
        if not pre_load:
            return

        result = self._remote.bulk_request(self.raw_id, pre_load)

        for attr, value in result.items():
            if attr in ("author", "branch", "message"):
                value = safe_str(value)
            elif attr == "affected_files":
                value = list(map(safe_str, value))
            elif attr == "date":
                value = utcdate_fromtimestamp(*value)
            elif attr in ("children", "parents"):
                value = self._make_commits(value)
            elif attr == "phase":
                value = self._get_phase_text(value)
            # Written straight into the instance dict under the attribute
            # name; presumably this lets the LazyProperty of the same name
            # be skipped on later access.
            self.__dict__[attr] = value

    @LazyProperty
    def tags(self):
        """Names of repository tags whose commit id equals this commit."""
        tags = [name for name, commit_id in self.repository.tags.items()
                if commit_id == self.raw_id]
        return tags

    @LazyProperty
    def branch(self):
        """Branch name reported by the remote for this commit."""
        return safe_str(self._remote.ctx_branch(self.raw_id))

    @LazyProperty
    def bookmarks(self):
        """Names of repository bookmarks whose commit id equals this commit."""
        bookmarks = [
            name for name, commit_id in self.repository.bookmarks.items()
            if commit_id == self.raw_id]
        return bookmarks

    @LazyProperty
    def message(self):
        """Commit description as text."""
        return safe_str(self._remote.ctx_description(self.raw_id))

    @LazyProperty
    def committer(self):
        """Committer of the commit; this is the same value as ``author``."""
        return safe_str(self.author)

    @LazyProperty
    def author(self):
        """User recorded on the commit, as text."""
        return safe_str(self._remote.ctx_user(self.raw_id))

    @LazyProperty
    def date(self):
        """Commit date, built by ``utcdate_fromtimestamp`` from the remote's
        ``ctx_date`` result."""
        return utcdate_fromtimestamp(*self._remote.ctx_date(self.raw_id))

    @LazyProperty
    def status(self):
        """
        Returns modified, added, removed, deleted files for current commit

        The path properties of this class read index 0 as changed, index 1
        as added and index 2 as removed paths.
        """
        return self._remote.ctx_status(self.raw_id)

    @LazyProperty
    def _file_paths(self):
        """File paths present in this commit, as listed by the remote."""
        return self._remote.ctx_list(self.raw_id)

    @LazyProperty
    def _dir_paths(self):
        """Directory paths derived from ``_file_paths``; the root directory
        ``''`` always comes first."""
        dir_paths = ['']
        dir_paths.extend(set(get_dirs_for_path(*self._file_paths)))
        return dir_paths

    @LazyProperty
    def _paths(self):
        """All directory paths followed by all file paths."""
        return self._dir_paths + self._file_paths

    @LazyProperty
    def id(self):
        """``'tip'`` when ``self.last`` is true, otherwise ``short_id``."""
        if self.last:
            return 'tip'
        return self.short_id

    @LazyProperty
    def short_id(self):
        """First 12 characters of ``raw_id``."""
        return self.raw_id[:12]

    def _make_commits(self, commit_ids, pre_load=None):
        """Resolve each id in ``commit_ids`` to a commit object through the
        repository."""
        return [self.repository.get_commit(commit_id=commit_id, pre_load=pre_load)
                for commit_id in commit_ids]

    @LazyProperty
    def parents(self):
        """
        Returns list of parent commits.
        """
        parents = self._remote.ctx_parents(self.raw_id)
        return self._make_commits(parents)

    def _get_phase_text(self, phase_id):
        """Map a numeric phase id to its name; unknown ids give ``''``."""
        return {
            0: 'public',
            1: 'draft',
            2: 'secret',
        }.get(phase_id) or ''

    @LazyProperty
    def phase(self):
        """Phase name of this commit (see ``_get_phase_text``)."""
        phase_id = self._remote.ctx_phase(self.raw_id)
        phase_text = self._get_phase_text(phase_id)

        return safe_str(phase_text)

    @LazyProperty
    def obsolete(self):
        """Value of the remote's ``ctx_obsolete`` for this commit."""
        obsolete = self._remote.ctx_obsolete(self.raw_id)
        return obsolete

    @LazyProperty
    def hidden(self):
        """Value of the remote's ``ctx_hidden`` for this commit."""
        hidden = self._remote.ctx_hidden(self.raw_id)
        return hidden

    @LazyProperty
    def children(self):
        """
        Returns list of child commits.
        """
        children = self._remote.ctx_children(self.raw_id)
        return self._make_commits(children)

    def _get_kind(self, path):
        """
        Return ``NodeKind.FILE`` or ``NodeKind.DIR`` for ``path``.

        :raises CommitError: if ``path`` is neither a known file nor a known
            directory of this commit.
        """
        path = self._fix_path(path)
        if path in self._file_paths:
            return NodeKind.FILE
        elif path in self._dir_paths:
            return NodeKind.DIR
        else:
            raise CommitError(f"Node does not exist at the given path '{path}'")

    def _assert_is_path(self, path) -> str:
        """
        Return the fixed form of ``path`` after checking it is a file.

        :raises CommitError: if ``path`` does not exist in this commit or is
            not a file.
        """
        path = self._fix_path(path)
        if self._get_kind(path) != NodeKind.FILE:
            raise CommitError(f"File does not exist for commit {self.raw_id} at '{path}'")

        return path

    def _get_file_flags(self, path):
        """
        Return the flags of the file at ``path``, cached in ``_stat_modes``.

        ``path`` must already have been validated with ``_assert_is_path``.
        """
        if path not in self._stat_modes:
            self._stat_modes[path] = self._remote.fctx_flags(self.raw_id, path)
        return self._stat_modes[path]

    def get_file_mode(self, path: bytes):
        """
        Returns stat mode of the file at the given ``path``.

        The result is ``base.FILEMODE_EXECUTABLE`` when the file flags
        contain ``'x'`` and ``base.FILEMODE_DEFAULT`` otherwise.
        """
        path = self._assert_is_path(path)

        if 'x' in self._get_file_flags(path):
            return base.FILEMODE_EXECUTABLE
        return base.FILEMODE_DEFAULT

    def is_link(self, path):
        """Return True when the flags of the file at ``path`` contain ``'l'``."""
        path = self._assert_is_path(path)
        return 'l' in self._get_file_flags(path)

    def is_node_binary(self, path):
        """Return the remote's ``is_binary`` answer for the file at ``path``."""
        path = self._assert_is_path(path)
        return self._remote.is_binary(self.raw_id, path)

    def node_md5_hash(self, path):
        """Return the remote's ``md5_hash`` result for the file at ``path``."""
        path = self._assert_is_path(path)
        return self._remote.md5_hash(self.raw_id, path)

    def get_file_content(self, path):
        """
        Returns content of the file at given ``path``.
        """
        path = self._assert_is_path(path)
        return self._remote.fctx_node_data(self.raw_id, path)

    def get_file_content_streamed(self, path):
        """Return the file content through the remote's streaming variant of
        ``fctx_node_data``."""
        path = self._assert_is_path(path)
        # The attribute name contains a colon, so it can only be reached
        # through getattr().
        stream_method = getattr(self._remote, 'stream:fctx_node_data')
        return stream_method(self.raw_id, path)

    def get_file_size(self, path):
        """
        Returns size of the file at given ``path``.
        """
        path = self._assert_is_path(path)
        return self._remote.fctx_size(self.raw_id, path)

    def get_path_history(self, path, limit=None, pre_load=None):
        """
        Returns history of file as reversed list of `MercurialCommit` objects
        for which file at given ``path`` has been modified.

        :param limit: passed through to the remote's ``node_history``.
        :param pre_load: passed to ``get_commit`` for every returned commit.
        """
        path = self._assert_is_path(path)
        hist = self._remote.node_history(self.raw_id, path, limit)
        return [
            self.repository.get_commit(commit_id=commit_id, pre_load=pre_load)
            for commit_id in hist]

    def get_file_annotate(self, path, pre_load=None):
        """
        Returns a generator of four element tuples with
            lineno, commit_id, commit lazy loader and line

        The lazy loader is a callable taking no arguments that returns the
        commit object for that line's ``commit_id``.
        """
        result = self._remote.fctx_annotate(self.raw_id, path)

        for ln_no, commit_id, content in result:
            yield (
                ln_no,
                commit_id,
                # Bind commit_id as a default argument. A plain closure looks
                # the name up when called, so loaders invoked after the
                # generator has advanced would all resolve to a later commit.
                lambda commit_id=commit_id: self.repository.get_commit(
                    commit_id=commit_id, pre_load=pre_load),
                content)

    def get_nodes(self, path, pre_load=None):
        """
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
        state of commit at the given ``path``. If node at the given ``path``
        is not instance of ``DirNode``, CommitError would be raised.

        Submodules located directly under ``path`` are included as
        ``SubModuleNode`` objects. Returned nodes are also stored in the
        ``self.nodes`` cache unless their path is already cached.
        """

        if self._get_kind(path) != NodeKind.DIR:
            raise CommitError(
                f"Directory does not exist for idx {self.raw_id} at '{path}'")
        path = self._fix_path(path)

        filenodes = [
            FileNode(safe_bytes(f), commit=self, pre_load=pre_load)
            for f in self._file_paths
            if os.path.dirname(f) == path]

        # Direct child directories only. The empty root entry is skipped, and
        # both dirname checks of the previous implementation are kept so the
        # selection stays identical.
        dirnodes = [
            DirNode(safe_bytes(d), commit=self)
            for d in self._dir_paths
            if d and vcspath.dirname(d) == path and os.path.dirname(d) == path]

        alias = self.repository.alias
        for k, vals in self._submodules.items():
            if vcspath.dirname(k) == path:
                loc = vals[0]
                commit = vals[1]
                dirnodes.append(SubModuleNode(k, url=loc, commit=commit, alias=alias))

        nodes = dirnodes + filenodes
        for node in nodes:
            if node.path not in self.nodes:
                self.nodes[node.path] = node
        nodes.sort()

        return nodes

    def get_node(self, path, pre_load=None):
        """
        Returns `Node` object from the given `path`. If there is no node at
        the given `path`, the exception built by ``self.no_node_at_path`` is
        raised (presumably `NodeDoesNotExistError`).

        Nodes are cached in ``self.nodes`` keyed by the fixed path.
        """
        path = self._fix_path(path)

        if path not in self.nodes:
            if path in self._file_paths:
                node = FileNode(safe_bytes(path), commit=self, pre_load=pre_load)
            elif path in self._dir_paths:
                if path == '':
                    node = RootNode(commit=self)
                else:
                    node = DirNode(safe_bytes(path), commit=self)
            else:
                raise self.no_node_at_path(path)

            # cache node
            self.nodes[path] = node
        return self.nodes[path]

    def get_largefile_node(self, path):
        """
        Return a ``LargeFileNode`` for ``path`` when the remote reports it as
        a large file whose content is available, otherwise ``None``.
        """
        pointer_spec = self._remote.is_large_file(self.raw_id, path)
        if pointer_spec:
            # content of that file regular FileNode is the hash of largefile
            file_id = self.get_file_content(path).strip()

            if self._remote.in_largefiles_store(file_id):
                lf_path = self._remote.store_path(file_id)
                return LargeFileNode(safe_bytes(lf_path), commit=self, org_path=path)
            elif self._remote.in_user_cache(file_id):
                lf_path = self._remote.store_path(file_id)
                self._remote.link(file_id, path)
                return LargeFileNode(safe_bytes(lf_path), commit=self, org_path=path)

    @LazyProperty
    def _submodules(self):
        """
        Returns a dictionary with submodule information from substate file
        of hg repository.
        """
        return self._remote.ctx_substate(self.raw_id)

    @LazyProperty
    def affected_files(self):
        """
        Gets a fast accessible file changes for given commit
        """
        return self._remote.ctx_files(self.raw_id)

    @property
    def added(self):
        """
        Returns list of added ``FileNode`` objects.
        """
        return AddedFileNodesGenerator(self.added_paths, self)

    @LazyProperty
    def added_paths(self):
        """Paths added in this commit (``status[1]``) as a new list."""
        return list(self.status[1])

    @property
    def changed(self):
        """
        Returns list of modified ``FileNode`` objects.
        """
        return ChangedFileNodesGenerator(self.changed_paths, self)

    @LazyProperty
    def changed_paths(self):
        """Paths modified in this commit (``status[0]``) as a new list."""
        return list(self.status[0])

    @property
    def removed(self):
        """
        Returns list of removed ``FileNode`` objects.
        """
        return RemovedFileNodesGenerator(self.removed_paths, self)

    @LazyProperty
    def removed_paths(self):
        """Paths removed in this commit (``status[2]``) as a new list."""
        return list(self.status[2])