|
|
# Copyright (C) 2014-2024 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Module holding everything related to vcs nodes, with vcs2 architecture.
|
|
|
"""
|
|
|
|
|
|
import functools
|
|
|
import os
|
|
|
import stat
|
|
|
|
|
|
from zope.cachedescriptors.property import Lazy as LazyProperty
|
|
|
|
|
|
from rhodecode.config.conf import LANGUAGES_EXTENSIONS_MAP
|
|
|
from rhodecode.lib.str_utils import safe_str, safe_bytes
|
|
|
from rhodecode.lib.hash_utils import md5
|
|
|
from rhodecode.lib.vcs import path as vcspath
|
|
|
from rhodecode.lib.vcs.backends.base import EmptyCommit
|
|
|
from rhodecode.lib.vcs.conf.mtypes import get_mimetypes_db
|
|
|
from rhodecode.lib.vcs.exceptions import NodeError
|
|
|
from rhodecode.lib.vcs_common import NodeKind, FILEMODE_DEFAULT
|
|
|
|
|
|
# NOTE(review): presumably the directory prefix used by Mercurial's largefiles
# pointer entries; it is not referenced anywhere else in this module - confirm
# against callers.
LARGEFILE_PREFIX = ".hglf"
|
|
|
|
|
|
|
|
|
class NodeState:
    """
    Symbolic names for the possible states of a node.

    The values are plain strings, so they can be compared against and
    serialized without any conversion.
    """

    # node was introduced
    ADDED = "added"
    # node exists and was modified
    CHANGED = "changed"
    # node exists and was left untouched
    NOT_CHANGED = "not changed"
    # node was deleted
    REMOVED = "removed"
|
|
|
|
|
|
|
|
|
# NUL byte whose presence in a file's content marks that content as binary
# (see ``FileNode.is_binary`` and ``FileNode.metadata_uncached``).
# TODO: not sure if this should be bytes or str; most probably bytes, because
# file content is expected to be bytes and the marker is checked against it.
BIN_BYTE_MARKER = b"\0"
|
|
|
|
|
|
|
|
|
|
|
|
@functools.total_ordering
class Node(object):
    """
    Simplest class representing file or directory on repository. SCM backends
    should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
    directly.

    Node's ``path`` cannot start with slash as we operate on *relative* paths
    only. Moreover, every single node is identified by the ``path`` attribute,
    so it cannot end with slash, too. Otherwise, path could lead to mistakes.
    """

    # RTLO marker allows swapping text, and certain
    # security attacks could be used with this
    RTLO_MARKER = "\u202e"

    # subclasses overwrite this per instance when bound to a commit
    commit = None

    def __init__(self, path: bytes, kind):
        """
        :param path: relative path of the node, as bytes; trailing slashes
            are stripped
        :param kind: one of the ``NodeKind`` values
        :raises TypeError: if ``path`` is not bytes
        :raises NodeError: if ``path`` starts with a slash, or is empty for a
            node that is not of ``NodeKind.DIR`` kind
        """
        self._validate_path(path)  # can throw exception if path is invalid

        # raw version, kept for paths with mixed/unknown encoding
        self.bytes_path: bytes = path.rstrip(b"/")
        # we store (and expose) paths as str
        self.str_path: str = safe_str(self.bytes_path)
        self.path: str = self.str_path

        if self.bytes_path == b"" and kind != NodeKind.DIR:
            raise NodeError("Only DirNode and its subclasses may be initialized with empty path")
        self.kind = kind

        # defensive check; with the default ``is_root`` (which already
        # requires DIR kind) this cannot trigger
        if self.is_root() and not self.is_dir():
            raise NodeError("Root node cannot be FILE kind")

    def __eq__(self, other):
        """
        Compare two nodes by type, ``name``, ``path`` and ``kind``.

        Returns ``None`` for nodes of FILE kind that match on those
        attributes, which signals the subclass to fall back to a content
        comparison. Directories are additionally compared by the paths of
        their direct children, without entering each child directory.
        """
        if type(self) is not type(other):
            return False
        for attr in ("name", "path", "kind"):
            if getattr(self, attr) != getattr(other, attr):
                return False

        if self.is_file():
            # FileNode compare, we need to fallback to content compare
            return None

        if self.is_dir():
            # For DirNode's check without entering each dir
            self_nodes_paths = sorted(n.path for n in self.nodes)
            other_nodes_paths = sorted(n.path for n in other.nodes)
            return self_nodes_paths == other_nodes_paths

        # other kinds (e.g. submodules) carry no children to compare
        return True

    def __lt__(self, other):
        """
        Order nodes by ``kind`` first, then by ``path``.

        Implicitly returns ``None`` when both are equal, so subclasses can
        detect the tie and apply their own tie-breaker.
        """
        if self.kind < other.kind:
            return True
        if self.kind > other.kind:
            return False
        if self.path < other.path:
            return True
        if self.path > other.path:
            return False

    def __repr__(self):
        # ``path`` may be missing when repr() is called on a half-built instance
        maybe_path = getattr(self, "path", "UNKNOWN_PATH")
        return f"<{self.__class__.__name__} {maybe_path!r}>"

    def __str__(self):
        return self.name

    def _validate_path(self, path: bytes):
        """
        Ensure ``path`` is bytes and relative.

        :raises TypeError: if ``path`` is not bytes
        :raises NodeError: if ``path`` starts with a slash
        """
        self._assert_bytes(path)

        if path.startswith(b"/"):
            raise NodeError(
                f"Cannot initialize Node objects with slash at "
                f"the beginning as only relative paths are supported. "
                f"Got {path}"
            )

    @classmethod
    def _assert_bytes(cls, value):
        """Raise ``TypeError`` unless ``value`` is a ``bytes`` instance."""
        if not isinstance(value, bytes):
            raise TypeError(f"Bytes required as input, got {type(value)} of {value}.")

    @LazyProperty
    def parent(self):
        """
        Parent node, or ``None`` when the parent path is empty.

        Fetched from ``commit`` when the node is bound to one, otherwise a
        standalone ``DirNode`` is created.
        """
        parent_path: bytes = self.get_parent_path()
        if not parent_path:
            return None
        if self.commit:
            return self.commit.get_node(parent_path)
        return DirNode(parent_path)

    @LazyProperty
    def has_rtlo(self):
        """Detects if a path has right-to-left-override marker"""
        return self.RTLO_MARKER in self.str_path

    @LazyProperty
    def dir_path(self):
        """
        Returns name of the directory from full path of this vcs node. Empty
        string is returned if there's no directory in the path
        """
        _parts = self.path.rstrip("/").rsplit("/", 1)
        if len(_parts) == 2:
            return _parts[0]
        return ""

    @LazyProperty
    def name(self):
        """
        Returns the name of the node, i.e. only the last segment of its path.
        """
        return self.str_path.rstrip("/").split("/")[-1]

    @property
    def kind(self):
        return self._kind

    @kind.setter
    def kind(self, kind):
        # kind is write-once: it may only be assigned while it is still unset
        if hasattr(self, "_kind"):
            raise NodeError("Cannot change node's kind")
        else:
            self._kind = kind
            # Post setter check (path's trailing slash)
            if self.str_path.endswith("/"):
                raise NodeError("Node's path cannot end with slash")

    def get_parent_path(self) -> bytes:
        """
        Returns node's parent path as bytes, or empty bytes if node is root.

        For non-root nodes the directory part of the path is returned with a
        trailing slash appended.
        """
        if self.is_root():
            return b""
        parent_path = vcspath.dirname(self.bytes_path.rstrip(b"/")) + b"/"

        return safe_bytes(parent_path)

    def is_file(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.FILE``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.FILE

    def is_dir(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.DIR``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.DIR

    def is_root(self):
        """
        Returns ``True`` if node is a root node and ``False`` otherwise.
        """
        return self.kind == NodeKind.DIR and self.path == ""

    def is_submodule(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.SUBMODULE

    def is_largefile(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.LARGE_FILE``, ``False``
        otherwise
        """
        return self.kind == NodeKind.LARGE_FILE

    def is_link(self):
        """
        Returns whatever the bound commit reports for ``is_link`` on this
        path; ``False`` when the node is not bound to a commit.
        """
        if self.commit:
            return self.commit.is_link(self.bytes_path)
        return False
|
|
|
|
|
|
|
|
|
class FileNode(Node):
    """
    Class representing file nodes.

    :attribute: path: path to the node, relative to repository's root
    :attribute: content: if given, sets the content of the file explicitly
    :attribute: commit: if given, content is fetched lazily from this commit
        the first time it is accessed
    :attribute: mode: stat mode for a node. Default is `FILEMODE_DEFAULT`.
    """

    # pre_load entries that must never be requested in bulk for this class
    _filter_pre_load = []

    def __init__(self, path: bytes, content: bytes | None = None, commit=None, mode=None, pre_load=None):
        """
        Only one of ``content`` and ``commit`` may be given. Passing both
        would raise ``NodeError`` exception.

        :param path: relative path to the node
        :param content: content may be passed to constructor
        :param commit: if given, will use it to lazily fetch content
        :param mode: ST_MODE (i.e. 0100644)
        :param pre_load: optional list of attribute names to fetch from the
            commit's remote in a single bulk request
        """
        if content and commit:
            raise NodeError("Cannot use both content and commit")

        super().__init__(path, kind=NodeKind.FILE)

        self.commit = commit
        if content and not isinstance(content, bytes):
            # File content is one thing that inherently must be bytes
            # we support passing str too, and convert the content
            content = safe_bytes(content)
        self._content = content
        self._mode = mode or FILEMODE_DEFAULT

        self._set_bulk_properties(pre_load)

    def __eq__(self, other):
        # base compare yields None when only the content is left to compare
        eq = super().__eq__(other)
        if eq is not None:
            return eq
        return self.content == other.content

    def __hash__(self):
        raw_id = getattr(self.commit, "raw_id", "")
        return hash((self.path, raw_id))

    def __lt__(self, other):
        # base compare yields None on a kind/path tie; break it by content
        lt = super().__lt__(other)
        if lt is not None:
            return lt
        return self.content < other.content

    def __repr__(self):
        short_id = getattr(self.commit, "short_id", "")
        return f"<{self.__class__.__name__} path={self.str_path!r}, short_id={short_id}>"

    def _set_bulk_properties(self, pre_load):
        """
        Fetch the requested ``pre_load`` attributes with one bulk request and
        store them directly in the instance ``__dict__``, so the matching
        lazy properties are already populated when first accessed.

        :raises ValueError: if the remote returns an unsupported attribute
        """
        if not pre_load:
            return
        pre_load = [entry for entry in pre_load if entry not in self._filter_pre_load]
        if not pre_load:
            return

        remote = self.commit.get_remote()
        result = remote.bulk_file_request(self.commit.raw_id, self.bytes_path, pre_load)

        for attr, value in result.items():
            if attr == "flags":
                self.__dict__["mode"] = safe_str(value)
            elif attr == "data":
                self.__dict__["_content"] = value
            elif attr in ("size", "is_binary", "md5"):
                self.__dict__[attr] = value
            else:
                raise ValueError(f"Unsupported attr in bulk_property: {attr}")

    @LazyProperty
    def mode(self):
        """
        Returns lazily mode of the FileNode. If `commit` is not set, would
        use value given at initialization or `FILEMODE_DEFAULT` (default).
        """
        if self.commit:
            return self.commit.get_file_mode(self.bytes_path)
        return self._mode

    @LazyProperty
    def raw_bytes(self) -> bytes:
        """
        Returns lazily the raw bytes of the FileNode.
        """
        # only hit the commit when no content was given or pre-loaded
        if self.commit and self._content is None:
            self._content = self.commit.get_file_content(self.bytes_path)
        return self._content

    def content_uncached(self):
        """
        Returns content of the FileNode, re-reading it from the commit on
        every call when a commit is bound.
        """
        if self.commit:
            content = self.commit.get_file_content(self.bytes_path)
        else:
            content = self._content
        return content

    def stream_bytes(self):
        """
        Returns an iterator that will stream the content of the file directly from
        vcsserver without loading it to memory.

        :raises NodeError: if the node is not bound to a commit
        """
        if self.commit:
            return self.commit.get_file_content_streamed(self.bytes_path)
        raise NodeError("Cannot retrieve stream_bytes without related commit attribute")

    def metadata_uncached(self):
        """
        Returns a ``(is_binary, md5, size, content)`` tuple of the file node,
        without any cache usage.
        """
        content = self.content_uncached()

        is_binary = bool(content and BIN_BYTE_MARKER in content)
        size = len(content) if content else 0

        return is_binary, md5(content), size, content

    @LazyProperty
    def content(self) -> bytes:
        """
        Returns lazily content of the FileNode.

        :raises ValueError: if the stored content is not bytes
        """
        content = self.raw_bytes
        if content and not isinstance(content, bytes):
            raise ValueError(f"Content is of type {type(content)} instead of bytes")
        return content

    @LazyProperty
    def str_content(self) -> str:
        """Returns lazily the content converted to ``str``."""
        return safe_str(self.raw_bytes)

    @LazyProperty
    def size(self):
        if self.commit:
            return self.commit.get_file_size(self.bytes_path)
        raise NodeError("Cannot retrieve size of the file without related commit attribute")

    @LazyProperty
    def message(self):
        if self.commit:
            return self.last_commit.message
        raise NodeError("Cannot retrieve message of the file without related commit attribute")

    @LazyProperty
    def last_commit(self):
        if self.commit:
            pre_load = ["author", "date", "message", "parents"]
            return self.commit.get_path_commit(self.bytes_path, pre_load=pre_load)
        raise NodeError("Cannot retrieve last commit of the file without related commit attribute")

    def get_mimetype(self):
        """
        Returns a ``(mimetype, encoding)`` pair. If ``_mimetype`` attribute
        is available, it will be returned as-is (backends which store
        mimetypes or can easily recognize them, should set this private
        attribute to indicate that type should *NOT* be calculated).
        Otherwise the type is guessed from the file name, with a fallback
        based on the binary flag and pygments.

        :raises NodeError: if ``_mimetype`` is set but is not a 2 element
            list or tuple
        """

        if hasattr(self, "_mimetype"):
            if isinstance(self._mimetype, (tuple, list)) and len(self._mimetype) == 2:
                return self._mimetype
            else:
                raise NodeError("given _mimetype attribute must be an 2 element list or tuple")

        db = get_mimetypes_db()
        mtype, encoding = db.guess_type(self.name)

        if mtype is None:
            if not self.is_largefile() and self.is_binary:
                mtype = "application/octet-stream"
                encoding = None
            else:
                mtype = "text/plain"
                encoding = None

                # try with pygments
                try:
                    from pygments.lexers import get_lexer_for_filename

                    mt = get_lexer_for_filename(self.name).mimetypes
                except Exception:
                    # best-effort lookup: any failure keeps the text/plain default
                    mt = None

                if mt:
                    mtype = mt[0]

        return mtype, encoding

    @LazyProperty
    def mimetype(self):
        """
        Wrapper around full mimetype info. It returns only type of fetched
        mimetype without the encoding part. use get_mimetype function to fetch
        full set of (type,encoding)
        """
        return self.get_mimetype()[0]

    @LazyProperty
    def mimetype_main(self):
        """Returns the part of the mimetype before the first slash."""
        return self.mimetype.split("/")[0]

    @classmethod
    def get_lexer(cls, filename, content=None):
        """
        Returns a pygments lexer instance for ``filename``.

        Tries pygments' own filename/content guess first, then the
        ``LANGUAGES_EXTENSIONS_MAP`` lookup by extension, and finally falls
        back to a plain ``TextLexer``.
        """
        from pygments import lexers

        extension = filename.split(".")[-1]
        lexer = None

        try:
            lexer = lexers.guess_lexer_for_filename(filename, content, stripnl=False)
        except lexers.ClassNotFound:
            pass

        # try our EXTENSION_MAP
        if not lexer:
            try:
                lexer_class = LANGUAGES_EXTENSIONS_MAP.get(extension)
                if lexer_class:
                    lexer = lexers.get_lexer_by_name(lexer_class[0])
            except lexers.ClassNotFound:
                pass

        if not lexer:
            lexer = lexers.TextLexer(stripnl=False)

        return lexer

    @LazyProperty
    def lexer(self):
        """
        Returns pygment's lexer instance, guessed from the file's name only.
        """
        # TODO: this is more proper, but super heavy on investigating the type based on the content
        # self.get_lexer(self.name, self.content)

        return self.get_lexer(self.name)

    @LazyProperty
    def lexer_alias(self):
        """
        Returns first alias of the lexer guessed for this file.
        """
        return self.lexer.aliases[0]

    @LazyProperty
    def history(self):
        """
        Returns a list of commit for this file in which the file was changed

        :raises NodeError: if the node is not bound to a commit
        """
        if self.commit is None:
            raise NodeError("Unable to get commit for this FileNode")
        return self.commit.get_path_history(self.bytes_path)

    @LazyProperty
    def annotate(self):
        """
        Returns a list of three element tuples with lineno, commit and line

        :raises NodeError: if the node is not bound to a commit
        """
        if self.commit is None:
            raise NodeError("Unable to get commit for this FileNode")
        pre_load = ["author", "date", "message", "parents"]
        return self.commit.get_file_annotate(self.bytes_path, pre_load=pre_load)

    @LazyProperty
    def is_binary(self):
        """
        Returns True if file has binary content.
        """
        if self.commit:
            return self.commit.is_node_binary(self.bytes_path)
        else:
            raw_bytes = self._content
            return bool(raw_bytes and BIN_BYTE_MARKER in raw_bytes)

    @LazyProperty
    def md5(self):
        """
        Returns md5 of the file node.
        """

        if self.commit:
            return self.commit.node_md5_hash(self.bytes_path)
        else:
            raw_bytes = self._content
            # TODO: this sucks, we're computing md5 on potentially super big stream data...
            return md5(raw_bytes)

    @LazyProperty
    def extension(self):
        """Returns filenode extension"""
        return self.name.split(".")[-1]

    @property
    def is_executable(self):
        """
        Returns ``True`` if file has executable flag turned on.
        """
        return bool(self.mode & stat.S_IXUSR)

    def get_largefile_node(self):
        """
        Try to return a Mercurial FileNode from this node. It does internal
        checks inside largefile store, if that file exist there it will
        create special instance of LargeFileNode which can get content from
        LF store.

        Returns ``None`` when the node is not bound to a commit.
        """
        if self.commit:
            return self.commit.get_largefile_node(self.bytes_path)

    def count_lines(self, content: str | bytes, count_empty=False):
        """
        Count lines in ``content``.

        :param content: text to inspect, either ``str`` or ``bytes``
        :param count_empty: when true, walk the content line by line and
            return ``(all_lines, non_empty_lines)``; otherwise use a fast
            newline count and return that count twice
        :raises ValueError: if ``content`` is neither ``str`` nor ``bytes``
        """
        if isinstance(content, str):
            newline_marker = "\n"
        elif isinstance(content, bytes):
            newline_marker = b"\n"
        else:
            raise ValueError(f"content must be bytes or str got {type(content)} instead")

        if count_empty:
            all_lines = 0
            empty_lines = 0
            for line in content.splitlines(True):
                # a line consisting of just the newline marker is empty
                if line == newline_marker:
                    empty_lines += 1
                all_lines += 1

            return all_lines, all_lines - empty_lines
        else:
            # fast method
            empty_lines = all_lines = content.count(newline_marker)
            if all_lines == 0 and content:
                # one-line without a newline
                empty_lines = all_lines = 1

            return all_lines, empty_lines

    def lines(self, count_empty=False):
        """
        Returns the line counts of this file as computed by ``count_lines``;
        ``(0, 0)`` for binary files.
        """
        all_lines, empty_lines = 0, 0

        if not self.is_binary:
            content = self.content
            all_lines, empty_lines = self.count_lines(content, count_empty=count_empty)
        return all_lines, empty_lines
|
|
|
|
|
|
|
|
|
class DirNode(Node):
    """
    DirNode stores list of files and directories within this node.
    Nodes may be used standalone but within repository context they
    lazily fetch data within same repository's commit.
    """

    def __init__(self, path, nodes=(), commit=None, default_pre_load=None):
        """
        Only one of ``nodes`` and ``commit`` may be given. Passing both
        would raise ``NodeError`` exception.

        :param path: relative path to the node
        :param nodes: content may be passed to constructor
        :param commit: if given, will use it to lazily fetch content
        :param default_pre_load: attribute names pre-loaded for child nodes
            fetched from the commit; defaults to ``["is_binary", "size"]``
        """
        if nodes and commit:
            raise NodeError("Cannot use both nodes and commit")
        super().__init__(path, NodeKind.DIR)
        self.commit = commit
        self._nodes = nodes
        self.default_pre_load = default_pre_load or ["is_binary", "size"]

    def __iter__(self):
        yield from self.nodes

    def __eq__(self, other):
        """
        Directories are equal when the base comparison does not reject them
        and the paths of their direct children match.
        """
        # an explicit False covers type/name/path/kind mismatches; any other
        # result still requires the children comparison below
        if super().__eq__(other) is False:
            return False
        # check without entering each dir
        self_nodes_paths = sorted(n.path for n in self.nodes)
        other_nodes_paths = sorted(n.path for n in other.nodes)
        return self_nodes_paths == other_nodes_paths

    def __lt__(self, other):
        """
        Order by kind and path first; on a tie, order by the sorted paths of
        the direct children.
        """
        lt = super().__lt__(other)
        if lt is not None:
            return lt
        # check without entering each dir
        self_nodes_paths = sorted(n.path for n in self.nodes)
        other_nodes_paths = sorted(n.path for n in other.nodes)
        return self_nodes_paths < other_nodes_paths

    @LazyProperty
    def content(self):
        raise NodeError(f"{self} represents a dir and has no `content` attribute")

    @LazyProperty
    def nodes(self):
        """
        Sorted list of direct children, fetched from the commit when bound
        to one, otherwise taken from the nodes given at initialization.
        """
        if self.commit:
            nodes = self.commit.get_nodes(self.bytes_path, pre_load=self.default_pre_load)
        else:
            nodes = self._nodes
        return sorted(nodes)

    @LazyProperty
    def files(self):
        """Sorted list of direct children of FILE kind."""
        return sorted(node for node in self.nodes if node.is_file())

    @LazyProperty
    def dirs(self):
        """Sorted list of direct children of DIR kind."""
        return sorted(node for node in self.nodes if node.is_dir())

    @LazyProperty
    def state(self):
        raise NodeError("Cannot access state of DirNode")

    @LazyProperty
    def size(self):
        """
        Sum of the sizes of all files yielded by walking this directory in
        the bound commit.
        """
        size = 0
        for root, dirs, files in self.commit.walk(self.bytes_path):
            for f in files:
                size += f.size

        return size

    @LazyProperty
    def last_commit(self):
        if self.commit:
            pre_load = ["author", "date", "message", "parents"]
            return self.commit.get_path_commit(self.bytes_path, pre_load=pre_load)
        raise NodeError("Cannot retrieve last commit of the file without related commit attribute")

    def __repr__(self):
        short_id = getattr(self.commit, "short_id", "")
        return f"<{self.__class__.__name__} path={self.str_path!r}, short_id={short_id}>"
|
|
|
|
|
|
|
|
|
class RootNode(DirNode):
    """
    The repository's top-level directory: a ``DirNode`` with an empty path.
    """

    def __init__(self, nodes=(), commit=None):
        # the root is always addressed by the empty bytes path
        super().__init__(b"", nodes=nodes, commit=commit)

    def __repr__(self):
        cls_name = self.__class__.__name__
        short_id = getattr(self.commit, "short_id", "")
        return f"<{cls_name} path={self.str_path!r}, short_id={short_id}>"
|
|
|
|
|
|
|
|
|
class SubModuleNode(Node):
    """
    Node standing for a Git submodule or a Mercurial subrepo.
    """

    # a submodule has no content of its own
    is_binary = False
    size = 0

    def __init__(self, name, url=None, commit=None, alias=None):
        """
        :param name: path of the submodule entry
        :param url: link target; the path itself is used when not given
        :param commit: id the submodule points at
        :param alias: backend alias handed over to ``EmptyCommit``
        """
        self.path = name
        # str version of the path; must be set before ``kind`` is assigned
        self.str_path: str = safe_str(self.path)
        self.kind = NodeKind.SUBMODULE
        self.alias = alias

        # the target may live in a svn/git/hg repository we cannot reach,
        # so an EmptyCommit carrying just the id is used as a stand-in
        self.commit = EmptyCommit(str(commit), alias=alias)
        self.url = url or self._extract_submodule_url()

    def __repr__(self):
        cls_name = self.__class__.__name__
        short_id = getattr(self.commit, "short_id", "")
        return f"<{cls_name} {self.str_path!r} @ {short_id}>"

    def _extract_submodule_url(self):
        # TODO: find a way to parse gits submodule file and extract the
        # linking URL
        return self.path

    @LazyProperty
    def name(self):
        """
        Last segment of the path, followed by `` @ `` and the short id of
        the commit the submodule points at.
        """
        last_segment = self.str_path.rstrip("/").rsplit("/", 1)[-1]
        return f"{last_segment} @ {self.commit.short_id}"
|
|
|
|
|
|
|
|
|
class LargeFileNode(FileNode):
    """
    File node whose data is read from a file on the local filesystem
    (``path``), while ``org_path`` keeps the pointer path stored in the VCS.
    """

    def __init__(self, path, url=None, commit=None, alias=None, org_path=None):
        self._validate_path(path)  # can throw exception if path is invalid
        # as stored in VCS as LF pointer
        self.org_path = org_path

        raw_path = path.rstrip(b"/")
        self.bytes_path = raw_path  # store for __repr__
        self.str_path = safe_str(raw_path)
        self.path = self.str_path

        self.kind = NodeKind.LARGE_FILE
        self.alias = alias
        self._content = b""

    def _validate_path(self, path: bytes):
        """
        Relaxed validation: the LargeFileNode path is system absolute, so
        only the bytes type is enforced here.
        """
        self._assert_bytes(path)

    def __repr__(self):
        cls_name = self.__class__.__name__
        return f"<{cls_name} {self.org_path} -> {self.str_path!r}>"

    @LazyProperty
    def size(self):
        # size as reported by the filesystem
        return os.path.getsize(self.path)

    @LazyProperty
    def raw_bytes(self):
        # whole file is loaded into memory; use stream_bytes for chunks
        with open(self.path, "rb") as lf_file:
            return lf_file.read()

    @LazyProperty
    def name(self):
        """
        Overwrites name to be the org lf path
        """
        return self.org_path

    def stream_bytes(self):
        """Yield the file's data in chunks of 16 KiB."""
        chunk_size = 16 * 1024
        with open(self.path, "rb") as stream:
            while data := stream.read(chunk_size):
                yield data
|
|
|
|