test_inmemory.py
353 lines
| 13.4 KiB
| text/x-python
|
PythonLexer
r1 | ||||
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Tests for the so-called "in-memory commits" commit API of vcs.
""" | ||||
import datetime | ||||
import pytest | ||||
r5087 | from rhodecode.lib.str_utils import safe_bytes, safe_str | |||
r1 | from rhodecode.lib.vcs.exceptions import ( | |||
EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyExistsError, | ||||
NodeAlreadyRemovedError, NodeAlreadyChangedError, NodeDoesNotExistError, | ||||
NodeNotChangedError) | ||||
from rhodecode.lib.vcs.nodes import DirNode, FileNode | ||||
r2453 | from rhodecode.tests.vcs.conftest import BackendTestMixin | |||
r1 | ||||
@pytest.fixture()
def nodes():
    """
    Provide the file nodes shared by the tests that add content.

    The set covers two plain files, an empty file whose name contains
    spaces, a file nested in sub-directories, and a nested file whose
    content is binary (it contains NUL and non-ASCII bytes).

    :return: list of ``FileNode`` objects, freshly built for each test.
    """
    # Binary payload kept as a literal so the exact bytes stay visible.
    binary_payload = (
        b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe'
        b'\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00'
        b'\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff'
        b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
    )
    return [
        FileNode(b'foobar', content=b'Foo & bar'),
        FileNode(b'foobar2', content=b'Foo & bar, doubled!'),
        FileNode(b'foo bar with spaces', content=b''),
        FileNode(b'foo/bar/baz', content=b'Inside'),
        FileNode(b'foo/bar/file.bin', content=binary_payload),
    ]
@pytest.mark.usefixtures("vcs_repository_support")
class TestInMemoryCommit(BackendTestMixin):
    """
    Backend independent test case for the in-memory commit API.

    This is a backend independent test case class which should be created
    with ``type`` method.

    It is required to set following attributes at subclass:

    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)

    NOTE(review): repository setup appears to come from the
    ``vcs_repository_support`` fixture and ``BackendTestMixin`` (which are
    assumed to provide ``self.repo``, ``self.imc``, ``self.Backend`` and
    ``self.repo_path``) -- confirm whether the ``type``-based subclassing
    described above still applies.
    """

    @classmethod
    def _get_commits(cls):
        """
        Return the commits the test repository should be seeded with.

        An empty list is returned; several tests below expect to start
        from an empty repository (e.g. ``test_change_raise_empty_repository``
        and the exact commit count checked in ``test_multiple_commits``).
        """
        return []

    def _add_one_by_one_and_verify(self, nodes, branch=None):
        """
        Stage every node with a separate ``add`` call, commit, and verify.

        :param nodes: iterable of ``FileNode`` objects to add.
        :param branch: optional branch name forwarded to :meth:`commit`.
        """
        for node in nodes:
            self.imc.add(node)
        self.commit(branch=branch)
        self.assert_successful_commit(nodes)

    def test_add(self, nodes):
        """Nodes added one at a time end up in the new tip commit."""
        self._add_one_by_one_and_verify(nodes)

    @pytest.mark.backends("hg")
    def test_add_on_branch_hg(self, nodes):
        """Adding on a named branch works for the hg backend."""
        self._add_one_by_one_and_verify(nodes, branch='stable')

    @pytest.mark.backends("git")
    def test_add_on_branch_git(self, nodes):
        """Adding on a named branch works for the git backend."""
        self._add_one_by_one_and_verify(nodes, branch='stable')

    def test_add_in_bulk(self, nodes):
        """All nodes can be staged with a single ``add`` call."""
        self.imc.add(*nodes)
        self.commit()
        self.assert_successful_commit(nodes)

    def test_add_non_ascii_files(self):
        """File names and content containing non-ASCII text can be added."""
        nodes = [
            FileNode(safe_bytes('żółwik/zwierzątko_utf8_str'),
                     content=safe_bytes('ćććć')),
            FileNode(safe_bytes('żółwik/zwierzątko_unicode'),
                     content=safe_bytes('ćććć')),
        ]
        self._add_one_by_one_and_verify(nodes)

    def commit(self, branch=None):
        """
        Commit whatever is currently staged in ``self.imc``.

        Records on the instance the values that
        :meth:`assert_successful_commit` later checks:
        ``old_commit_count``, ``commit_message``, ``commit_author`` and
        ``created_commit``.

        :param branch: optional branch name passed to ``imc.commit``.
        """
        self.old_commit_count = len(self.repo.commit_ids)
        self.commit_message = 'Test commit with unicode: żółwik'
        self.commit_author = f'{self.__class__.__name__} <foo@email.com>'
        # Stored under its own name: assigning the result to ``self.commit``
        # would shadow this method on the instance and make a second call
        # fail with ``TypeError``.
        self.created_commit = self.imc.commit(
            message=self.commit_message, author=self.commit_author,
            branch=branch)

    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
        """Nodes added in a follow-up commit are all present in it."""
        to_add = [
            FileNode(b'foo/bar/image.png', content=b'\0'),
            FileNode(b'foo/README.txt', content=b'readme!'),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')
        assert isinstance(commit.get_node('foo'), DirNode)
        assert isinstance(commit.get_node('foo/bar'), DirNode)
        self.assert_nodes_in_commit(commit, to_add)

        # commit some more files again
        to_add = [
            FileNode(b'foo/bar/foobaz/bar', content=b'foo'),
            FileNode(b'foo/bar/another/bar', content=b'foo'),
            FileNode(b'foo/baz.txt', content=b'foo'),
            FileNode(b'foobar/foobaz/file', content=b'foo'),
            FileNode(b'foobar/barbaz', content=b'foo'),
        ]
        self.imc.add(*to_add)
        commit = self.imc.commit('Another', 'joe doe <joe.doe@example.com>')
        self.assert_nodes_in_commit(commit, to_add)

    def test_add_raise_already_added(self):
        """Staging the same node twice raises ``NodeAlreadyAddedError``."""
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        with pytest.raises(NodeAlreadyAddedError):
            self.imc.add(node)

    def test_check_integrity_raise_already_exist(self):
        """Committing an add for an already committed path is rejected."""
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar',
                        author='Some Name <foo@bar.com>')
        self.imc.add(node)
        with pytest.raises(NodeAlreadyExistsError):
            self.imc.commit(message='new message',
                            author='Some Name <foo@bar.com>')

    def test_change(self):
        """Changing a committed node produces a new tip with new content."""
        self.imc.add(FileNode(b'foo/bar/baz', content=b'foo'))
        self.imc.add(FileNode(b'foo/fbar', content=b'foobar'))
        tip = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')

        # Change node's content
        node = FileNode(b'foo/bar/baz', content=b'My **changed** content')
        self.imc.change(node)
        self.imc.commit(f'Changed {node.path}',
                        'joe doe <joe.doe@example.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        self.assert_nodes_in_commit(newtip, (node,))

    def test_change_non_ascii(self):
        """Nodes with non-ASCII paths can be changed across commits."""
        to_add = [
            FileNode(safe_bytes('żółwik/zwierzątko'),
                     content=safe_bytes('ćććć')),
            FileNode(safe_bytes('żółwik/zwierzątko_uni'),
                     content=safe_bytes('ćććć')),
        ]
        for node in to_add:
            self.imc.add(node)
        tip = self.imc.commit('Initial', 'joe doe <joe.doe@example.com>')

        # Change node's content
        node = FileNode(safe_bytes('żółwik/zwierzątko'),
                        content=b'My **changed** content')
        self.imc.change(node)
        self.imc.commit(f'Changed {safe_str(node.path)}',
                        author='joe doe <joe.doe@example.com>')

        node_uni = FileNode(safe_bytes('żółwik/zwierzątko_uni'),
                            content=b'My **changed** content')
        self.imc.change(node_uni)
        self.imc.commit(f'Changed {safe_str(node_uni.path)}',
                        author='joe doe <joe.doe@example.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        self.assert_nodes_in_commit(newtip, (node, node_uni))

    def test_change_raise_empty_repository(self):
        """``change`` on a repository without commits is rejected."""
        node = FileNode(b'foobar')
        with pytest.raises(EmptyRepositoryError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_does_not_exist(self):
        """Committing a change for an unknown path is rejected."""
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar',
                        author='Some Name <foo@bar.com>')
        node = FileNode(b'not-foobar', content=b'')
        self.imc.change(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message='Changed not existing node',
                            author='Some Name <foo@bar.com>')

    def test_change_raise_node_already_changed(self):
        """Marking the same node as changed twice is rejected."""
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar',
                        author='Some Nam <foo@bar.com>')
        node = FileNode(b'foobar', content=b'more baz')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_not_changed(self, nodes):
        """Committing a "change" with identical content is rejected."""
        self.test_add(nodes)  # Performs first commit
        node = FileNode(nodes[0].bytes_path, content=nodes[0].content)
        self.imc.change(node)
        with pytest.raises(NodeNotChangedError):
            self.imc.commit(
                message='Trying to mark node as changed without touching it',
                author='Some Name <foo@bar.com>')

    def test_change_raise_node_already_removed(self):
        """A node staged for removal cannot also be staged as changed."""
        node = FileNode(b'foobar', content=b'baz')
        self.imc.add(node)
        self.imc.commit(message='Added foobar',
                        author='Some Name <foo@bar.com>')
        self.imc.remove(FileNode(b'foobar'))
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.change(node)

    def test_remove(self, nodes):
        """A removed node is no longer reachable from the new tip."""
        self.test_add(nodes)  # Performs first commit
        tip = self.repo.get_commit()
        node = nodes[0]
        assert node.content == tip.get_node(node.path).content
        self.imc.remove(node)
        self.imc.commit(message=f'Removed {node.path}',
                        author='Some Name <foo@bar.com>')

        newtip = self.repo.get_commit()
        assert tip != newtip
        assert tip.id != newtip.id
        with pytest.raises(NodeDoesNotExistError):
            newtip.get_node(node.path)

    def test_remove_last_file_from_directory(self):
        """Removing the only file of a nested directory works."""
        node = FileNode(b'omg/qwe/foo/bar', content=b'foobar')
        self.imc.add(node)
        self.imc.commit('added', author='joe doe <joe@doe.com>')

        self.imc.remove(node)
        tip = self.imc.commit('removed', 'joe doe <joe@doe.com>')
        with pytest.raises(NodeDoesNotExistError):
            tip.get_node('omg/qwe/foo/bar')

    def test_remove_raise_node_does_not_exist(self, nodes):
        """Committing a removal in an empty repository is rejected."""
        self.imc.remove(nodes[0])
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message='Trying to remove node at empty repository',
                author='Some Name <foo@bar.com>')

    def test_check_integrity_remove_raise_node_does_not_exist(self, nodes):
        """Committing a removal of an unknown path is rejected."""
        self.test_add(nodes)  # Performs first commit
        node = FileNode(b'no-such-file')
        self.imc.remove(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message='Trying to remove not existing node',
                author='Some Name <foo@bar.com>')

    def test_remove_raise_node_already_removed(self, nodes):
        """Staging the same node for removal twice is rejected."""
        self.test_add(nodes)  # Performs first commit
        node = FileNode(nodes[0].bytes_path)
        self.imc.remove(node)
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.remove(node)

    def test_remove_raise_node_already_changed(self, nodes):
        """A node staged as changed cannot also be staged for removal."""
        self.test_add(nodes)  # Performs first commit
        node = FileNode(nodes[0].bytes_path, content=b'Bending time')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.remove(node)

    def test_reset(self):
        """``reset`` leaves nothing staged as added, changed or removed."""
        # Only an "add" is staged before the reset.
        self.imc.add(FileNode(b'foo', content=b'bar'))
        self.imc.reset()
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))

    def test_multiple_commits(self):
        """Consecutive commits are distinct and all get persisted."""
        commit_count = 3  # number of commits to perform
        last = None
        for index in range(commit_count):
            fname = safe_bytes(f'file{index:05d}')
            content = safe_bytes('foobar\n' * index)
            node = FileNode(fname, content=content)
            self.imc.add(node)
            commit = self.imc.commit(f"Commit no. {index + 1}",
                                     author='Vcs User <foo@bar.com>')
            assert last != commit
            last = commit

        # Check commit number for same repo
        assert len(self.repo.commit_ids) == commit_count

        # Check commit number for recreated repo
        repo = self.Backend(self.repo_path)
        assert len(repo.commit_ids) == commit_count

    def test_date_attr(self, local_dt_to_utc):
        """An explicit ``date`` passed to ``commit`` is kept on the commit."""
        node = FileNode(b'foobar.txt', content=b'Foobared!')
        self.imc.add(node)
        date = datetime.datetime(1985, 1, 30, 1, 45)
        commit = self.imc.commit(
            "Committed at time when I was born ;-)",
            author='Test User <foo@bar.com>', date=date)

        assert commit.date == local_dt_to_utc(date)

    def assert_successful_commit(self, added_nodes):
        """
        Verify the outcome of a preceding :meth:`commit` call.

        Checks that the repository tip is the commit that was just created,
        that exactly one commit was added, that message and author match,
        that nothing is left staged, and that every node of ``added_nodes``
        is present in the tip.

        :param added_nodes: iterable of nodes expected in the new tip.
        """
        newtip = self.repo.get_commit()
        assert self.created_commit == newtip
        assert self.old_commit_count + 1 == len(self.repo.commit_ids)
        assert newtip.message == self.commit_message
        assert newtip.author == self.commit_author
        assert not any((self.imc.added, self.imc.changed, self.imc.removed))
        self.assert_nodes_in_commit(newtip, added_nodes)

    def assert_nodes_in_commit(self, commit, nodes):
        """
        Assert that each node exists in ``commit`` with equal path/content.

        :param commit: commit object to look the nodes up in.
        :param nodes: iterable of nodes expected to be part of ``commit``.
        """
        for node in nodes:
            # Look the node up once and compare both attributes on it.
            committed = commit.get_node(node.path)
            assert committed.path == node.path
            assert committed.content == node.content