##// END OF EJS Templates
user-groups: fix potential problem with group sync of external plugins....
user-groups: fix potential problem with group sync of external plugins. - When using an external plugin we used to check for a parameter that sets the sync mode. The problem is that we only checked whether the flag was present. So toggling sync on and off set the value and then left the key in place, but with the value None. This confused the sync logic, which concluded that the group should be synced!

File last commit:

r1351:8e93f34d default
r2193:20e24a44 stable
Show More
test_repository.py
534 lines | 20.1 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
license: updated copyright year to 2017
r1271 # Copyright (C) 2010-2017 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import datetime
from urllib2 import URLError
import mock
import pytest
from rhodecode.lib.vcs import backends
from rhodecode.lib.vcs.backends.base import (
Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.tests.vcs.base import BackendTestMixin
class TestRepositoryBase(BackendTestMixin):
    """Basic checks of the repository API.

    The tests use ``self.repo`` (a repository instance) and ``self.Backend``
    (the backend class); neither is defined in this class, so both are
    expected to be supplied by ``BackendTestMixin``.
    """

    # NOTE(review): presumably tells the mixin to share one repository
    # between all tests of this class -- confirm in BackendTestMixin.
    recreate_repo_per_test = False

    def test_init_accepts_unicode_path(self, tmpdir):
        """A repository can be created at a ``unicode`` path."""
        path = unicode(tmpdir.join(u'unicode ä'))
        self.Backend(path, create=True)

    def test_init_accepts_str_path(self, tmpdir):
        """A repository can be created at a byte-string path."""
        path = str(tmpdir.join('str ä'))
        self.Backend(path, create=True)

    def test_init_fails_if_path_does_not_exist(self, tmpdir):
        """Opening a path that was never created raises ``VCSError``."""
        path = unicode(tmpdir.join('i-do-not-exist'))
        with pytest.raises(VCSError):
            self.Backend(path)

    def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
        """Opening an existing but empty directory raises ``VCSError``."""
        path = unicode(tmpdir.mkdir(u'unicode ä'))
        with pytest.raises(VCSError):
            self.Backend(path)

    def test_has_commits_attribute(self):
        """Accessing ``commit_ids`` must not raise."""
        # Only the attribute access itself is under test.
        self.repo.commit_ids

    def test_name(self):
        assert self.repo.name.startswith('vcs-test')

    @pytest.mark.backends("hg", "git")
    def test_has_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is not None

    @pytest.mark.backends("svn")
    def test_has_no_default_branch_name(self):
        assert self.repo.DEFAULT_BRANCH_NAME is None

    def test_has_empty_commit(self):
        assert self.repo.EMPTY_COMMIT_ID is not None
        assert self.repo.EMPTY_COMMIT is not None

    def test_empty_changeset_is_deprecated(self):
        """Reading ``EMPTY_CHANGESET`` must emit a deprecation warning."""
        def get_empty_changeset(repo):
            return repo.EMPTY_CHANGESET
        pytest.deprecated_call(get_empty_changeset, self.repo)

    def test_bookmarks(self):
        assert len(self.repo.bookmarks) == 0

    # TODO: Cover two cases: Local repo path, remote URL
    def test_check_url(self):
        """``check_url`` returns a truthy value for the repository path."""
        config = Config()
        assert self.Backend.check_url(self.repo.path, config)

    def test_check_url_invalid(self):
        """``check_url`` raises ``URLError`` for a path with a bogus suffix."""
        config = Config()
        with pytest.raises(URLError):
            self.Backend.check_url(self.repo.path + "invalid", config)

    def test_get_contact(self):
        assert self.repo.contact

    def test_get_description(self):
        assert self.repo.description

    def test_get_hook_location(self):
        assert len(self.repo.get_hook_location()) != 0

    def test_last_change(self, local_dt_to_utc):
        """``last_change`` is not older than 2010-01-01 21:00 (converted)."""
        assert self.repo.last_change >= local_dt_to_utc(
            datetime.datetime(2010, 1, 1, 21, 0))

    def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
        """``last_change`` of a new empty repo lies around its creation time."""
        # One second of slack on both sides of the creation window.
        delta = datetime.timedelta(seconds=1)

        start = local_dt_to_utc(datetime.datetime.now())
        empty_repo = vcsbackend.create_repo()
        now = local_dt_to_utc(datetime.datetime.now())
        assert empty_repo.last_change >= start - delta
        assert empty_repo.last_change <= now + delta

    def test_repo_equality(self):
        assert self.repo == self.repo

    def test_repo_equality_broken_object(self):
        """A copy whose ``path`` attribute was removed compares unequal."""
        import copy
        _repo = copy.copy(self.repo)
        delattr(_repo, 'path')
        assert self.repo != _repo

    def test_repo_equality_other_object(self):
        """A foreign object compares unequal even with the same ``path``."""
        class Dummy(object):
            path = self.repo.path
        assert self.repo != Dummy()

    def test_get_commit_is_implemented(self):
        self.repo.get_commit()

    def test_get_commits_is_implemented(self):
        commit_iter = iter(self.repo.get_commits())
        commit = next(commit_iter)
        assert commit.idx == 0

    def test_supports_iteration(self):
        repo_iter = iter(self.repo)
        commit = next(repo_iter)
        assert commit.idx == 0

    def test_in_memory_commit(self):
        imc = self.repo.in_memory_commit
        assert isinstance(imc, BaseInMemoryCommit)

    @pytest.mark.backends("hg")
    def test__get_url_unicode(self):
        url = u'/home/repos/malmö'
        assert self.repo._get_url(url)
class TestDeprecatedRepositoryAPI(BackendTestMixin):
    """The legacy revision/changeset accessors must warn about deprecation."""

    recreate_repo_per_test = False

    def test_revisions_is_deprecated(self):
        # Attribute access is wrapped in a callable so that it happens while
        # pytest is watching for the warning.
        pytest.deprecated_call(lambda repo: repo.revisions, self.repo)

    def test_get_changeset_is_deprecated(self):
        legacy_getter = self.repo.get_changeset
        pytest.deprecated_call(legacy_getter)

    def test_get_changesets_is_deprecated(self):
        legacy_getter = self.repo.get_changesets
        pytest.deprecated_call(legacy_getter)

    def test_in_memory_changeset_is_deprecated(self):
        pytest.deprecated_call(
            lambda repo: repo.in_memory_changeset, self.repo)
# TODO: these tests are incomplete, must check the resulting compare result for
# correctness
class TestRepositoryCompare:
    """Smoke tests for ``compare``: each test only checks the call succeeds.

    Every test runs twice, with ``merge=True`` and ``merge=False``.
    """

    @pytest.mark.parametrize('merge', [True, False])
    def test_compare_commits_of_same_repository(self, vcsbackend, merge):
        """Compare two commits of one repository against itself."""
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        target_repo.compare(
            target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
            merge=merge)

    @pytest.mark.xfail_backends('svn')
    @pytest.mark.parametrize('merge', [True, False])
    def test_compare_cloned_repositories(self, vcsbackend, merge):
        """Compare a repository with a clone that received an extra file."""
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
        # The returned commit was never used; the call itself is kept so the
        # sequence of operations on the clone stays unchanged.
        source_repo.get_commit()

        target_repo.compare(
            target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
            merge=merge)

    @pytest.mark.xfail_backends('svn')
    @pytest.mark.parametrize('merge', [True, False])
    def test_compare_unrelated_repositories(self, vcsbackend, merge):
        """Compare two independently created repositories."""
        orig = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert orig != unrelated

        orig.compare(
            orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
class TestRepositoryGetCommonAncestor:
    """Checks of ``get_common_ancestor`` within and across repositories."""

    def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
        """The ancestor of commits 2 and 4 is commit 2, in either order."""
        target_repo = vcsbackend.create_repo(number_of_commits=5)

        expected_ancestor = target_repo[2].raw_id

        assert target_repo.get_common_ancestor(
            commit_id1=target_repo[2].raw_id,
            commit_id2=target_repo[4].raw_id,
            repo2=target_repo
        ) == expected_ancestor

        assert target_repo.get_common_ancestor(
            commit_id1=target_repo[4].raw_id,
            commit_id2=target_repo[2].raw_id,
            repo2=target_repo
        ) == expected_ancestor

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
        """A clone with one extra commit shares commit 4 with its origin."""
        target_repo = vcsbackend.create_repo(number_of_commits=5)
        source_repo = vcsbackend.clone_repo(target_repo)
        assert target_repo != source_repo

        vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
        source_commit = source_repo.get_commit()

        expected_ancestor = target_repo[4].raw_id

        assert target_repo.get_common_ancestor(
            commit_id1=target_repo[4].raw_id,
            commit_id2=source_commit.raw_id,
            repo2=source_repo
        ) == expected_ancestor

        # NOTE(review): this call runs on target_repo with repo2=target_repo
        # although commit_id1 was created in source_repo -- confirm that this
        # is the intended "reversed" case.
        assert target_repo.get_common_ancestor(
            commit_id1=source_commit.raw_id,
            commit_id2=target_repo[4].raw_id,
            repo2=target_repo
        ) == expected_ancestor

    @pytest.mark.xfail_backends("svn")
    def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
        """Independently created repositories have no common ancestor."""
        original = vcsbackend.create_repo(number_of_commits=5)
        unrelated = vcsbackend.create_repo(number_of_commits=5)
        assert original != unrelated

        # Identity comparison: the result must be None itself, not merely
        # something that compares equal to None.
        assert original.get_common_ancestor(
            commit_id1=original[0].raw_id,
            commit_id2=unrelated[0].raw_id,
            repo2=unrelated
        ) is None

        assert original.get_common_ancestor(
            commit_id1=original[-1].raw_id,
            commit_id2=unrelated[-1].raw_id,
            repo2=unrelated
        ) is None
@pytest.mark.backends("git", "hg")
class TestRepositoryMerge:
    """Tests of ``merge`` between a target repository and a clone of it.

    ``prepare_for_success`` / ``prepare_for_conflict`` populate
    ``target_repo``, ``source_repo``, ``target_commit``, ``source_commit``,
    ``target_ref``, ``source_ref`` and ``workspace`` on the test instance.
    """

    def _setup_refs_and_workspace(self):
        """Store head commits, default-branch references and workspace id.

        Shared tail of both ``prepare_for_*`` helpers; expects
        ``self.target_repo`` and ``self.source_repo`` to be set already.
        """
        self.target_commit = self.target_repo.get_commit()
        self.source_commit = self.source_repo.get_commit()
        # This only works for Git and Mercurial
        default_branch = self.target_repo.DEFAULT_BRANCH_NAME
        self.target_ref = Reference(
            'branch', default_branch, self.target_commit.raw_id)
        self.source_ref = Reference(
            'branch', default_branch, self.source_commit.raw_id)
        self.workspace = 'test-merge'

    def prepare_for_success(self, vcsbackend):
        """Create target and source repos that change different files."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        vcsbackend.add_file(self.target_repo, 'README_MERGE1', 'Version 1')
        vcsbackend.add_file(self.source_repo, 'README_MERGE2', 'Version 2')
        imc = self.source_repo.in_memory_commit
        imc.add(FileNode('file_x', content=self.source_repo.name))
        imc.commit(
            message=u'Automatic commit from repo merge test',
            author=u'Automatic')
        self._setup_refs_and_workspace()

    def prepare_for_conflict(self, vcsbackend):
        """Create target and source repos that both add ``README_MERGE``."""
        self.target_repo = vcsbackend.create_repo(number_of_commits=1)
        self.source_repo = vcsbackend.clone_repo(self.target_repo)
        # Same file name, different content on each side.
        vcsbackend.add_file(self.target_repo, 'README_MERGE', 'Version 1')
        vcsbackend.add_file(self.source_repo, 'README_MERGE', 'Version 2')
        self._setup_refs_and_workspace()

    def test_merge_success(self, vcsbackend):
        """A real merge succeeds and can be repeated on the merged result."""
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.target_ref, self.source_repo, self.source_ref, self.workspace,
            'test user', 'test@rhodecode.com', 'merge message 1',
            dry_run=False)
        expected_merge_response = MergeResponse(
            True, True, merge_response.merge_ref,
            MergeFailureReason.NONE)
        assert merge_response == expected_merge_response

        # Re-open the target repository before inspecting its commits.
        target_repo = backends.get_backend(vcsbackend.alias)(
            self.target_repo.path)
        target_commits = list(target_repo.get_commits())
        commit_ids = [c.raw_id for c in target_commits[:-1]]
        assert self.source_ref.commit_id in commit_ids
        assert self.target_ref.commit_id in commit_ids

        merge_commit = target_commits[-1]
        assert merge_commit.raw_id == merge_response.merge_ref.commit_id
        assert merge_commit.message.strip() == 'merge message 1'
        assert merge_commit.author == 'test user <test@rhodecode.com>'

        # We call it twice so to make sure we can handle updates
        target_ref = Reference(
            self.target_ref.type, self.target_ref.name,
            merge_response.merge_ref.commit_id)

        merge_response = target_repo.merge(
            target_ref, self.source_repo, self.source_ref, self.workspace,
            'test user', 'test@rhodecode.com', 'merge message 2',
            dry_run=False)
        expected_merge_response = MergeResponse(
            True, True, merge_response.merge_ref,
            MergeFailureReason.NONE)
        assert merge_response == expected_merge_response

        target_repo = backends.get_backend(
            vcsbackend.alias)(self.target_repo.path)
        merge_commit = target_repo.get_commit(
            merge_response.merge_ref.commit_id)
        # NOTE(review): the first message is expected here, presumably because
        # the second merge creates no new commit -- confirm.
        assert merge_commit.message.strip() == 'merge message 1'
        assert merge_commit.author == 'test user <test@rhodecode.com>'

    def test_merge_success_dry_run(self, vcsbackend):
        """Two dry-run merges report the same outcome and are not executed."""
        self.prepare_for_success(vcsbackend)

        merge_response = self.target_repo.merge(
            self.target_ref, self.source_repo, self.source_ref, self.workspace,
            dry_run=True)

        # We call it twice so to make sure we can handle updates
        merge_response_update = self.target_repo.merge(
            self.target_ref, self.source_repo, self.source_ref, self.workspace,
            dry_run=True)

        # Multiple merges may differ in their commit id. Therefore we set the
        # commit id to `None` before comparing the merge responses.
        merge_response = merge_response._replace(
            merge_ref=merge_response.merge_ref._replace(commit_id=None))
        merge_response_update = merge_response_update._replace(
            merge_ref=merge_response_update.merge_ref._replace(commit_id=None))

        assert merge_response == merge_response_update
        assert merge_response.possible is True
        assert merge_response.executed is False
        assert merge_response.merge_ref
        assert merge_response.failure_reason is MergeFailureReason.NONE

    @pytest.mark.parametrize('dry_run', [True, False])
    def test_merge_conflict(self, vcsbackend, dry_run):
        """Conflicting changes yield ``MERGE_FAILED``, also when repeated."""
        self.prepare_for_conflict(vcsbackend)

        expected_merge_response = MergeResponse(
            False, False, None, MergeFailureReason.MERGE_FAILED)

        merge_response = self.target_repo.merge(
            self.target_ref, self.source_repo, self.source_ref, self.workspace,
            'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
        assert merge_response == expected_merge_response

        # We call it twice so to make sure we can handle updates
        merge_response = self.target_repo.merge(
            self.target_ref, self.source_repo, self.source_ref, self.workspace,
            'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
        assert merge_response == expected_merge_response

    def test_merge_target_is_not_head(self, vcsbackend):
        """A target ref with an all-zero commit id is ``TARGET_IS_NOT_HEAD``."""
        self.prepare_for_success(vcsbackend)

        expected_merge_response = MergeResponse(
            False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD)

        target_ref = Reference(
            self.target_ref.type, self.target_ref.name, '0' * 40)

        merge_response = self.target_repo.merge(
            target_ref, self.source_repo, self.source_ref, self.workspace,
            dry_run=True)

        assert merge_response == expected_merge_response

    def test_merge_missing_source_reference(self, vcsbackend):
        """An unknown source ref name yields ``MISSING_SOURCE_REF``."""
        self.prepare_for_success(vcsbackend)

        expected_merge_response = MergeResponse(
            False, False, None, MergeFailureReason.MISSING_SOURCE_REF)

        source_ref = Reference(
            self.source_ref.type, 'not_existing', self.source_ref.commit_id)

        merge_response = self.target_repo.merge(
            self.target_ref, self.source_repo, source_ref, self.workspace,
            dry_run=True)

        assert merge_response == expected_merge_response

    def test_merge_raises_exception(self, vcsbackend):
        """A ``RepositoryError`` from ``_merge_repo`` is reported as UNKNOWN."""
        self.prepare_for_success(vcsbackend)

        expected_merge_response = MergeResponse(
            False, False, None, MergeFailureReason.UNKNOWN)

        with mock.patch.object(self.target_repo, '_merge_repo',
                               side_effect=RepositoryError()):
            merge_response = self.target_repo.merge(
                self.target_ref, self.source_repo, self.source_ref,
                self.workspace, dry_run=True)

        assert merge_response == expected_merge_response

    def test_merge_invalid_user_name(self, vcsbackend):
        """Calling ``merge`` without a user name raises ``ValueError``."""
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference('branch', 'master', 'not_used')
        with pytest.raises(ValueError):
            # ``self`` stands in for the source repository argument.
            repo.merge(ref, self, ref, 'workspace_id')

    def test_merge_invalid_user_email(self, vcsbackend):
        """Calling ``merge`` without a user email raises ``ValueError``."""
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference('branch', 'master', 'not_used')
        with pytest.raises(ValueError):
            repo.merge(ref, self, ref, 'workspace_id', 'user name')

    def test_merge_invalid_message(self, vcsbackend):
        """Calling ``merge`` without a message raises ``ValueError``."""
        repo = vcsbackend.create_repo(number_of_commits=1)
        ref = Reference('branch', 'master', 'not_used')
        with pytest.raises(ValueError):
            repo.merge(
                ref, self, ref, 'workspace_id', 'user name', 'user@email.com')
class TestRepositoryStrip(BackendTestMixin):
    """Tests of ``strip`` on a repository with eleven commits (idx 0..10)."""

    recreate_repo_per_test = True

    @classmethod
    def _get_commits(cls):
        """Return the commit descriptions: one initial commit plus ten edits."""
        initial_commit = {
            'message': 'Initial commit',
            'author': 'Joe Doe <joe.doe@example.com>',
            'date': datetime.datetime(2010, 1, 1, 20),
            'branch': 'master',
            'added': [
                FileNode('foobar', content='foobar'),
                FileNode('foobar2', content='foobar2'),
            ],
        }
        # Ten follow-up commits, one minute apart, each rewriting 'foobar'.
        follow_ups = [
            {
                'message': 'Changed foobar - commit%s' % number,
                'author': 'Jane Doe <jane.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 21, number),
                'branch': 'master',
                'changed': [
                    FileNode('foobar', 'FOOBAR - %s' % number),
                ],
            }
            for number in xrange(10)
        ]
        return [initial_commit] + follow_ups

    @pytest.mark.backends("git", "hg")
    def test_strip_commit(self):
        """Stripping the tip moves the head from idx 10 to idx 9."""
        head = self.repo.get_commit()
        assert head.idx == 10
        branch = self.repo.DEFAULT_BRANCH_NAME
        self.repo.strip(head.raw_id, branch)

        head = self.repo.get_commit()
        assert head.idx == 9

    @pytest.mark.backends("git", "hg")
    def test_strip_multiple_commits(self):
        """Stripping commit idx 5 leaves idx 4 as the new head."""
        head = self.repo.get_commit()
        assert head.idx == 10

        strip_from = self.repo.get_commit(commit_idx=5)
        branch = self.repo.DEFAULT_BRANCH_NAME
        self.repo.strip(strip_from.raw_id, branch)

        head = self.repo.get_commit()
        assert head.idx == 4
@pytest.mark.backends('hg', 'git')
class TestRepositoryPull:
    """Tests of ``pull`` from the fixture repository into a new repository."""

    def test_pull(self, vcsbackend):
        """After a full pull both repositories have the same commit count."""
        origin = vcsbackend.repo
        destination = vcsbackend.create_repo()
        assert len(origin.commit_ids) > len(destination.commit_ids)

        destination.pull(origin.path)
        # Note: Get a fresh instance, avoids caching trouble
        reopened = vcsbackend.backend(destination.path)
        assert len(origin.commit_ids) == len(reopened.commit_ids)

    def test_pull_wrong_path(self, vcsbackend):
        """Pulling from a path with a bogus suffix raises RepositoryError."""
        destination = vcsbackend.create_repo()
        bad_path = destination.path + "wrong"
        with pytest.raises(RepositoryError):
            destination.pull(bad_path)

    def test_pull_specific_commits(self, vcsbackend):
        """Pulling only the second commit yields two commits, it being tip."""
        origin = vcsbackend.repo
        destination = vcsbackend.create_repo()

        wanted_id = origin[1].raw_id
        if vcsbackend.alias == 'git':
            # For git a ref pointing at the wanted commit is created first.
            ref_name = 'refs/test-refs/a'
            origin.set_refs(ref_name, wanted_id)

        destination.pull(origin.path, commit_ids=[wanted_id])
        reopened = vcsbackend.backend(destination.path)
        assert len(reopened.commit_ids) == 2
        assert wanted_id == reopened.get_commit().raw_id