|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Tests for the so-called "in-memory commit" API of vcs.
|
|
|
"""
|
|
|
|
|
|
import datetime
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
from rhodecode.lib.str_utils import safe_bytes, safe_str
|
|
|
from rhodecode.lib.vcs.exceptions import (
|
|
|
EmptyRepositoryError,
|
|
|
NodeAlreadyAddedError,
|
|
|
NodeAlreadyExistsError,
|
|
|
NodeAlreadyRemovedError,
|
|
|
NodeAlreadyChangedError,
|
|
|
NodeDoesNotExistError,
|
|
|
NodeNotChangedError,
|
|
|
)
|
|
|
from rhodecode.lib.vcs.nodes import DirNode, FileNode
|
|
|
from rhodecode.tests.vcs.conftest import BackendTestMixin
|
|
|
|
|
|
|
|
|
@pytest.fixture()
def nodes():
    """
    Provide the file nodes shared by the tests in this module.

    The list holds two small text files, a file whose name contains spaces
    and has empty content, a file nested two directories deep, and a nested
    file whose content is raw non-text bytes.
    """
    # Raw byte payload for ``foo/bar/file.bin``; it includes NUL and 0xff
    # bytes, so it cannot be mistaken for text content.
    binary_payload = (
        b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00"
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe"
        b"\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        b"\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00"
        b"\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00"
        b"\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff"
        b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"
    )

    return [
        FileNode(b"foobar", content=b"Foo & bar"),
        FileNode(b"foobar2", content=b"Foo & bar, doubled!"),
        FileNode(b"foo bar with spaces", content=b""),
        FileNode(b"foo/bar/baz", content=b"Inside"),
        FileNode(b"foo/bar/file.bin", content=binary_payload),
    ]
|
|
|
|
|
|
|
|
|
@pytest.mark.usefixtures("vcs_repository_support")
class TestInMemoryCommit(BackendTestMixin):
    """
    Backend independent tests of the in-memory commit object (``self.imc``).

    The attributes ``repo``, ``imc``, ``Backend`` and ``repo_path`` used by
    the tests are not defined in this class; presumably they are supplied by
    ``BackendTestMixin`` / the ``vcs_repository_support`` fixture - confirm
    in ``rhodecode.tests.vcs.conftest``.

    NOTE(review): the previous docstring stated that subclasses must set
    ``backend_alias`` (alias of the used backend, see ``vcs.BACKENDS``);
    verify whether that requirement still applies.
    """

    @classmethod
    def _get_commits(cls):
        """Return no pre-made commits (an empty list)."""
        return []

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def commit(self, branch=None):
        """
        Commit whatever is staged in ``self.imc`` and record the context.

        Stores the commit count before the commit, the message and the
        author used, and the resulting commit (``self.created_commit``) so
        that ``assert_successful_commit`` can verify them afterwards.

        :param branch: optional branch name passed through to
            ``self.imc.commit``.
        :return: the commit object returned by ``self.imc.commit``.
        """
        self.old_commit_count = len(self.repo.commit_ids)
        self.commit_message = "Test commit with unicode: żółwik"
        self.commit_author = f"{self.__class__.__name__} <foo@email.com>"
        # The result is deliberately NOT stored as ``self.commit``: doing so
        # would replace this method on the instance and make any further
        # ``self.commit()`` call fail with ``TypeError``.
        self.created_commit = self.imc.commit(
            message=self.commit_message, author=self.commit_author, branch=branch
        )
        return self.created_commit

    def assert_successful_commit(self, added_nodes):
        """
        Assert that the last ``self.commit()`` call produced the current tip.

        Checks that the tip equals the created commit, that exactly one
        commit was added, that message and author match, that the in-memory
        commit has nothing staged any more, and that every node of
        ``added_nodes`` is present in the tip.
        """
        newtip = self.repo.get_commit()
        assert self.created_commit == newtip
        assert self.old_commit_count + 1 == len(self.repo.commit_ids)
        assert newtip.message == self.commit_message
        assert newtip.author == self.commit_author
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))
        self.assert_nodes_in_commit(newtip, added_nodes)

    def assert_nodes_in_commit(self, commit, nodes):
        """Assert that each node exists in ``commit`` with equal path and content."""
        for node in nodes:
            # Look the node up once and compare both attributes on it.
            committed = commit.get_node(node.path)
            assert committed.path == node.path
            assert committed.content == node.content

    # ------------------------------------------------------------------
    # add
    # ------------------------------------------------------------------

    def test_add(self, nodes):
        """Nodes added one by one end up in the new tip commit."""
        for node in nodes:
            self.imc.add(node)

        self.commit()
        self.assert_successful_commit(nodes)

    @pytest.mark.backends("hg")
    def test_add_on_branch_hg(self, nodes):
        """Same as ``test_add`` but committing with ``branch="stable"`` (hg marker)."""
        for node in nodes:
            self.imc.add(node)
        self.commit(branch="stable")
        self.assert_successful_commit(nodes)

    @pytest.mark.backends("git")
    def test_add_on_branch_git(self, nodes):
        """Same as ``test_add`` but committing with ``branch="stable"`` (git marker)."""
        for node in nodes:
            self.imc.add(node)
        self.commit(branch="stable")
        self.assert_successful_commit(nodes)

    def test_add_in_bulk(self, nodes):
        """All nodes can be passed to a single ``add`` call."""
        self.imc.add(*nodes)

        self.commit()
        self.assert_successful_commit(nodes)

    def test_add_non_ascii_files(self):
        """Nodes with non-ASCII paths and content can be added and committed."""
        nodes = [
            FileNode(safe_bytes("żółwik/zwierzątko_utf8_str"), content=safe_bytes("ćććć")),
            FileNode(safe_bytes("żółwik/zwierzątko_unicode"), content=safe_bytes("ćććć")),
        ]

        for node in nodes:
            self.imc.add(node)

        self.commit()
        self.assert_successful_commit(nodes)

    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
        """A second commit adding more nested files contains all of them."""
        to_add = [
            FileNode(b"foo/bar/image.png", content=b"\0"),
            FileNode(b"foo/README.txt", content=b"readme!"),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit("Initial", "joe doe <joe.doe@example.com>")
        # Parent directories of the added files must show up as DirNodes.
        assert isinstance(commit.get_node("foo"), DirNode)
        assert isinstance(commit.get_node("foo/bar"), DirNode)
        self.assert_nodes_in_commit(commit, to_add)

        # commit some more files again
        to_add = [
            FileNode(b"foo/bar/foobaz/bar", content=b"foo"),
            FileNode(b"foo/bar/another/bar", content=b"foo"),
            FileNode(b"foo/baz.txt", content=b"foo"),
            FileNode(b"foobar/foobaz/file", content=b"foo"),
            FileNode(b"foobar/barbaz", content=b"foo"),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit("Another", "joe doe <joe.doe@example.com>")
        self.assert_nodes_in_commit(commit, to_add)

    def test_add_raise_already_added(self):
        """Adding the same node twice before committing raises."""
        node = FileNode(b"foobar", content=b"baz")
        self.imc.add(node)
        with pytest.raises(NodeAlreadyAddedError):
            self.imc.add(node)

    def test_check_integrity_raise_already_exist(self):
        """Committing an added node that was already committed raises."""
        node = FileNode(b"foobar", content=b"baz")
        self.imc.add(node)
        self.imc.commit(message="Added foobar", author="Some Name <foo@bar.com>")
        self.imc.add(node)
        with pytest.raises(NodeAlreadyExistsError):
            self.imc.commit(message="new message", author="Some Name <foo@bar.com>")

    # ------------------------------------------------------------------
    # change
    # ------------------------------------------------------------------

    def test_change(self):
        """Changing a committed node produces a new tip with the new content."""
        self.imc.add(FileNode(b"foo/bar/baz", content=b"foo"))
        self.imc.add(FileNode(b"foo/fbar", content=b"foobar"))
        tip = self.imc.commit("Initial", "joe doe <joe.doe@example.com>")

        # Change node's content
        node = FileNode(b"foo/bar/baz", content=b"My **changed** content")
        self.imc.change(node)
        self.imc.commit(f"Changed {node.path}", "joe doe <joe.doe@example.com>")

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        self.assert_nodes_in_commit(newtip, (node,))

    def test_change_non_ascii(self):
        """Nodes with non-ASCII paths can be changed in consecutive commits."""
        to_add = [
            FileNode(safe_bytes("żółwik/zwierzątko"), content=safe_bytes("ćććć")),
            FileNode(safe_bytes("żółwik/zwierzątko_uni"), content=safe_bytes("ćććć")),
        ]
        for node in to_add:
            self.imc.add(node)

        tip = self.imc.commit("Initial", "joe doe <joe.doe@example.com>")

        # Change node's content
        node = FileNode(safe_bytes("żółwik/zwierzątko"), content=b"My **changed** content")
        self.imc.change(node)
        self.imc.commit(f"Changed {safe_str(node.path)}", author="joe doe <joe.doe@example.com>")

        node_uni = FileNode(safe_bytes("żółwik/zwierzątko_uni"), content=b"My **changed** content")
        self.imc.change(node_uni)
        self.imc.commit(f"Changed {safe_str(node_uni.path)}", author="joe doe <joe.doe@example.com>")

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id

        self.assert_nodes_in_commit(newtip, (node, node_uni))

    def test_change_raise_empty_repository(self):
        """``change`` raises when nothing has been committed yet."""
        node = FileNode(b"foobar")
        with pytest.raises(EmptyRepositoryError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_does_not_exist(self):
        """Committing a change to a path that was never committed raises."""
        node = FileNode(b"foobar", content=b"baz")
        self.imc.add(node)
        self.imc.commit(message="Added foobar", author="Some Name <foo@bar.com>")
        node = FileNode(b"not-foobar", content=b"")
        self.imc.change(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message="Changed not existing node", author="Some Name <foo@bar.com>")

    def test_change_raise_node_already_changed(self):
        """Marking the same node as changed twice raises."""
        node = FileNode(b"foobar", content=b"baz")
        self.imc.add(node)
        self.imc.commit(message="Added foobar", author="Some Nam <foo@bar.com>")
        node = FileNode(b"foobar", content=b"more baz")
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_not_changed(self, nodes):
        """Committing a 'change' whose content equals the committed one raises."""
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path, content=nodes[0].content)
        self.imc.change(node)
        with pytest.raises(NodeNotChangedError):
            self.imc.commit(
                message="Trying to mark node as changed without touching it", author="Some Name <foo@bar.com>"
            )

    def test_change_raise_node_already_removed(self):
        """Changing a node that is already staged for removal raises."""
        node = FileNode(b"foobar", content=b"baz")
        self.imc.add(node)
        self.imc.commit(message="Added foobar", author="Some Name <foo@bar.com>")
        self.imc.remove(FileNode(b"foobar"))
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.change(node)

    # ------------------------------------------------------------------
    # remove
    # ------------------------------------------------------------------

    def test_remove(self, nodes):
        """A removed node is no longer reachable from the new tip."""
        self.test_add(nodes)  # Performs first commit

        tip = self.repo.get_commit()
        node = nodes[0]
        assert node.content == tip.get_node(node.path).content
        self.imc.remove(node)
        self.imc.commit(message=f"Removed {node.path}", author="Some Name <foo@bar.com>")

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        with pytest.raises(NodeDoesNotExistError):
            newtip.get_node(node.path)

    def test_remove_last_file_from_directory(self):
        """Removing the only file of a nested directory makes its path unknown."""
        node = FileNode(b"omg/qwe/foo/bar", content=b"foobar")
        self.imc.add(node)
        self.imc.commit("added", author="joe doe <joe@doe.com>")

        self.imc.remove(node)
        tip = self.imc.commit("removed", "joe doe <joe@doe.com>")
        with pytest.raises(NodeDoesNotExistError):
            tip.get_node("omg/qwe/foo/bar")

    def test_remove_raise_node_does_not_exist(self, nodes):
        """Committing a removal when nothing was committed before raises."""
        self.imc.remove(nodes[0])
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message="Trying to remove node at empty repository", author="Some Name <foo@bar.com>")

    def test_check_integrity_remove_raise_node_does_not_exist(self, nodes):
        """Committing a removal of a path missing from the tip raises."""
        self.test_add(nodes)  # Performs first commit

        node = FileNode(b"no-such-file")
        self.imc.remove(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message="Trying to remove not existing node", author="Some Name <foo@bar.com>")

    def test_remove_raise_node_already_removed(self, nodes):
        """Staging the same node for removal twice raises."""
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path)
        self.imc.remove(node)
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.remove(node)

    def test_remove_raise_node_already_changed(self, nodes):
        """Removing a node that is already staged as changed raises."""
        self.test_add(nodes)  # Performs first commit

        node = FileNode(nodes[0].bytes_path, content=b"Bending time")
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.remove(node)

    # ------------------------------------------------------------------
    # misc
    # ------------------------------------------------------------------

    def test_reset(self):
        """``reset`` clears everything staged in the in-memory commit."""
        self.imc.add(FileNode(b"foo", content=b"bar"))
        # Only ``added`` is populated here. ``change`` cannot be staged at
        # this point: it raises EmptyRepositoryError before a first commit
        # (see test_change_raise_empty_repository). Presumably that is why
        # the change/remove calls were left commented out originally.
        self.imc.reset()
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))

    def test_multiple_commits(self):
        """Several consecutive commits are all recorded, also after re-opening."""
        N = 3  # number of commits to perform
        last = None
        for x in range(N):
            # Zero-padded to five digits: file00000, file00001, ...
            fname = safe_bytes(f"file{x:05d}")
            content = safe_bytes("foobar\n" * x)
            node = FileNode(fname, content=content)
            self.imc.add(node)
            commit = self.imc.commit(f"Commit no. {x + 1}", author="Vcs User <foo@bar.com>")
            assert last != commit
            last = commit

        # Check commit number for same repo
        assert len(self.repo.commit_ids) == N

        # Check commit number for recreated repo
        repo = self.Backend(self.repo_path)
        assert len(repo.commit_ids) == N

    def test_date_attr(self, local_dt_to_utc):
        """An explicit ``date`` is stored on the commit (compared in UTC)."""
        node = FileNode(b"foobar.txt", content=b"Foobared!")
        self.imc.add(node)
        date = datetime.datetime(1985, 1, 30, 1, 45)
        commit = self.imc.commit("Committed at time when I was born ;-)", author="Test User <foo@bar.com>", date=date)

        assert commit.date == local_dt_to_utc(date)
|
|
|
|