##// END OF EJS Templates
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking...
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking for the LRU implementation, this it's safer to use for dogpile. We used it before with beaker, so it's generally more stable.

File last commit:

r2810:a15bd3a8 default
r2945:ec5716e4 default
Show More
test_repository.py
553 lines | 21.0 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
release: update copyright year to 2018
r2487 # Copyright (C) 2010-2018 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import datetime
from urllib2 import URLError
import mock
import pytest
from rhodecode.lib.vcs import backends
from rhodecode.lib.vcs.backends.base import (
Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
tests: use gunicorn for testing. This is close to production testing...
r2453 from rhodecode.tests.vcs.conftest import BackendTestMixin
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 from rhodecode.tests import repo_id_generator
project: added all source files and assets
r1
tests: use gunicorn for testing. This is close to production testing...
r2453 @pytest.mark.usefixtures("vcs_repository_support")
project: added all source files and assets
r1 class TestRepositoryBase(BackendTestMixin):
recreate_repo_per_test = False
def test_init_accepts_unicode_path(self, tmpdir):
path = unicode(tmpdir.join(u'unicode ä'))
self.Backend(path, create=True)
def test_init_accepts_str_path(self, tmpdir):
path = str(tmpdir.join('str ä'))
self.Backend(path, create=True)
def test_init_fails_if_path_does_not_exist(self, tmpdir):
path = unicode(tmpdir.join('i-do-not-exist'))
with pytest.raises(VCSError):
self.Backend(path)
def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
path = unicode(tmpdir.mkdir(u'unicode ä'))
with pytest.raises(VCSError):
self.Backend(path)
def test_has_commits_attribute(self):
self.repo.commit_ids
def test_name(self):
assert self.repo.name.startswith('vcs-test')
@pytest.mark.backends("hg", "git")
def test_has_default_branch_name(self):
assert self.repo.DEFAULT_BRANCH_NAME is not None
@pytest.mark.backends("svn")
def test_has_no_default_branch_name(self):
assert self.repo.DEFAULT_BRANCH_NAME is None
def test_has_empty_commit(self):
assert self.repo.EMPTY_COMMIT_ID is not None
assert self.repo.EMPTY_COMMIT is not None
def test_empty_changeset_is_deprecated(self):
def get_empty_changeset(repo):
return repo.EMPTY_CHANGESET
pytest.deprecated_call(get_empty_changeset, self.repo)
def test_bookmarks(self):
assert len(self.repo.bookmarks) == 0
# TODO: Cover two cases: Local repo path, remote URL
def test_check_url(self):
config = Config()
assert self.Backend.check_url(self.repo.path, config)
def test_check_url_invalid(self):
config = Config()
with pytest.raises(URLError):
self.Backend.check_url(self.repo.path + "invalid", config)
def test_get_contact(self):
tests: fixed tests after removing hardcoded timezone in tests.
r1351 assert self.repo.contact
project: added all source files and assets
r1
def test_get_description(self):
tests: fixed tests after removing hardcoded timezone in tests.
r1351 assert self.repo.description
project: added all source files and assets
r1
def test_get_hook_location(self):
assert len(self.repo.get_hook_location()) != 0
tests: fixed tests after removing hardcoded timezone in tests.
r1351 def test_last_change(self, local_dt_to_utc):
assert self.repo.last_change >= local_dt_to_utc(
datetime.datetime(2010, 1, 1, 21, 0))
project: added all source files and assets
r1
tests: fixed tests after removing hardcoded timezone in tests.
r1351 def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
project: added all source files and assets
r1 delta = datetime.timedelta(seconds=1)
tests: fixed tests after removing hardcoded timezone in tests.
r1351
start = local_dt_to_utc(datetime.datetime.now())
project: added all source files and assets
r1 empty_repo = vcsbackend.create_repo()
tests: fixed tests after removing hardcoded timezone in tests.
r1351 now = local_dt_to_utc(datetime.datetime.now())
project: added all source files and assets
r1 assert empty_repo.last_change >= start - delta
assert empty_repo.last_change <= now + delta
def test_repo_equality(self):
assert self.repo == self.repo
def test_repo_equality_broken_object(self):
import copy
_repo = copy.copy(self.repo)
delattr(_repo, 'path')
assert self.repo != _repo
def test_repo_equality_other_object(self):
class dummy(object):
path = self.repo.path
assert self.repo != dummy()
def test_get_commit_is_implemented(self):
self.repo.get_commit()
def test_get_commits_is_implemented(self):
commit_iter = iter(self.repo.get_commits())
commit = next(commit_iter)
assert commit.idx == 0
def test_supports_iteration(self):
repo_iter = iter(self.repo)
commit = next(repo_iter)
assert commit.idx == 0
def test_in_memory_commit(self):
imc = self.repo.in_memory_commit
assert isinstance(imc, BaseInMemoryCommit)
@pytest.mark.backends("hg")
def test__get_url_unicode(self):
url = u'/home/repos/malmö'
assert self.repo._get_url(url)
tests: use gunicorn for testing. This is close to production testing...
r2453 @pytest.mark.usefixtures("vcs_repository_support")
project: added all source files and assets
r1 class TestDeprecatedRepositoryAPI(BackendTestMixin):
recreate_repo_per_test = False
def test_revisions_is_deprecated(self):
def get_revisions(repo):
return repo.revisions
pytest.deprecated_call(get_revisions, self.repo)
def test_get_changeset_is_deprecated(self):
pytest.deprecated_call(self.repo.get_changeset)
def test_get_changesets_is_deprecated(self):
pytest.deprecated_call(self.repo.get_changesets)
def test_in_memory_changeset_is_deprecated(self):
def get_imc(repo):
return repo.in_memory_changeset
pytest.deprecated_call(get_imc, self.repo)
# TODO: these tests are incomplete, must check the resulting compare result for
# correcteness
class TestRepositoryCompare:
@pytest.mark.parametrize('merge', [True, False])
def test_compare_commits_of_same_repository(self, vcsbackend, merge):
target_repo = vcsbackend.create_repo(number_of_commits=5)
target_repo.compare(
target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
merge=merge)
@pytest.mark.xfail_backends('svn')
@pytest.mark.parametrize('merge', [True, False])
def test_compare_cloned_repositories(self, vcsbackend, merge):
target_repo = vcsbackend.create_repo(number_of_commits=5)
source_repo = vcsbackend.clone_repo(target_repo)
assert target_repo != source_repo
vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
source_commit = source_repo.get_commit()
target_repo.compare(
target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
merge=merge)
@pytest.mark.xfail_backends('svn')
@pytest.mark.parametrize('merge', [True, False])
def test_compare_unrelated_repositories(self, vcsbackend, merge):
orig = vcsbackend.create_repo(number_of_commits=5)
unrelated = vcsbackend.create_repo(number_of_commits=5)
assert orig != unrelated
orig.compare(
orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
class TestRepositoryGetCommonAncestor:
def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
target_repo = vcsbackend.create_repo(number_of_commits=5)
expected_ancestor = target_repo[2].raw_id
assert target_repo.get_common_ancestor(
commit_id1=target_repo[2].raw_id,
commit_id2=target_repo[4].raw_id,
repo2=target_repo
) == expected_ancestor
assert target_repo.get_common_ancestor(
commit_id1=target_repo[4].raw_id,
commit_id2=target_repo[2].raw_id,
repo2=target_repo
) == expected_ancestor
@pytest.mark.xfail_backends("svn")
def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
target_repo = vcsbackend.create_repo(number_of_commits=5)
source_repo = vcsbackend.clone_repo(target_repo)
assert target_repo != source_repo
vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
source_commit = source_repo.get_commit()
expected_ancestor = target_repo[4].raw_id
assert target_repo.get_common_ancestor(
commit_id1=target_repo[4].raw_id,
commit_id2=source_commit.raw_id,
repo2=source_repo
) == expected_ancestor
assert target_repo.get_common_ancestor(
commit_id1=source_commit.raw_id,
commit_id2=target_repo[4].raw_id,
repo2=target_repo
) == expected_ancestor
@pytest.mark.xfail_backends("svn")
def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
original = vcsbackend.create_repo(number_of_commits=5)
unrelated = vcsbackend.create_repo(number_of_commits=5)
assert original != unrelated
assert original.get_common_ancestor(
commit_id1=original[0].raw_id,
commit_id2=unrelated[0].raw_id,
repo2=unrelated
) == None
assert original.get_common_ancestor(
commit_id1=original[-1].raw_id,
commit_id2=unrelated[-1].raw_id,
repo2=unrelated
) == None
@pytest.mark.backends("git", "hg")
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 class TestRepositoryMerge(object):
project: added all source files and assets
r1 def prepare_for_success(self, vcsbackend):
self.target_repo = vcsbackend.create_repo(number_of_commits=1)
self.source_repo = vcsbackend.clone_repo(self.target_repo)
vcsbackend.add_file(self.target_repo, 'README_MERGE1', 'Version 1')
vcsbackend.add_file(self.source_repo, 'README_MERGE2', 'Version 2')
imc = self.source_repo.in_memory_commit
imc.add(FileNode('file_x', content=self.source_repo.name))
imc.commit(
message=u'Automatic commit from repo merge test',
author=u'Automatic')
self.target_commit = self.target_repo.get_commit()
self.source_commit = self.source_repo.get_commit()
# This only works for Git and Mercurial
default_branch = self.target_repo.DEFAULT_BRANCH_NAME
self.target_ref = Reference(
'branch', default_branch, self.target_commit.raw_id)
self.source_ref = Reference(
'branch', default_branch, self.source_commit.raw_id)
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.workspace_id = 'test-merge'
self.repo_id = repo_id_generator(self.target_repo.path)
project: added all source files and assets
r1
def prepare_for_conflict(self, vcsbackend):
self.target_repo = vcsbackend.create_repo(number_of_commits=1)
self.source_repo = vcsbackend.clone_repo(self.target_repo)
vcsbackend.add_file(self.target_repo, 'README_MERGE', 'Version 1')
vcsbackend.add_file(self.source_repo, 'README_MERGE', 'Version 2')
self.target_commit = self.target_repo.get_commit()
self.source_commit = self.source_repo.get_commit()
# This only works for Git and Mercurial
default_branch = self.target_repo.DEFAULT_BRANCH_NAME
self.target_ref = Reference(
'branch', default_branch, self.target_commit.raw_id)
self.source_ref = Reference(
'branch', default_branch, self.source_commit.raw_id)
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.workspace_id = 'test-merge'
self.repo_id = repo_id_generator(self.target_repo.path)
project: added all source files and assets
r1
def test_merge_success(self, vcsbackend):
self.prepare_for_success(vcsbackend)
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
self.source_ref,
project: added all source files and assets
r1 'test user', 'test@rhodecode.com', 'merge message 1',
dry_run=False)
expected_merge_response = MergeResponse(
Martin Bornhold
tests: Adapt tests to new merge response object.
r1053 True, True, merge_response.merge_ref,
project: added all source files and assets
r1 MergeFailureReason.NONE)
assert merge_response == expected_merge_response
target_repo = backends.get_backend(vcsbackend.alias)(
self.target_repo.path)
target_commits = list(target_repo.get_commits())
commit_ids = [c.raw_id for c in target_commits[:-1]]
assert self.source_ref.commit_id in commit_ids
assert self.target_ref.commit_id in commit_ids
merge_commit = target_commits[-1]
Martin Bornhold
tests: Adapt asserts to work with merge reference onbject in merge response.
r1059 assert merge_commit.raw_id == merge_response.merge_ref.commit_id
project: added all source files and assets
r1 assert merge_commit.message.strip() == 'merge message 1'
assert merge_commit.author == 'test user <test@rhodecode.com>'
# We call it twice so to make sure we can handle updates
target_ref = Reference(
self.target_ref.type, self.target_ref.name,
Martin Bornhold
tests: Adapt tests to new merge response object.
r1053 merge_response.merge_ref.commit_id)
project: added all source files and assets
r1
merge_response = target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, target_ref, self.source_repo, self.source_ref,
project: added all source files and assets
r1 'test user', 'test@rhodecode.com', 'merge message 2',
dry_run=False)
expected_merge_response = MergeResponse(
Martin Bornhold
tests: Adapt tests to new merge response object.
r1053 True, True, merge_response.merge_ref,
project: added all source files and assets
r1 MergeFailureReason.NONE)
assert merge_response == expected_merge_response
target_repo = backends.get_backend(
vcsbackend.alias)(self.target_repo.path)
Martin Bornhold
tests: Adapt tests to new merge response object.
r1053 merge_commit = target_repo.get_commit(
merge_response.merge_ref.commit_id)
project: added all source files and assets
r1 assert merge_commit.message.strip() == 'merge message 1'
assert merge_commit.author == 'test user <test@rhodecode.com>'
def test_merge_success_dry_run(self, vcsbackend):
self.prepare_for_success(vcsbackend)
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
self.source_ref, dry_run=True)
project: added all source files and assets
r1
# We call it twice so to make sure we can handle updates
Martin Bornhold
tests: Adapt asserts to work with merge reference onbject in merge response.
r1059 merge_response_update = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
self.source_ref, dry_run=True)
Martin Bornhold
tests: Adapt asserts to work with merge reference onbject in merge response.
r1059
Martin Bornhold
tests: Ignore merge commit id on camparison of merge responses.
r1060 # Multiple merges may differ in their commit id. Therefore we set the
# commit id to `None` before comparing the merge responses.
merge_response = merge_response._replace(
merge_ref=merge_response.merge_ref._replace(commit_id=None))
merge_response_update = merge_response_update._replace(
merge_ref=merge_response_update.merge_ref._replace(commit_id=None))
Martin Bornhold
tests: Adapt asserts to work with merge reference onbject in merge response.
r1059 assert merge_response == merge_response_update
assert merge_response.possible is True
assert merge_response.executed is False
assert merge_response.merge_ref
assert merge_response.failure_reason is MergeFailureReason.NONE
project: added all source files and assets
r1
@pytest.mark.parametrize('dry_run', [True, False])
def test_merge_conflict(self, vcsbackend, dry_run):
self.prepare_for_conflict(vcsbackend)
expected_merge_response = MergeResponse(
False, False, None, MergeFailureReason.MERGE_FAILED)
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref,
self.source_repo, self.source_ref,
project: added all source files and assets
r1 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
assert merge_response == expected_merge_response
# We call it twice so to make sure we can handle updates
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
self.source_ref,
project: added all source files and assets
r1 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
assert merge_response == expected_merge_response
def test_merge_target_is_not_head(self, vcsbackend):
self.prepare_for_success(vcsbackend)
expected_merge_response = MergeResponse(
False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD)
target_ref = Reference(
self.target_ref.type, self.target_ref.name, '0' * 40)
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, target_ref, self.source_repo,
self.source_ref, dry_run=True)
project: added all source files and assets
r1
assert merge_response == expected_merge_response
Martin Bornhold
vcs: Use more specific merge error reason (either target or source ref missing)
r1080 def test_merge_missing_source_reference(self, vcsbackend):
project: added all source files and assets
r1 self.prepare_for_success(vcsbackend)
expected_merge_response = MergeResponse(
Martin Bornhold
vcs: Use more specific merge error reason (either target or source ref missing)
r1080 False, False, None, MergeFailureReason.MISSING_SOURCE_REF)
project: added all source files and assets
r1
source_ref = Reference(
self.source_ref.type, 'not_existing', self.source_ref.commit_id)
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref,
self.source_repo, source_ref,
project: added all source files and assets
r1 dry_run=True)
assert merge_response == expected_merge_response
def test_merge_raises_exception(self, vcsbackend):
self.prepare_for_success(vcsbackend)
expected_merge_response = MergeResponse(
False, False, None, MergeFailureReason.UNKNOWN)
with mock.patch.object(self.target_repo, '_merge_repo',
side_effect=RepositoryError()):
merge_response = self.target_repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 self.repo_id, self.workspace_id, self.target_ref,
self.source_repo, self.source_ref,
dry_run=True)
project: added all source files and assets
r1
assert merge_response == expected_merge_response
def test_merge_invalid_user_name(self, vcsbackend):
repo = vcsbackend.create_repo(number_of_commits=1)
ref = Reference('branch', 'master', 'not_used')
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 workspace_id = 'test-errors-in-merge'
repo_id = repo_id_generator(workspace_id)
project: added all source files and assets
r1 with pytest.raises(ValueError):
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 repo.merge(repo_id, workspace_id, ref, self, ref)
project: added all source files and assets
r1
def test_merge_invalid_user_email(self, vcsbackend):
repo = vcsbackend.create_repo(number_of_commits=1)
ref = Reference('branch', 'master', 'not_used')
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 workspace_id = 'test-errors-in-merge'
repo_id = repo_id_generator(workspace_id)
project: added all source files and assets
r1 with pytest.raises(ValueError):
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 repo.merge(
repo_id, workspace_id, ref, self, ref, 'user name')
project: added all source files and assets
r1
def test_merge_invalid_message(self, vcsbackend):
repo = vcsbackend.create_repo(number_of_commits=1)
ref = Reference('branch', 'master', 'not_used')
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 workspace_id = 'test-errors-in-merge'
repo_id = repo_id_generator(workspace_id)
project: added all source files and assets
r1 with pytest.raises(ValueError):
repo.merge(
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 repo_id, workspace_id, ref, self, ref,
'user name', 'user@email.com')
project: added all source files and assets
r1
tests: use gunicorn for testing. This is close to production testing...
r2453 @pytest.mark.usefixtures("vcs_repository_support")
project: added all source files and assets
r1 class TestRepositoryStrip(BackendTestMixin):
recreate_repo_per_test = True
@classmethod
def _get_commits(cls):
commits = [
{
'message': 'Initial commit',
'author': 'Joe Doe <joe.doe@example.com>',
'date': datetime.datetime(2010, 1, 1, 20),
'branch': 'master',
'added': [
FileNode('foobar', content='foobar'),
FileNode('foobar2', content='foobar2'),
],
},
]
for x in xrange(10):
commit_data = {
'message': 'Changed foobar - commit%s' % x,
'author': 'Jane Doe <jane.doe@example.com>',
'date': datetime.datetime(2010, 1, 1, 21, x),
'branch': 'master',
'changed': [
FileNode('foobar', 'FOOBAR - %s' % x),
],
}
commits.append(commit_data)
return commits
@pytest.mark.backends("git", "hg")
def test_strip_commit(self):
tip = self.repo.get_commit()
assert tip.idx == 10
self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)
tip = self.repo.get_commit()
assert tip.idx == 9
@pytest.mark.backends("git", "hg")
def test_strip_multiple_commits(self):
tip = self.repo.get_commit()
assert tip.idx == 10
old = self.repo.get_commit(commit_idx=5)
self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)
tip = self.repo.get_commit()
assert tip.idx == 4
@pytest.mark.backends('hg', 'git')
shadow-repos: use numeric repo id for creation of shadow repos....
r2810 class TestRepositoryPull(object):
project: added all source files and assets
r1
def test_pull(self, vcsbackend):
source_repo = vcsbackend.repo
target_repo = vcsbackend.create_repo()
assert len(source_repo.commit_ids) > len(target_repo.commit_ids)
target_repo.pull(source_repo.path)
# Note: Get a fresh instance, avoids caching trouble
target_repo = vcsbackend.backend(target_repo.path)
assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
def test_pull_wrong_path(self, vcsbackend):
target_repo = vcsbackend.create_repo()
with pytest.raises(RepositoryError):
target_repo.pull(target_repo.path + "wrong")
def test_pull_specific_commits(self, vcsbackend):
source_repo = vcsbackend.repo
target_repo = vcsbackend.create_repo()
second_commit = source_repo[1].raw_id
if vcsbackend.alias == 'git':
second_commit_ref = 'refs/test-refs/a'
source_repo.set_refs(second_commit_ref, second_commit)
target_repo.pull(source_repo.path, commit_ids=[second_commit])
target_repo = vcsbackend.backend(target_repo.path)
assert 2 == len(target_repo.commit_ids)
assert second_commit == target_repo.get_commit().raw_id