test_inmemchangesets.py
330 lines
| 12.7 KiB
| text/x-python
|
PythonLexer
|
r7250 | # encoding: utf-8 | ||
|
r4187 | """ | ||
Tests the so-called "in memory changesets" commit API of vcs.
""" | ||||
|
r7718 | import datetime | ||
|
r4187 | |||
|
r7048 | import pytest | ||
|
r7045 | |||
|
r7718 | from kallithea.lib.vcs.exceptions import ( | ||
EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyChangedError, NodeAlreadyExistsError, NodeAlreadyRemovedError, NodeDoesNotExistError, NodeNotChangedError) | ||||
from kallithea.lib.vcs.nodes import DirNode, FileNode | ||||
|
r7141 | from kallithea.tests.vcs.base import _BackendTestMixin | ||
|
r7045 | |||
|
r4187 | |||
|
class InMemoryChangesetTestMixin(_BackendTestMixin):
    """Backend-agnostic tests for the "in memory changeset" commit API.

    Concrete test classes subclass this mixin and set ``backend_alias``.
    The tests rely on ``self.repo`` and ``self.imc``, neither of which is
    defined in this class (presumably supplied by ``_BackendTestMixin`` --
    confirm against the base class).
    """

    @classmethod
    def _get_commits(cls):
        """Store the shared fixture nodes on the class; return no commits."""
        # Note: this is slightly different than the regular _get_commits methods
        # as we don't actually return any commits. The creation of commits is
        # handled in the tests themselves.
        cls.nodes = [
            FileNode('foobar', content='Foo & bar'),
            FileNode('foobar2', content='Foo & bar, doubled!'),
            FileNode('foo bar with spaces', content=''),
            FileNode('foo/bar/baz', content='Inside'),
            FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
        ]
        return []

    def _assert_nothing_staged(self):
        """Assert that ``self.imc`` has no added, changed or removed nodes."""
        assert not any((
            self.imc.added,
            self.imc.changed,
            self.imc.removed
        ))

    def _commit_and_check_added(self, to_add, rev_count, message):
        """Commit the staged nodes and verify the resulting tip changeset.

        :param to_add: nodes previously staged through ``self.imc.add``
        :param rev_count: number of repository revisions before the commit
        :param message: commit message to use and to expect on the new tip
        """
        author = str(self.__class__)
        changeset = self.imc.commit(message=message, author=author)
        newtip = self.repo.get_changeset()
        assert changeset == newtip
        assert rev_count + 1 == len(self.repo.revisions)
        assert newtip.message == message
        assert newtip.author == author
        # A successful commit must leave nothing staged.
        self._assert_nothing_staged()
        for node in to_add:
            assert newtip.get_node(node.path).content == node.content

    def test_add(self):
        """Nodes added one at a time end up in a single new changeset."""
        rev_count = len(self.repo.revisions)
        to_add = [FileNode(node.path, content=node.content)
                  for node in self.nodes]
        for node in to_add:
            self.imc.add(node)
        message = u'Added: %s' % ', '.join(node.path for node in self.nodes)
        self._commit_and_check_added(to_add, rev_count, message)

    def test_add_in_bulk(self):
        """Nodes added in one ``add`` call end up in a single new changeset."""
        rev_count = len(self.repo.revisions)
        to_add = [FileNode(node.path, content=node.content)
                  for node in self.nodes]
        self.imc.add(*to_add)
        message = u'Added: %s' % ', '.join(node.path for node in self.nodes)
        self._commit_and_check_added(to_add, rev_count, message)

    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
        """Files added in a second commit are all present with their content."""
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
        assert isinstance(changeset.get_node('foo'), DirNode)
        assert isinstance(changeset.get_node('foo/bar'), DirNode)
        assert changeset.get_node('foo/bar/image.png').content == b'\0'
        assert changeset.get_node('foo/README.txt').content == b'readme!'

        # commit some more files again
        to_add = [
            FileNode('foo/bar/foobaz/bar', content='foo'),
            FileNode('foo/bar/another/bar', content='foo'),
            FileNode('foo/baz.txt', content='foo'),
            FileNode('foobar/foobaz/file', content='foo'),
            FileNode('foobar/barbaz', content='foo'),
        ]
        self.imc.add(*to_add)
        changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
        # These comparisons were previously bare expressions without
        # ``assert`` and therefore verified nothing.
        assert changeset.get_node('foo/bar/foobaz/bar').content == b'foo'
        assert changeset.get_node('foo/bar/another/bar').content == b'foo'
        assert changeset.get_node('foo/baz.txt').content == b'foo'
        assert changeset.get_node('foobar/foobaz/file').content == b'foo'
        assert changeset.get_node('foobar/barbaz').content == b'foo'

    def test_add_non_ascii_files(self):
        """Files with non-ASCII paths and content can be added and committed."""
        rev_count = len(self.repo.revisions)
        to_add = [
            FileNode('żółwik/zwierzątko', content='ćććć'),
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
        ]
        for node in to_add:
            self.imc.add(node)
        # List the files that are actually added here (not the shared
        # ``self.nodes`` fixture, which this test does not commit).
        message = u'Added: %s' % ', '.join(node.path for node in to_add)
        self._commit_and_check_added(to_add, rev_count, message)

    def test_add_raise_already_added(self):
        """Staging the same node twice raises NodeAlreadyAddedError."""
        node = FileNode('foobar', content='baz')
        self.imc.add(node)
        with pytest.raises(NodeAlreadyAddedError):
            self.imc.add(node)

    def test_check_integrity_raise_already_exist(self):
        """Committing an add of an already committed path raises."""
        node = FileNode('foobar', content='baz')
        self.imc.add(node)
        self.imc.commit(message=u'Added foobar', author=str(self))
        self.imc.add(node)
        with pytest.raises(NodeAlreadyExistsError):
            self.imc.commit(message='new message',
                            author=str(self))

    def test_change(self):
        """Changing a committed file yields a new tip with the new content."""
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
        self.imc.add(FileNode('foo/fbar', content='foobar'))
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')

        # Change node's content
        node = FileNode('foo/bar/baz', content='My **changed** content')
        self.imc.change(node)
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')

        newtip = self.repo.get_changeset()
        assert tip != newtip
        assert tip.raw_id != newtip.raw_id
        assert newtip.get_node('foo/bar/baz').content == b'My **changed** content'

    def test_change_non_ascii(self):
        """Files with non-ASCII paths can be changed in later commits."""
        to_add = [
            FileNode('żółwik/zwierzątko', content='ćććć'),
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
        ]
        for node in to_add:
            self.imc.add(node)
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')

        # Change node's content
        node = FileNode('żółwik/zwierzątko', content='My **changed** content')
        self.imc.change(node)
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')

        node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
        self.imc.change(node)
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')

        newtip = self.repo.get_changeset()
        assert tip != newtip
        assert tip.raw_id != newtip.raw_id

        assert newtip.get_node('żółwik/zwierzątko').content == b'My **changed** content'
        assert newtip.get_node('żółwik/zwierzątko_uni').content == b'My **changed** content'

    def test_change_raise_empty_repository(self):
        """Marking a node as changed in an empty repository raises."""
        node = FileNode('foobar')
        with pytest.raises(EmptyRepositoryError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_does_not_exist(self):
        """Committing a change to a path that was never committed raises."""
        node = FileNode('foobar', content='baz')
        self.imc.add(node)
        self.imc.commit(message=u'Added foobar', author=str(self))
        node = FileNode('not-foobar', content='')
        self.imc.change(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(message='Changed not existing node', author=str(self))

    def test_change_raise_node_already_changed(self):
        """Marking the same node as changed twice raises."""
        node = FileNode('foobar', content='baz')
        self.imc.add(node)
        self.imc.commit(message=u'Added foobar', author=str(self))
        node = FileNode('foobar', content='more baz')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.change(node)

    def test_check_integrity_change_raise_node_not_changed(self):
        """Committing a "change" with identical content raises."""
        self.test_add()  # Performs first commit
        node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
        self.imc.change(node)
        with pytest.raises(NodeNotChangedError):
            self.imc.commit(
                message=u'Trying to mark node as changed without touching it',
                author=str(self),
            )

    def test_change_raise_node_already_removed(self):
        """Changing a node that is already staged for removal raises."""
        node = FileNode('foobar', content='baz')
        self.imc.add(node)
        self.imc.commit(message=u'Added foobar', author=str(self))
        self.imc.remove(FileNode('foobar'))
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.change(node)

    def test_remove(self):
        """A removed file is no longer reachable from the new tip."""
        self.test_add()  # Performs first commit
        tip = self.repo.get_changeset()
        node = self.nodes[0]
        assert node.content == tip.get_node(node.path).content

        self.imc.remove(node)
        self.imc.commit(message=u'Removed %s' % node.path, author=str(self))

        newtip = self.repo.get_changeset()
        assert tip != newtip
        assert tip.raw_id != newtip.raw_id
        with pytest.raises(NodeDoesNotExistError):
            newtip.get_node(node.path)

    def test_remove_last_file_from_directory(self):
        """Removing the only file below a directory tree can be committed."""
        node = FileNode('omg/qwe/foo/bar', content='foobar')
        self.imc.add(node)
        self.imc.commit(u'added', u'joe doe')
        self.imc.remove(node)
        tip = self.imc.commit(u'removed', u'joe doe')
        with pytest.raises(NodeDoesNotExistError):
            tip.get_node('omg/qwe/foo/bar')

    def test_remove_raise_node_does_not_exist(self):
        """Committing a removal in a repository without commits raises."""
        self.imc.remove(self.nodes[0])
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message='Trying to remove node at empty repository',
                author=str(self),
            )

    def test_check_integrity_remove_raise_node_does_not_exist(self):
        """Committing a removal of an unknown path raises."""
        self.test_add()  # Performs first commit
        node = FileNode('no-such-file')
        self.imc.remove(node)
        with pytest.raises(NodeDoesNotExistError):
            self.imc.commit(
                message=u'Trying to remove not existing node',
                author=str(self),
            )

    def test_remove_raise_node_already_removed(self):
        """Staging the same node for removal twice raises."""
        self.test_add()  # Performs first commit
        node = FileNode(self.nodes[0].path)
        self.imc.remove(node)
        with pytest.raises(NodeAlreadyRemovedError):
            self.imc.remove(node)

    def test_remove_raise_node_already_changed(self):
        """Removing a node that is already staged as changed raises."""
        self.test_add()  # Performs first commit
        node = FileNode(self.nodes[0].path, content='Bending time')
        self.imc.change(node)
        with pytest.raises(NodeAlreadyChangedError):
            self.imc.remove(node)

    def test_reset(self):
        """``reset`` discards everything that has been staged."""
        # Only ``add`` is staged here. NOTE(review): ``change`` raises
        # EmptyRepositoryError on an empty repository (see
        # test_change_raise_empty_repository), which is presumably why
        # change/remove are not exercised in this test -- confirm.
        self.imc.add(FileNode('foo', content='bar'))
        self.imc.reset()
        self._assert_nothing_staged()

    def test_multiple_commits(self):
        """Several consecutive commits each produce a distinct changeset."""
        N = 3  # number of commits to perform
        last = None
        for x in range(N):
            fname = 'file%s' % str(x).rjust(5, '0')
            content = 'foobar\n' * x
            node = FileNode(fname, content=content)
            self.imc.add(node)
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
            assert last != commit
            last = commit

        # Check commit number for same repo.
        # NOTE(review): this assertion used to be repeated under a "recreated
        # repo" comment, but the repository was never re-opened; add a real
        # re-open check once the way to recreate the repo here is confirmed.
        assert len(self.repo.revisions) == N

    def test_date_attr(self):
        """An explicit ``date`` passed to ``commit`` is kept on the changeset."""
        node = FileNode('foobar.txt', content='Foobared!')
        self.imc.add(node)
        date = datetime.datetime(1985, 1, 30, 1, 45)
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
                                 author=u'lb <lb@example.com>', date=date)

        assert commit.date == date

|
class TestGitInMemoryChangeset(InMemoryChangesetTestMixin):
    """Run the InMemoryChangesetTestMixin tests with ``backend_alias`` 'git'."""
    backend_alias = 'git'


class TestHgInMemoryChangeset(InMemoryChangesetTestMixin):
    """Run the InMemoryChangesetTestMixin tests with ``backend_alias`` 'hg'."""
    backend_alias = 'hg'