test_repo_summary.py
513 lines
| 18.7 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1785 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import re | ||||
import mock | ||||
import pytest | ||||
from rhodecode.apps.repository.views.repo_summary import RepoSummaryView | ||||
from rhodecode.lib import helpers as h | ||||
r4928 | from collections import OrderedDict | |||
r1984 | from rhodecode.lib.utils2 import AttributeDict, safe_str | |||
r1785 | from rhodecode.lib.vcs.exceptions import RepositoryRequirementError | |||
from rhodecode.model.db import Repository | ||||
from rhodecode.model.meta import Session | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.scm import ScmModel | ||||
from rhodecode.tests import assert_session_flash | ||||
r5607 | from rhodecode.tests.fixtures.rc_fixture import Fixture | |||
r1785 | from rhodecode.tests.utils import AssertResponse, repo_on_filesystem | |||
r5173 | from rhodecode.tests.routes import route_path | |||
r1785 | ||||
fixture = Fixture() | ||||
r2499 | def assert_clone_url(response, server, repo, disabled=False): | |||
response.mustcontain( | ||||
'<input type="text" class="input-monospace clone_url_input" ' | ||||
'{disabled}readonly="readonly" ' | ||||
'value="http://test_admin@{server}/{repo}"/>'.format( | ||||
server=server, repo=repo, disabled='disabled ' if disabled else ' ') | ||||
) | ||||
r1785 | @pytest.mark.usefixtures('app') | |||
class TestSummaryView(object): | ||||
r5328 | ||||
r1785 | def test_index(self, autologin_user, backend, http_host_only_stub): | |||
repo_id = backend.repo.repo_id | ||||
repo_name = backend.repo_name | ||||
r5328 | ||||
response = self.app.get( | ||||
route_path('repo_summary', repo_name=repo_name)) | ||||
r1785 | ||||
# repo type | ||||
response.mustcontain( | ||||
'<i class="icon-%s">' % (backend.alias, ) | ||||
) | ||||
# public/private | ||||
response.mustcontain( | ||||
"""<i class="icon-unlock-alt">""" | ||||
) | ||||
# clone url... | ||||
r2499 | assert_clone_url(response, http_host_only_stub, repo_name) | |||
r5328 | assert_clone_url(response, http_host_only_stub, f'_{repo_id}') | |||
r1785 | ||||
def test_index_svn_without_proxy( | ||||
self, autologin_user, backend_svn, http_host_only_stub): | ||||
r5328 | ||||
r1785 | repo_id = backend_svn.repo.repo_id | |||
repo_name = backend_svn.repo_name | ||||
r5328 | ||||
# by default the SVN is enabled now, this is how inputs look when it's disabled | ||||
with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy', return_value=True): | ||||
r2499 | ||||
r5328 | response = self.app.get( | |||
route_path('repo_summary', repo_name=repo_name), | ||||
status=200) | ||||
# clone url test... | ||||
r2499 | assert_clone_url(response, http_host_only_stub, repo_name, disabled=True) | |||
r5328 | assert_clone_url(response, http_host_only_stub, f'_{repo_id}', disabled=True) | |||
r1785 | ||||
def test_index_with_trailing_slash( | ||||
self, autologin_user, backend, http_host_only_stub): | ||||
repo_id = backend.repo.repo_id | ||||
repo_name = backend.repo_name | ||||
r5328 | trailing_slash = '/' | |||
response = self.app.get( | ||||
route_path('repo_summary', repo_name=repo_name) + trailing_slash, | ||||
status=200) | ||||
r1785 | ||||
# clone url... | ||||
r2499 | assert_clone_url(response, http_host_only_stub, repo_name) | |||
r5328 | assert_clone_url(response, http_host_only_stub, f'_{repo_id}') | |||
r1785 | ||||
def test_index_by_id(self, autologin_user, backend): | ||||
repo_id = backend.repo.repo_id | ||||
response = self.app.get( | ||||
r5328 | route_path('repo_summary', repo_name=f'_{repo_id}')) | |||
r1785 | ||||
# repo type | ||||
response.mustcontain( | ||||
'<i class="icon-%s">' % (backend.alias, ) | ||||
) | ||||
# public/private | ||||
response.mustcontain( | ||||
"""<i class="icon-unlock-alt">""" | ||||
) | ||||
def test_index_by_repo_having_id_path_in_name_hg(self, autologin_user): | ||||
fixture.create_repo(name='repo_1') | ||||
response = self.app.get(route_path('repo_summary', repo_name='repo_1')) | ||||
try: | ||||
response.mustcontain("repo_1") | ||||
finally: | ||||
RepoModel().delete(Repository.get_by_repo_name('repo_1')) | ||||
Session().commit() | ||||
def test_index_with_anonymous_access_disabled( | ||||
self, backend, disable_anonymous_user): | ||||
response = self.app.get( | ||||
route_path('repo_summary', repo_name=backend.repo_name), status=302) | ||||
assert 'login' in response.location | ||||
def _enable_stats(self, repo): | ||||
r = Repository.get_by_repo_name(repo) | ||||
r.enable_statistics = True | ||||
Session().add(r) | ||||
Session().commit() | ||||
expected_trending = { | ||||
'hg': { | ||||
"py": {"count": 68, "desc": ["Python"]}, | ||||
"rst": {"count": 16, "desc": ["Rst"]}, | ||||
"css": {"count": 2, "desc": ["Css"]}, | ||||
"sh": {"count": 2, "desc": ["Bash"]}, | ||||
"bat": {"count": 1, "desc": ["Batch"]}, | ||||
"cfg": {"count": 1, "desc": ["Ini"]}, | ||||
"html": {"count": 1, "desc": ["EvoqueHtml", "Html"]}, | ||||
"ini": {"count": 1, "desc": ["Ini"]}, | ||||
"js": {"count": 1, "desc": ["Javascript"]}, | ||||
"makefile": {"count": 1, "desc": ["Makefile", "Makefile"]} | ||||
}, | ||||
'git': { | ||||
"py": {"count": 68, "desc": ["Python"]}, | ||||
"rst": {"count": 16, "desc": ["Rst"]}, | ||||
"css": {"count": 2, "desc": ["Css"]}, | ||||
"sh": {"count": 2, "desc": ["Bash"]}, | ||||
"bat": {"count": 1, "desc": ["Batch"]}, | ||||
"cfg": {"count": 1, "desc": ["Ini"]}, | ||||
"html": {"count": 1, "desc": ["EvoqueHtml", "Html"]}, | ||||
"ini": {"count": 1, "desc": ["Ini"]}, | ||||
"js": {"count": 1, "desc": ["Javascript"]}, | ||||
"makefile": {"count": 1, "desc": ["Makefile", "Makefile"]} | ||||
}, | ||||
'svn': { | ||||
"py": {"count": 75, "desc": ["Python"]}, | ||||
"rst": {"count": 16, "desc": ["Rst"]}, | ||||
"html": {"count": 11, "desc": ["EvoqueHtml", "Html"]}, | ||||
"css": {"count": 2, "desc": ["Css"]}, | ||||
"bat": {"count": 1, "desc": ["Batch"]}, | ||||
"cfg": {"count": 1, "desc": ["Ini"]}, | ||||
"ini": {"count": 1, "desc": ["Ini"]}, | ||||
"js": {"count": 1, "desc": ["Javascript"]}, | ||||
"makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}, | ||||
"sh": {"count": 1, "desc": ["Bash"]} | ||||
}, | ||||
} | ||||
def test_repo_stats(self, autologin_user, backend, xhr_header): | ||||
response = self.app.get( | ||||
route_path( | ||||
'repo_stats', repo_name=backend.repo_name, commit_id='tip'), | ||||
extra_environ=xhr_header, | ||||
status=200) | ||||
assert re.match(r'6[\d\.]+ KiB', response.json['size']) | ||||
def test_repo_stats_code_stats_enabled(self, autologin_user, backend, xhr_header): | ||||
repo_name = backend.repo_name | ||||
# codes stats | ||||
self._enable_stats(repo_name) | ||||
ScmModel().mark_for_invalidation(repo_name) | ||||
response = self.app.get( | ||||
route_path( | ||||
'repo_stats', repo_name=backend.repo_name, commit_id='tip'), | ||||
extra_environ=xhr_header, | ||||
status=200) | ||||
expected_data = self.expected_trending[backend.alias] | ||||
returned_stats = response.json['code_stats'] | ||||
for k, v in expected_data.items(): | ||||
assert v == returned_stats[k] | ||||
def test_repo_refs_data(self, backend): | ||||
response = self.app.get( | ||||
route_path('repo_refs_data', repo_name=backend.repo_name), | ||||
status=200) | ||||
# Ensure that there is the correct amount of items in the result | ||||
repo = backend.repo.scm_instance() | ||||
data = response.json['results'] | ||||
items = sum(len(section['children']) for section in data) | ||||
repo_refs = len(repo.branches) + len(repo.tags) + len(repo.bookmarks) | ||||
assert items == repo_refs | ||||
def test_index_shows_missing_requirements_message( | ||||
self, backend, autologin_user): | ||||
repo_name = backend.repo_name | ||||
scm_patcher = mock.patch.object( | ||||
Repository, 'scm_instance', side_effect=RepositoryRequirementError) | ||||
with scm_patcher: | ||||
r2625 | response = self.app.get( | |||
route_path('repo_summary', repo_name=repo_name)) | ||||
r3981 | assert_response = response.assert_response() | |||
r1785 | assert_response.element_contains( | |||
'.main .alert-warning strong', 'Missing requirements') | ||||
assert_response.element_contains( | ||||
'.main .alert-warning', | ||||
'Commits cannot be displayed, because this repository ' | ||||
'uses one or more extensions, which was not enabled.') | ||||
def test_missing_requirements_page_does_not_contains_switch_to( | ||||
self, autologin_user, backend): | ||||
repo_name = backend.repo_name | ||||
scm_patcher = mock.patch.object( | ||||
Repository, 'scm_instance', side_effect=RepositoryRequirementError) | ||||
with scm_patcher: | ||||
response = self.app.get(route_path('repo_summary', repo_name=repo_name)) | ||||
response.mustcontain(no='Switch To') | ||||
@pytest.mark.usefixtures('app') | ||||
class TestRepoLocation(object): | ||||
@pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii']) | ||||
r1984 | def test_missing_filesystem_repo( | |||
self, autologin_user, backend, suffix, csrf_token): | ||||
r1785 | repo = backend.create_repo(name_suffix=suffix) | |||
repo_name = repo.repo_name | ||||
# delete from file system | ||||
RepoModel()._delete_filesystem_repo(repo) | ||||
# test if the repo is still in the database | ||||
new_repo = RepoModel().get_by_repo_name(repo_name) | ||||
assert new_repo.repo_name == repo_name | ||||
# check if repo is not in the filesystem | ||||
assert not repo_on_filesystem(repo_name) | ||||
r1984 | ||||
response = self.app.get( | ||||
route_path('repo_summary', repo_name=safe_str(repo_name)), status=302) | ||||
r5087 | msg = f'The repository `{repo_name}` cannot be loaded in filesystem. ' \ | |||
f'Please check if it exist, or is not damaged.' | ||||
r1984 | assert_session_flash(response, msg) | |||
@pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii']) | ||||
def test_missing_filesystem_repo_on_repo_check( | ||||
self, autologin_user, backend, suffix, csrf_token): | ||||
repo = backend.create_repo(name_suffix=suffix) | ||||
repo_name = repo.repo_name | ||||
# delete from file system | ||||
RepoModel()._delete_filesystem_repo(repo) | ||||
r1785 | ||||
r1984 | # test if the repo is still in the database | |||
new_repo = RepoModel().get_by_repo_name(repo_name) | ||||
assert new_repo.repo_name == repo_name | ||||
# check if repo is not in the filesystem | ||||
assert not repo_on_filesystem(repo_name) | ||||
# flush the session | ||||
self.app.get( | ||||
route_path('repo_summary', repo_name=safe_str(repo_name)), | ||||
status=302) | ||||
response = self.app.get( | ||||
route_path('repo_creating_check', repo_name=safe_str(repo_name)), | ||||
status=200) | ||||
msg = 'The repository `%s` cannot be loaded in filesystem. ' \ | ||||
'Please check if it exist, or is not damaged.' % repo_name | ||||
assert_session_flash(response, msg ) | ||||
r1785 | ||||
@pytest.fixture() | ||||
def summary_view(context_stub, request_stub, user_util): | ||||
""" | ||||
Bootstrap view to test the view functions | ||||
""" | ||||
request_stub.matched_route = AttributeDict(name='test_view') | ||||
r1997 | request_stub.user = user_util.create_user().AuthUser() | |||
r1785 | request_stub.db_repo = user_util.create_repo() | |||
view = RepoSummaryView(context=context_stub, request=request_stub) | ||||
return view | ||||
@pytest.mark.usefixtures('app') | ||||
class TestCreateReferenceData(object): | ||||
r3946 | @pytest.fixture() | |||
r1785 | def example_refs(self): | |||
section_1_refs = OrderedDict((('a', 'a_id'), ('b', 'b_id'))) | ||||
example_refs = [ | ||||
('section_1', section_1_refs, 't1'), | ||||
('section_2', {'c': 'c_id'}, 't2'), | ||||
] | ||||
return example_refs | ||||
def test_generates_refs_based_on_commit_ids(self, example_refs, summary_view): | ||||
repo = mock.Mock() | ||||
repo.name = 'test-repo' | ||||
repo.alias = 'git' | ||||
full_repo_name = 'pytest-repo-group/' + repo.name | ||||
result = summary_view._create_reference_data( | ||||
repo, full_repo_name, example_refs) | ||||
expected_files_url = '/{}/files/'.format(full_repo_name) | ||||
expected_result = [ | ||||
{ | ||||
'children': [ | ||||
{ | ||||
r3777 | 'id': 'a', 'idx': 0, 'raw_id': 'a_id', 'text': 'a', 'type': 't1', | |||
r1785 | 'files_url': expected_files_url + 'a/?at=a', | |||
}, | ||||
{ | ||||
r3777 | 'id': 'b', 'idx': 0, 'raw_id': 'b_id', 'text': 'b', 'type': 't1', | |||
r1785 | 'files_url': expected_files_url + 'b/?at=b', | |||
} | ||||
], | ||||
'text': 'section_1' | ||||
}, | ||||
{ | ||||
'children': [ | ||||
{ | ||||
r3777 | 'id': 'c', 'idx': 0, 'raw_id': 'c_id', 'text': 'c', 'type': 't2', | |||
r1785 | 'files_url': expected_files_url + 'c/?at=c', | |||
} | ||||
], | ||||
'text': 'section_2' | ||||
}] | ||||
assert result == expected_result | ||||
def test_generates_refs_with_path_for_svn(self, example_refs, summary_view): | ||||
repo = mock.Mock() | ||||
repo.name = 'test-repo' | ||||
repo.alias = 'svn' | ||||
full_repo_name = 'pytest-repo-group/' + repo.name | ||||
result = summary_view._create_reference_data( | ||||
repo, full_repo_name, example_refs) | ||||
expected_files_url = '/{}/files/'.format(full_repo_name) | ||||
expected_result = [ | ||||
{ | ||||
'children': [ | ||||
{ | ||||
r3777 | 'id': 'a@a_id', 'idx': 0, 'raw_id': 'a_id', | |||
r1785 | 'text': 'a', 'type': 't1', | |||
'files_url': expected_files_url + 'a_id/a?at=a', | ||||
}, | ||||
{ | ||||
r3777 | 'id': 'b@b_id', 'idx': 0, 'raw_id': 'b_id', | |||
r1785 | 'text': 'b', 'type': 't1', | |||
'files_url': expected_files_url + 'b_id/b?at=b', | ||||
} | ||||
], | ||||
'text': 'section_1' | ||||
}, | ||||
{ | ||||
'children': [ | ||||
{ | ||||
r3777 | 'id': 'c@c_id', 'idx': 0, 'raw_id': 'c_id', | |||
r1785 | 'text': 'c', 'type': 't2', | |||
'files_url': expected_files_url + 'c_id/c?at=c', | ||||
} | ||||
], | ||||
'text': 'section_2' | ||||
} | ||||
] | ||||
assert result == expected_result | ||||
class TestCreateFilesUrl(object): | ||||
r1927 | def test_creates_non_svn_url(self, app, summary_view): | |||
r1785 | repo = mock.Mock() | |||
repo.name = 'abcde' | ||||
full_repo_name = 'test-repo-group/' + repo.name | ||||
ref_name = 'branch1' | ||||
raw_id = 'deadbeef0123456789' | ||||
is_svn = False | ||||
r1927 | with mock.patch('rhodecode.lib.helpers.route_path') as url_mock: | |||
r1785 | result = summary_view._create_files_url( | |||
repo, full_repo_name, ref_name, raw_id, is_svn) | ||||
url_mock.assert_called_once_with( | ||||
r1927 | 'repo_files', repo_name=full_repo_name, commit_id=ref_name, | |||
f_path='', _query=dict(at=ref_name)) | ||||
r1785 | assert result == url_mock.return_value | |||
r1927 | def test_creates_svn_url(self, app, summary_view): | |||
r1785 | repo = mock.Mock() | |||
repo.name = 'abcde' | ||||
full_repo_name = 'test-repo-group/' + repo.name | ||||
ref_name = 'branch1' | ||||
raw_id = 'deadbeef0123456789' | ||||
is_svn = True | ||||
r1927 | with mock.patch('rhodecode.lib.helpers.route_path') as url_mock: | |||
r1785 | result = summary_view._create_files_url( | |||
repo, full_repo_name, ref_name, raw_id, is_svn) | ||||
url_mock.assert_called_once_with( | ||||
r1927 | 'repo_files', repo_name=full_repo_name, f_path=ref_name, | |||
commit_id=raw_id, _query=dict(at=ref_name)) | ||||
r1785 | assert result == url_mock.return_value | |||
r1927 | def test_name_has_slashes(self, app, summary_view): | |||
r1785 | repo = mock.Mock() | |||
repo.name = 'abcde' | ||||
full_repo_name = 'test-repo-group/' + repo.name | ||||
ref_name = 'branch1/branch2' | ||||
raw_id = 'deadbeef0123456789' | ||||
is_svn = False | ||||
r1927 | with mock.patch('rhodecode.lib.helpers.route_path') as url_mock: | |||
r1785 | result = summary_view._create_files_url( | |||
repo, full_repo_name, ref_name, raw_id, is_svn) | ||||
url_mock.assert_called_once_with( | ||||
r1927 | 'repo_files', repo_name=full_repo_name, commit_id=raw_id, | |||
f_path='', _query=dict(at=ref_name)) | ||||
r1785 | assert result == url_mock.return_value | |||
class TestReferenceItems(object): | ||||
repo = mock.Mock() | ||||
repo.name = 'pytest-repo' | ||||
repo_full_name = 'pytest-repo-group/' + repo.name | ||||
ref_type = 'branch' | ||||
fake_url = '/abcde/' | ||||
@staticmethod | ||||
def _format_function(name, id_): | ||||
return 'format_function_{}_{}'.format(name, id_) | ||||
def test_creates_required_amount_of_items(self, summary_view): | ||||
amount = 100 | ||||
refs = { | ||||
'ref{}'.format(i): '{0:040d}'.format(i) | ||||
for i in range(amount) | ||||
} | ||||
url_patcher = mock.patch.object(summary_view, '_create_files_url') | ||||
svn_patcher = mock.patch('rhodecode.lib.helpers.is_svn', | ||||
return_value=False) | ||||
with url_patcher as url_mock, svn_patcher: | ||||
result = summary_view._create_reference_items( | ||||
self.repo, self.repo_full_name, refs, self.ref_type, | ||||
self._format_function) | ||||
assert len(result) == amount | ||||
assert url_mock.call_count == amount | ||||
def test_single_item_details(self, summary_view): | ||||
ref_name = 'ref1' | ||||
ref_id = 'deadbeef' | ||||
refs = { | ||||
ref_name: ref_id | ||||
} | ||||
svn_patcher = mock.patch('rhodecode.lib.helpers.is_svn', | ||||
return_value=False) | ||||
url_patcher = mock.patch.object( | ||||
summary_view, '_create_files_url', return_value=self.fake_url) | ||||
with url_patcher as url_mock, svn_patcher: | ||||
result = summary_view._create_reference_items( | ||||
self.repo, self.repo_full_name, refs, self.ref_type, | ||||
self._format_function) | ||||
url_mock.assert_called_once_with( | ||||
self.repo, self.repo_full_name, ref_name, ref_id, False) | ||||
expected_result = [ | ||||
{ | ||||
'text': ref_name, | ||||
'id': self._format_function(ref_name, ref_id), | ||||
'raw_id': ref_id, | ||||
r3777 | 'idx': 0, | |||
r1785 | 'type': self.ref_type, | |||
'files_url': self.fake_url | ||||
} | ||||
] | ||||
assert result == expected_result | ||||