##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with the cache query for users. The old way of querying caused the user `get` query to always be cached, returning stale results even in 2FA forms. The new limited query doesn't cache the user object, which resolves these issues.

File last commit:

r5274:6d0b768f default
r5365:ae8a165b default
Show More
test_repo_files.py
1152 lines | 43.3 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import mock
import pytest
from collections import OrderedDict
from rhodecode.apps.repository.tests.test_repo_compare import ComparePage
from rhodecode.apps.repository.views.repo_files import RepoFilesView, get_archive_name, get_path_sha
from rhodecode.lib import helpers as h
from rhodecode.lib.ext_json import json
from rhodecode.lib.str_utils import safe_str
from rhodecode.lib.vcs import nodes
from rhodecode.lib.vcs.conf import settings
from rhodecode.model.db import Session, Repository
from rhodecode.tests import assert_session_flash
from rhodecode.tests.fixture import Fixture
from rhodecode.tests.routes import route_path
# Shared Fixture instance; code below uses it to load JSON test resources.
fixture = Fixture()
def get_node_history(backend_type):
    """
    Return the expected node-history payload for the given backend.

    Only the resource that belongs to ``backend_type`` is read and parsed.

    :param backend_type: backend alias, one of ``hg``, ``git`` or ``svn``.
    :return: the decoded JSON content of the matching resource file.
    :raises KeyError: when ``backend_type`` is not one of the known aliases.
    """
    resource_names = {
        'hg': 'hg_node_history_response.json',
        'git': 'git_node_history_response.json',
        'svn': 'svn_node_history_response.json',
    }
    # Resolve the name first so an unknown alias still fails with KeyError
    # before any resource is touched.
    resource_name = resource_names[backend_type]
    return json.loads(fixture.load_resource(resource_name))
def assert_files_in_response(response, files, params):
    """
    Assert that ``response`` contains a files-view link for each file.

    :param response: response object handed on to the shared item check.
    :param files: file names to look for.
    :param params: mapping supplying ``repo_name`` and ``commit_id`` used
        to render the link.
    """
    link_template = 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"'
    _assert_items_in_response(response, files, link_template, params)
def assert_dirs_in_response(response, dirs, params):
    """
    Assert that ``response`` contains a files-view link for each directory.

    :param response: response object handed on to the shared item check.
    :param dirs: directory names to look for.
    :param params: mapping supplying ``repo_name`` and ``commit_id`` used
        to render the link.
    """
    link_template = 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"'
    _assert_items_in_response(response, dirs, link_template, params)
def _assert_items_in_response(response, items, template, params):
    """
    Render ``template`` for every item and require the result in ``response``.

    Each item is exposed to the template as ``name``; the entries of
    ``params`` are applied on top, so a ``name`` key in ``params`` wins.

    :param response: object providing ``mustcontain``.
    :param items: names to render into the template, one check per name.
    :param template: ``%``-style template using named placeholders.
    :param params: additional placeholder values shared by all items.
    """
    for name in items:
        rendered = template % {'name': name, **params}
        response.mustcontain(rendered)
def assert_timeago_in_response(response, items, params):
    """
    Assert that ``response`` shows the age component of ``params['date']``.

    The expected markup is built from ``params['date']`` alone and never
    from the individual item, so it is rendered once and reused for every
    item. Nothing is checked when ``items`` is empty.

    :param response: object providing ``mustcontain``.
    :param items: items the check is repeated for.
    :param params: mapping providing the ``date`` to render.
    """
    if not items:
        return
    expected_age = h.age_component(params['date'])
    for _ in items:
        response.mustcontain(expected_age)
@pytest.mark.usefixtures("app")
class TestFilesViews(object):
    """
    Functional tests of the repository files views: directory listings,
    file source, annotations, authors, history, node search and node tree.
    """

    def test_show_files(self, backend):
        """The root listing at ``tip`` links the expected dirs and files."""
        listing_url = route_path(
            'repo_files',
            repo_name=backend.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(listing_url)

        tip = backend.repo.get_commit()
        params = {
            'repo_name': backend.repo_name,
            'commit_id': tip.raw_id,
            'date': tip.date,
        }
        assert_dirs_in_response(response, ['docs', 'vcs'], params)

        expected_files = [
            '.gitignore',
            '.hgignore',
            '.hgtags',
            # TODO: missing in Git
            # '.travis.yml',
            'MANIFEST.in',
            'README.rst',
            # TODO: File is missing in svn repository
            # 'run_test_and_report.sh',
            'setup.cfg',
            'setup.py',
            'test_and_report.sh',
            'tox.ini',
        ]
        assert_files_in_response(response, expected_files, params)
        assert_timeago_in_response(response, expected_files, params)

    def test_show_files_links_submodules_with_absolute_url(self, backend_hg):
        """The ``absolute-path`` subrepo entry is linked exactly once."""
        subrepo = backend_hg['subrepos']
        listing_url = route_path(
            'repo_files',
            repo_name=subrepo.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(listing_url)

        response.assert_response().contains_one_link(
            'absolute-path @ 000000000000', 'http://example.com/absolute-path')

    def test_show_files_links_submodules_with_absolute_url_subpaths(
            self, backend_hg):
        """The ``subpaths-path`` subrepo entry is linked exactly once."""
        subrepo = backend_hg['subrepos']
        listing_url = route_path(
            'repo_files',
            repo_name=subrepo.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(listing_url)

        response.assert_response().contains_one_link(
            'subpaths-path @ 000000000000',
            'http://sub-base.example.com/subpaths-path')

    @pytest.mark.xfail_backends("svn", reason="Depends on branch support")
    def test_files_menu(self, backend):
        """The Files menu links to the landing revision, not to ``tip``."""
        branch_name = "temp_branch_name"
        backend.create_repo([
            {'message': 'a'},
            {'message': 'b', 'branch': branch_name},
        ])
        backend.repo.landing_rev = f"branch:{branch_name}"
        Session().commit()

        # get response based on tip and not new commit
        listing_url = route_path(
            'repo_files',
            repo_name=backend.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(listing_url)

        # make sure Files menu url is not tip but new commit
        landing_rev = backend.repo.landing_ref_name
        files_url = route_path(
            'repo_files:default_path',
            repo_name=backend.repo_name,
            commit_id=landing_rev, params={'at': landing_rev})
        assert landing_rev != 'tip'
        response.mustcontain(f'<li class="active"><a class="menulink" href="{files_url}">')

    def test_show_files_commit(self, backend):
        """The listing of commit index 32 links the expected entries."""
        commit = backend.repo.get_commit(commit_idx=32)
        listing_url = route_path(
            'repo_files',
            repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/')
        response = self.app.get(listing_url)

        params = {
            'repo_name': backend.repo_name,
            'commit_id': commit.raw_id,
        }
        assert_dirs_in_response(response, ['docs', 'tests'], params)
        assert_files_in_response(response, ['README.rst'], params)

    def test_show_files_different_branch(self, backend):
        """The listing shows a branch tag for each expected branch."""
        # Per backend alias: (commit index, branch names expected in the page)
        expectations = {
            'hg': (150, ['git']),
            # TODO: Git test repository does not contain other branches
            'git': (633, ['master']),
            # TODO: Branch support in Subversion
            'svn': (150, []),
        }
        commit_idx, expected_branches = expectations[backend.alias]
        commit = backend.repo.get_commit(commit_idx=commit_idx)

        listing_url = route_path(
            'repo_files',
            repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/')
        assert_response = self.app.get(listing_url).assert_response()

        for branch_name in expected_branches:
            assert_response.element_contains('.tags .branchtag', branch_name)

    def test_show_files_paging(self, backend):
        """Each requested commit page contains its ``r<idx>:<short id>``."""
        repo = backend.repo
        # Resolve all commit ids up front, before any request is made.
        commit_ids = {
            commit_idx: repo.get_commit(commit_idx=commit_idx).raw_id
            for commit_idx in (73, 92, 109, 1, 0)
        }

        for commit_idx, raw_id in commit_ids.items():
            listing_url = route_path(
                'repo_files',
                repo_name=backend.repo_name, commit_id=raw_id, f_path='/')
            response = self.app.get(listing_url)
            response.mustcontain("""r%s:%s""" % (commit_idx, raw_id[:8]))

    def test_file_source(self, backend):
        """The file page shows the commit message plus branch and tags."""
        commit = backend.repo.get_commit(commit_idx=167)
        source_url = route_path(
            'repo_files',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='vcs/nodes.py')
        response = self.app.get(source_url)

        response.mustcontain(
            """<div class="commit">%s</div>""" % (commit.message, ))

        assert_response = response.assert_response()
        if commit.branch:
            assert_response.element_contains(
                '.tags.tags-main .branchtag', commit.branch)
        for tag in commit.tags or []:
            assert_response.element_contains('.tags.tags-main .tagtag', tag)

    def test_file_source_annotated(self, backend):
        """The annotated view contains the expected revision marker."""
        annotate_url = route_path(
            'repo_files:annotated',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='vcs/nodes.py')
        response = self.app.get(annotate_url)

        expected_commits = {
            'hg': 'r356',
            'git': 'r345',
            'svn': 'r208',
        }
        response.mustcontain(expected_commits[backend.alias])

    def test_file_source_authors(self, backend):
        """The authors view names every expected author."""
        authors_url = route_path(
            'repo_file_authors',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='vcs/nodes.py')
        response = self.app.get(authors_url)

        expected_authors = {
            'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
            'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
            'svn': ('marcin', 'lukasz'),
        }
        for author_name in expected_authors[backend.alias]:
            response.mustcontain(author_name)

    def test_file_source_authors_with_annotation(self, backend):
        """The authors view with ``annotate=1`` names every expected author."""
        authors_url = route_path(
            'repo_file_authors',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='vcs/nodes.py',
            params=dict(annotate=1))
        response = self.app.get(authors_url)

        expected_authors = {
            'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
            'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
            'svn': ('marcin', 'lukasz'),
        }
        for author_name in expected_authors[backend.alias]:
            response.mustcontain(author_name)

    def test_file_source_history(self, backend, xhr_header):
        """The history payload equals the stored per-backend expectation."""
        history_url = route_path(
            'repo_file_history',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='vcs/nodes.py')
        response = self.app.get(history_url, extra_environ=xhr_header)

        assert get_node_history(backend.alias) == json.loads(response.body)

    def test_file_source_history_svn(self, backend_svn, xhr_header):
        """History of the simple-layout SVN repo matches the stored data."""
        simple_repo = backend_svn['svn-simple-layout']
        history_url = route_path(
            'repo_file_history',
            repo_name=simple_repo.repo_name,
            commit_id='tip', f_path='trunk/example.py')
        response = self.app.get(history_url, extra_environ=xhr_header)

        expected_data = json.loads(
            fixture.load_resource('svn_node_history_branches.json'))
        assert expected_data == response.json

    def test_file_source_history_with_annotation(self, backend, xhr_header):
        """The history payload with ``annotate=1`` equals the expectation."""
        history_url = route_path(
            'repo_file_history',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='vcs/nodes.py',
            params=dict(annotate=1))
        response = self.app.get(history_url, extra_environ=xhr_header)

        assert get_node_history(backend.alias) == json.loads(response.body)

    def test_tree_search_top_level(self, backend, xhr_header):
        """The node list at the root contains the ``docs`` directory."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodelist_url = route_path(
            'repo_files_nodelist',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='/')
        response = self.app.get(nodelist_url, extra_environ=xhr_header)

        assert 'nodes' in response.json
        assert {'name': 'docs', 'type': 'dir'} in response.json['nodes']

    def test_tree_search_missing_xhr(self, backend):
        """The node list request without the XHR header expects a 404."""
        nodelist_url = route_path(
            'repo_files_nodelist',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='/')
        self.app.get(nodelist_url, status=404)

    def test_tree_search_at_path(self, backend, xhr_header):
        """The node list at ``/docs`` contains the expected entries."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodelist_url = route_path(
            'repo_files_nodelist',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='/docs')
        response = self.app.get(nodelist_url, extra_environ=xhr_header)

        assert 'nodes' in response.json
        found = response.json['nodes']
        assert {'name': 'docs/api', 'type': 'dir'} in found
        assert {'name': 'docs/index.rst', 'type': 'file'} in found

    def test_tree_search_at_path_2nd_level(self, backend, xhr_header):
        """The node list at ``/docs/api`` contains ``docs/api/index.rst``."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodelist_url = route_path(
            'repo_files_nodelist',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='/docs/api')
        response = self.app.get(nodelist_url, extra_environ=xhr_header)

        assert 'nodes' in response.json
        found = response.json['nodes']
        assert {'name': 'docs/api/index.rst', 'type': 'file'} in found

    def test_tree_search_at_path_missing_xhr(self, backend):
        """The ``/docs`` node list without the XHR header expects a 404."""
        nodelist_url = route_path(
            'repo_files_nodelist',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='/docs')
        self.app.get(nodelist_url, status=404)

    def test_nodetree(self, backend, xhr_header):
        """The full node tree carries non-empty commit data attributes."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodetree_url = route_path(
            'repo_nodetree_full',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='/')
        response = self.app.get(nodetree_url, extra_environ=xhr_header)

        assert_response = response.assert_response()
        for attr in ('data-commit-id', 'data-date', 'data-author'):
            elements = assert_response.get_elements(f'[{attr}]')
            assert len(elements) > 1
            for element in elements:
                assert element.get(attr)

    def test_nodetree_if_file(self, backend, xhr_header):
        """Requesting the node tree of a file yields an empty body."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodetree_url = route_path(
            'repo_nodetree_full',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='README.rst')
        response = self.app.get(nodetree_url, extra_environ=xhr_header)

        assert response.text == ''

    def test_nodetree_wrong_path(self, backend, xhr_header):
        """Requesting the node tree of a missing path reports an error."""
        commit = backend.repo.get_commit(commit_idx=173)
        nodetree_url = route_path(
            'repo_nodetree_full',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='/dont-exist')
        response = self.app.get(nodetree_url, extra_environ=xhr_header)

        expected_error = 'error: There is no file nor directory at the given path'
        assert expected_error in response.text

    def test_nodetree_missing_xhr(self, backend):
        """The node tree request without the XHR header expects a 404."""
        nodetree_url = route_path(
            'repo_nodetree_full',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='/')
        self.app.get(nodetree_url, status=404)
@pytest.mark.usefixtures("app", "autologin_user")
class TestRawFileHandling(object):
    """
    Tests of the file download and raw file views.
    """

    def test_download_file(self, backend):
        """Downloading a file sets the attachment header and content type."""
        commit = backend.repo.get_commit(commit_idx=173)
        download_url = route_path(
            'repo_file_download',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='vcs/nodes.py')
        response = self.app.get(download_url)

        assert response.content_disposition == 'attachment; filename="nodes.py"; filename*=UTF-8\'\'nodes.py'
        assert response.content_type == "text/x-python"

    def test_download_file_wrong_cs(self, backend):
        """Downloading at an unknown commit id expects a 404 with a message."""
        raw_id = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
        download_url = route_path(
            'repo_file_download',
            repo_name=backend.repo_name,
            commit_id=raw_id, f_path='vcs/nodes.svg')
        response = self.app.get(download_url, status=404)

        response.mustcontain("""No such commit exists for this repository""")

    def test_download_file_wrong_f_path(self, backend):
        """Downloading a missing path expects a 404 naming path and commit."""
        commit = backend.repo.get_commit(commit_idx=173)
        f_path = 'vcs/ERRORnodes.py'
        download_url = route_path(
            'repo_file_download',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path=f_path)
        response = self.app.get(download_url, status=404)

        expected_msg = (
            "There is no file nor directory at the given path: "
            f"`{f_path}` at commit {commit.short_id}")
        response.mustcontain(expected_msg)

    def test_file_raw(self, backend):
        """The raw view of a Python file is served as ``text/plain``."""
        commit = backend.repo.get_commit(commit_idx=173)
        raw_url = route_path(
            'repo_file_raw',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path='vcs/nodes.py')
        response = self.app.get(raw_url)

        assert response.content_type == "text/plain"

    def test_file_raw_binary(self, backend):
        """The raw view of a PNG file uses an ``inline`` disposition."""
        commit = backend.repo.get_commit()
        raw_url = route_path(
            'repo_file_raw',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id,
            f_path='docs/theme/ADC/static/breadcrumb_background.png')
        response = self.app.get(raw_url)

        assert response.content_disposition == 'inline'

    def test_raw_file_wrong_cs(self, backend):
        """The raw view at an unknown commit id expects a 404 with a message."""
        raw_id = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
        raw_url = route_path(
            'repo_file_raw',
            repo_name=backend.repo_name,
            commit_id=raw_id, f_path='vcs/nodes.svg')
        response = self.app.get(raw_url, status=404)

        response.mustcontain("""No such commit exists for this repository""")

    def test_raw_wrong_f_path(self, backend):
        """The raw view of a missing path expects a 404 naming path and commit."""
        commit = backend.repo.get_commit(commit_idx=173)
        f_path = 'vcs/ERRORnodes.py'
        raw_url = route_path(
            'repo_file_raw',
            repo_name=backend.repo_name,
            commit_id=commit.raw_id, f_path=f_path)
        response = self.app.get(raw_url, status=404)

        expected_msg = (
            "There is no file nor directory at the given path: "
            f"`{f_path}` at commit {commit.short_id}")
        response.mustcontain(expected_msg)

    def test_raw_svg_should_not_be_rendered(self, backend):
        """The raw view of an SVG file is served as ``text/plain``."""
        backend.create_repo()
        backend.ensure_file(b"xss.svg")
        raw_url = route_path(
            'repo_file_raw',
            repo_name=backend.repo_name,
            commit_id='tip', f_path='xss.svg')
        response = self.app.get(raw_url)

        # If the content type is image/svg+xml then it allows to render HTML
        # and malicious SVG.
        assert response.content_type == "text/plain"
@pytest.mark.usefixtures("app")
class TestRepositoryArchival(object):
    """
    Tests of the archive download view (``repo_archivefile`` route).
    """

    def _assert_archives_served(
            self, backend, commit_idx, at_path='/', query=None,
            **archive_name_kwargs):
        """
        Download a commit in every configured archive format and check the
        response status and headers.

        :param backend: backend fixture of the repository under test.
        :param commit_idx: index of the commit to archive.
        :param at_path: path whose sha goes into the expected file name.
        :param query: optional query parameters for the archive URL.
        :param archive_name_kwargs: extra keyword arguments forwarded to
            ``get_archive_name`` when building the expected file name.
        """
        backend.enable_downloads()
        commit = backend.repo.get_commit(commit_idx=commit_idx)
        # The path sha is computed from ``at_path`` alone, so it is the same
        # for every archive type and only needs to be computed once.
        path_sha = get_path_sha(at_path)
        # Pass ``params`` only when a query was requested, so the plain case
        # builds its URL exactly like a call without ``params``.
        route_kwargs = {} if query is None else {'params': query}

        for _a_type, content_type, extension in settings.ARCHIVE_SPECS:
            filename = get_archive_name(
                backend.repo_id, backend.repo_name,
                commit_sha=commit.short_id, ext=extension,
                path_sha=path_sha, **archive_name_kwargs)
            archive_url = route_path(
                'repo_archivefile',
                repo_name=backend.repo_name,
                fname=commit.raw_id + extension,
                **route_kwargs)
            response = self.app.get(archive_url)

            assert response.status == '200 OK'
            response_headers = list(response.headers.items())
            expected_headers = [
                ('Content-Disposition', f'attachment; filename={filename}'),
                ('Content-Type', content_type),
            ]
            for header in expected_headers:
                assert header in response_headers

    def test_archival(self, backend):
        """Every archive type is served with the expected headers."""
        self._assert_archives_served(backend, commit_idx=173)

    def test_archival_no_hash(self, backend):
        """Archives requested with ``with_hash=0`` use the hash-less name."""
        self._assert_archives_served(
            backend, commit_idx=173,
            query={'with_hash': 0}, with_hash=False)

    def test_archival_at_path(self, backend):
        """Archives limited to ``at_path`` use that path's sha in the name."""
        at_path = 'vcs'
        self._assert_archives_served(
            backend, commit_idx=190,
            at_path=at_path, query={'at_path': at_path})

    @pytest.mark.parametrize('arch_ext', [
        'tar', 'rar', 'x', '..ax', '.zipz', 'tar.gz.tar'])
    def test_archival_wrong_ext(self, backend, arch_ext):
        """An unsupported archive extension is reported in the response."""
        backend.enable_downloads()
        commit = backend.repo.get_commit(commit_idx=173)
        fname = commit.raw_id + '.' + arch_ext

        archive_url = route_path(
            'repo_archivefile',
            repo_name=backend.repo_name,
            fname=fname)
        response = self.app.get(archive_url)

        response.mustcontain(
            'Unknown archive type for: `{}`'.format(fname))

    @pytest.mark.parametrize('commit_id', [
        '00x000000', 'tar', 'wrong', '@$@$42413232', '232dffcd'])
    def test_archival_wrong_commit_id(self, backend, commit_id):
        """An unknown commit id is reported in the response."""
        backend.enable_downloads()
        fname = f'{commit_id}.zip'

        archive_url = route_path(
            'repo_archivefile',
            repo_name=backend.repo_name,
            fname=fname)
        response = self.app.get(archive_url)

        response.mustcontain('Unknown commit_id')
@pytest.mark.usefixtures("app")
class TestFilesDiff(object):
    """
    Tests of the legacy file diff views, which redirect to the compare page.
    """

    @pytest.mark.parametrize("diff", ['diff', 'download', 'raw'])
    def test_file_full_diff(self, backend, diff):
        """Each diff mode shows the README change between the last commits."""
        commit1 = backend.repo.get_commit(commit_idx=-1)
        commit2 = backend.repo.get_commit(commit_idx=-2)

        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend.repo_name,
                       f_path='README'),
            params={
                'diff1': commit2.raw_id,
                'diff2': commit1.raw_id,
                'fulldiff': '1',
                'diff': diff,
            })

        if diff == 'diff':
            # use redirect since this is OLD view redirecting to compare page
            response = response.follow()

        # It's a symlink to README.rst
        response.mustcontain('README.rst')
        response.mustcontain('No newline at end of file')

    def test_file_binary_diff(self, backend):
        """A commit adding a binary file is summarised on the compare page."""
        commits = [
            {'message': 'First commit'},
            {'message': 'Commit with binary',
             'added': [nodes.FileNode(b'file.bin', content='\0BINARY\0')]},
        ]
        repo = backend.create_repo(commits=commits)

        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend.repo_name,
                       f_path='file.bin'),
            params={
                'diff1': repo.get_commit(commit_idx=0).raw_id,
                'diff2': repo.get_commit(commit_idx=1).raw_id,
                'fulldiff': '1',
                'diff': 'diff',
            })
        # use redirect since this is OLD view redirecting to compare page
        response = response.follow()
        response.mustcontain('Collapse 1 commit')
        file_changes = (1, 0, 0)

        compare_page = ComparePage(response)
        compare_page.contains_change_summary(*file_changes)

        if backend.alias == 'svn':
            response.mustcontain('new file 10644')
            # TODO(marcink): SVN doesn't yet detect binary changes
        else:
            response.mustcontain('new file 100644')
            response.mustcontain('binary diff hidden')

    def test_diff_2way(self, backend):
        """The 2-way diff redirect lands on a page showing the README diff."""
        commit1 = backend.repo.get_commit(commit_idx=-1)
        commit2 = backend.repo.get_commit(commit_idx=-2)

        response = self.app.get(
            route_path('repo_files_diff_2way_redirect',
                       repo_name=backend.repo_name,
                       f_path='README'),
            params={
                'diff1': commit2.raw_id,
                'diff2': commit1.raw_id,
            })
        # use redirect since this is OLD view redirecting to compare page
        response = response.follow()

        # It's a symlink to README.rst
        response.mustcontain('README.rst')
        response.mustcontain('No newline at end of file')

    def test_requires_one_commit_id(self, backend, autologin_user):
        """A diff request without commit ids expects a 400 with a hint."""
        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend.repo_name,
                       f_path='README.rst'),
            status=400)
        response.mustcontain(
            'Need query parameter', 'diff1', 'diff2', 'to generate a diff.')

    def test_returns_no_files_if_file_does_not_exist(self, vcsbackend):
        """Diffing a path unknown to both commits shows ``No files``."""
        repo = vcsbackend.repo
        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=repo.name,
                       f_path='does-not-exist-in-any-commit'),
            params={
                'diff1': repo[0].raw_id,
                'diff2': repo[1].raw_id
            })

        response = response.follow()
        response.mustcontain('No files')

    def test_returns_redirect_if_file_not_changed(self, backend):
        """Diffing a commit against itself shows an empty compare page."""
        commit = backend.repo.get_commit(commit_idx=-1)
        response = self.app.get(
            route_path('repo_files_diff_2way_redirect',
                       repo_name=backend.repo_name,
                       f_path='README'),
            params={
                'diff1': commit.raw_id,
                'diff2': commit.raw_id,
            })

        response = response.follow()
        response.mustcontain('No files')
        response.mustcontain('No commits in this compare')

    # The three SVN tests below were previously disabled with a bare
    # ``return`` as their first statement, which made them report as passed
    # while checking nothing. An explicit skip keeps them disabled but shows
    # up in the test report; remove the mark once the case has been checked.

    @pytest.mark.skip(reason="TODO: check this case")
    def test_supports_diff_to_different_path_svn(self, backend_svn):
        """Diff between a tag path and trunk in the simple-layout SVN repo."""
        repo = backend_svn['svn-simple-layout'].scm_instance()
        commit_id_1 = '24'
        commit_id_2 = '26'

        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend_svn.repo_name,
                       f_path='trunk/example.py'),
            params={
                'diff1': 'tags/v0.2/example.py@' + commit_id_1,
                'diff2': commit_id_2,
            })

        response = response.follow()
        response.mustcontain(
            # diff contains this
            "Will print out a useful message on invocation.")

        # Note: Expecting that we indicate the user what's being compared
        response.mustcontain("trunk/example.py")
        response.mustcontain("tags/v0.2/example.py")

    @pytest.mark.skip(reason="TODO: check this case")
    def test_show_rev_redirects_to_svn_path(self, backend_svn):
        """Diff against a branch path is expected to redirect to its files."""
        repo = backend_svn['svn-simple-layout'].scm_instance()
        commit_id = repo[-1].raw_id

        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend_svn.repo_name,
                       f_path='trunk/example.py'),
            params={
                'diff1': 'branches/argparse/example.py@' + commit_id,
                'diff2': commit_id,
            },
            status=302)
        response = response.follow()
        assert response.headers['Location'].endswith(
            'svn-svn-simple-layout/files/26/branches/argparse/example.py')

    @pytest.mark.skip(reason="TODO: check this case")
    def test_show_rev_and_annotate_redirects_to_svn_path(self, backend_svn):
        """Diff with ``annotate`` is expected to redirect to the annotation."""
        repo = backend_svn['svn-simple-layout'].scm_instance()
        commit_id = repo[-1].raw_id

        response = self.app.get(
            route_path('repo_files_diff',
                       repo_name=backend_svn.repo_name,
                       f_path='trunk/example.py'),
            params={
                'diff1': 'branches/argparse/example.py@' + commit_id,
                'diff2': commit_id,
                'show_rev': 'Show at Revision',
                'annotate': 'true',
            },
            status=302)
        response = response.follow()
        assert response.headers['Location'].endswith(
            'svn-svn-simple-layout/annotate/26/branches/argparse/example.py')
@pytest.mark.usefixtures("app", "autologin_user")
class TestModifyFilesWithWebInterface(object):
    """
    Tests of adding, editing, replacing and deleting files through the
    web interface.
    """

    def test_add_file_view(self, backend):
        """The add-file form can be requested."""
        add_url = route_path(
            'repo_files_add_file',
            repo_name=backend.repo_name, commit_id='tip', f_path='/')
        self.app.get(add_url)

    @pytest.mark.xfail_backends("svn", reason="Depends on online editing")
    def test_add_file_into_repo_missing_content(self, backend, csrf_token):
        """Creating a file with empty content is reported as committed."""
        backend.create_repo()
        filename = 'init.py'
        create_url = route_path(
            'repo_files_create_file',
            repo_name=backend.repo_name, commit_id='tip', f_path='/')
        form = {
            'content': "",
            'filename': filename,
            'csrf_token': csrf_token,
        }
        response = self.app.post(create_url, params=form, status=302)

        expected_msg = 'Successfully committed new file `{}`'.format(os.path.join(filename))
        assert_session_flash(response, expected_msg)

    def test_add_file_into_repo_missing_filename(self, backend, csrf_token):
        """Creating a file without a filename flashes an error."""
        commit_id = backend.repo.get_commit().raw_id
        create_url = route_path(
            'repo_files_create_file',
            repo_name=backend.repo_name, commit_id=commit_id, f_path='/')
        form = {
            'content': "foo",
            'csrf_token': csrf_token,
        }
        response = self.app.post(create_url, params=form, status=302)

        assert_session_flash(response, 'No filename specified')

    def test_add_file_into_repo_errors_and_no_commits(
            self, backend, csrf_token):
        """A failed create in an empty repo redirects to the summary page."""
        repo = backend.create_repo()
        # Create a file with no filename, it will display an error but
        # the repo has no commits yet
        create_url = route_path(
            'repo_files_create_file',
            repo_name=repo.repo_name, commit_id='tip', f_path='/')
        form = {
            'content': "foo",
            'csrf_token': csrf_token,
        }
        response = self.app.post(create_url, params=form, status=302)

        assert_session_flash(response, 'No filename specified')

        # Not allowed, redirect to the summary
        redirected = response.follow()
        summary_url = h.route_path('repo_summary', repo_name=repo.repo_name)

        # As there are no commits, displays the summary page with the error of
        # creating a file with no filename
        assert redirected.request.path == summary_url

    @pytest.mark.parametrize("filename, clean_filename", [
        ('/abs/foo', 'abs/foo'),
        ('../rel/foo', 'rel/foo'),
        ('file/../foo/foo', 'file/foo/foo'),
    ])
    def test_add_file_into_repo_bad_filenames(self, filename, clean_filename, backend, csrf_token):
        """The success message names the expected cleaned-up filename."""
        repo = backend.create_repo()
        commit_id = repo.get_commit().raw_id
        create_url = route_path(
            'repo_files_create_file',
            repo_name=repo.repo_name, commit_id=commit_id, f_path='/')
        form = {
            'content': "foo",
            'filename': filename,
            'csrf_token': csrf_token,
        }
        response = self.app.post(create_url, params=form, status=302)

        expected_msg = 'Successfully committed new file `{}`'.format(clean_filename)
        assert_session_flash(response, expected_msg)

    @pytest.mark.parametrize("cnt, filename, content", [
        (1, 'foo.txt', "Content"),
        (2, 'dir/foo.rst', "Content"),
        (3, 'dir/foo-second.rst', "Content"),
        (4, 'rel/dir/foo.bar', "Content"),
    ])
    def test_add_file_into_empty_repo(self, cnt, filename, content, backend, csrf_token):
        """Creating a file in a fresh repo is reported as committed."""
        repo = backend.create_repo()
        commit_id = repo.get_commit().raw_id
        create_url = route_path(
            'repo_files_create_file',
            repo_name=repo.repo_name, commit_id=commit_id, f_path='/')
        form = {
            'content': content,
            'filename': filename,
            'csrf_token': csrf_token,
        }
        response = self.app.post(create_url, params=form, status=302)

        expected_msg = 'Successfully committed new file `{}`'.format(filename)
        assert_session_flash(response, expected_msg)

    def test_edit_file_view(self, backend):
        """The edit form of ``vcs/nodes.py`` shows the file content."""
        edit_url = route_path(
            'repo_files_edit_file',
            repo_name=backend.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/nodes.py')
        response = self.app.get(edit_url, status=200)

        response.mustcontain("Module holding everything related to vcs nodes.")

    def test_edit_file_view_not_on_branch(self, backend):
        """Requesting the edit form at ``tip`` redirects with an error flash."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/nodes.py")
        edit_url = route_path(
            'repo_files_edit_file',
            repo_name=repo.repo_name,
            commit_id='tip',
            f_path='vcs/nodes.py')
        response = self.app.get(edit_url, status=302)

        assert_session_flash(
            response, 'Cannot modify file. Given commit `tip` is not head of a branch.')

    def test_edit_file_view_commit_changes(self, backend, csrf_token):
        """Submitting an edit commits it with the given message."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")
        update_url = route_path(
            'repo_files_update_file',
            repo_name=repo.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/nodes.py')
        form = {
            'content': "print 'hello world'",
            'message': 'I committed',
            'filename': "vcs/nodes.py",
            'csrf_token': csrf_token,
        }
        response = self.app.post(update_url, params=form, status=302)

        assert_session_flash(
            response, 'Successfully committed changes to file `vcs/nodes.py`')
        latest = repo.get_commit(commit_idx=-1)
        assert latest.message == 'I committed'

    def test_replace_binary_file_view_commit_changes(self, backend, csrf_token):
        """Uploading a replacement for a binary file commits it."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/nodes.docx", content=b"PREVIOUS CONTENT'")
        replace_url = route_path(
            'repo_files_replace_binary',
            repo_name=repo.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/nodes.docx')
        form = {
            'message': 'I committed',
            'csrf_token': csrf_token,
        }
        uploads = [('files_upload', 'vcs/nodes.docx', b'SOME CONTENT')]
        response = self.app.post(
            replace_url, params=form, upload_files=uploads, status=200)

        assert_session_flash(
            response, 'Successfully committed 1 new file')
        latest = repo.get_commit(commit_idx=-1)
        assert latest.message == 'I committed'

    def test_edit_file_view_commit_changes_default_message(self, backend,
                                                           csrf_token):
        """Submitting an edit with an empty message uses the default one."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")

        commit_id = (
            backend.default_branch_name or
            backend.repo.scm_instance().commit_ids[-1])

        update_url = route_path(
            'repo_files_update_file',
            repo_name=repo.repo_name,
            commit_id=commit_id,
            f_path='vcs/nodes.py')
        form = {
            'content': "print 'hello world'",
            'message': '',
            'filename': "vcs/nodes.py",
            'csrf_token': csrf_token,
        }
        response = self.app.post(update_url, params=form, status=302)

        assert_session_flash(
            response, 'Successfully committed changes to file `vcs/nodes.py`')
        latest = repo.get_commit(commit_idx=-1)
        assert latest.message == 'Edited file vcs/nodes.py via RhodeCode Enterprise'

    def test_replace_binary_file_content_with_content_that_not_belong_to_original_type(self, backend, csrf_token):
        """Uploading a file with a different extension returns an error."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/sheet.xlsx", content=b"PREVIOUS CONTENT'")
        replace_url = route_path(
            'repo_files_replace_binary',
            repo_name=repo.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/sheet.xlsx')
        form = {
            'message': 'I committed',
            'csrf_token': csrf_token,
        }
        uploads = [('files_upload', 'vcs/sheet.docx', b'SOME CONTENT')]
        response = self.app.post(
            replace_url, params=form, upload_files=uploads, status=200)

        assert response.json['error'] == "file extension of uploaded file doesn't match an original file's extension"

    @pytest.mark.parametrize("replacement_files, expected_error", [
        ([], 'missing files'),
        (
            [('files_upload', 'vcs/node1.docx', b'SOME CONTENT'),
             ('files_upload', 'vcs/node2.docx', b'SOME CONTENT')],
            'too many files for replacement'),
    ])
    def test_replace_binary_with_wrong_amount_of_content_sources(self, replacement_files, expected_error, backend,
                                                                 csrf_token):
        """Uploading zero or two replacement files returns an error."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/node.docx", content=b"PREVIOUS CONTENT'")
        replace_url = route_path(
            'repo_files_replace_binary',
            repo_name=repo.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/node.docx')
        form = {
            'message': 'I committed',
            'csrf_token': csrf_token,
        }
        response = self.app.post(
            replace_url, params=form,
            upload_files=replacement_files, status=200)

        assert response.json['error'] == expected_error

    def test_delete_file_view(self, backend):
        """The remove-file form of ``vcs/nodes.py`` expects a 200."""
        remove_url = route_path(
            'repo_files_remove_file',
            repo_name=backend.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/nodes.py')
        self.app.get(remove_url, status=200)

    def test_delete_file_view_not_on_branch(self, backend):
        """Requesting the remove form at ``tip`` redirects with an error flash."""
        repo = backend.create_repo()
        backend.ensure_file(b'vcs/nodes.py')
        remove_url = route_path(
            'repo_files_remove_file',
            repo_name=repo.repo_name,
            commit_id='tip',
            f_path='vcs/nodes.py')
        response = self.app.get(remove_url, status=302)

        assert_session_flash(
            response, 'Cannot modify file. Given commit `tip` is not head of a branch.')

    def test_delete_file_view_commit_changes(self, backend, csrf_token):
        """Submitting the delete form reports the file as deleted."""
        repo = backend.create_repo()
        backend.ensure_file(b"vcs/nodes.py")
        delete_url = route_path(
            'repo_files_delete_file',
            repo_name=repo.repo_name,
            commit_id=backend.default_head_id,
            f_path='vcs/nodes.py')
        form = {
            'message': 'i committed',
            'csrf_token': csrf_token,
        }
        response = self.app.post(delete_url, params=form, status=302)

        assert_session_flash(
            response, 'Successfully deleted file `vcs/nodes.py`')
@pytest.mark.usefixtures("app")
class TestFilesViewOtherCases(object):
    """
    Files view tests for empty repositories and for file names that
    coincide with parts of route names.
    """

    def test_access_empty_repo_redirect_to_summary_with_alert_write_perms(
            self, backend_stub, autologin_regular_user, user_regular,
            user_util):
        """With write permission the flash offers add and upload links."""
        repo = backend_stub.create_repo()
        user_util.grant_user_permission_to_repo(
            repo, user_regular, 'repository.write')

        files_url = route_path(
            'repo_files',
            repo_name=repo.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(files_url)

        repo_file_add_url = route_path(
            'repo_files_add_file',
            repo_name=repo.repo_name,
            commit_id=0, f_path='')
        add_new = f'<a class="alert-link" href="{repo_file_add_url}">add a new file</a>'

        repo_file_upload_url = route_path(
            'repo_files_upload_file',
            repo_name=repo.repo_name,
            commit_id=0, f_path='')
        upload_new = f'<a class="alert-link" href="{repo_file_upload_url}">upload a new file</a>'

        expected_flash = 'There are no files yet. Click here to %s or %s.' % (add_new, upload_new)
        assert_session_flash(response, expected_flash)

    def test_access_empty_repo_redirect_to_summary_with_alert_no_write_perms(
            self, backend_stub, autologin_regular_user):
        """Without write permission the flash omits the add-file link."""
        repo = backend_stub.create_repo()
        # init session for anon user
        # NOTE(review): the result of this call is discarded and no request
        # is issued here - confirm whether a request to this URL was intended.
        route_path('repo_summary', repo_name=repo.repo_name)

        repo_file_add_url = route_path(
            'repo_files_add_file',
            repo_name=repo.repo_name,
            commit_id=0, f_path='')

        files_url = route_path(
            'repo_files',
            repo_name=repo.repo_name, commit_id='tip', f_path='/')
        response = self.app.get(files_url)

        assert_session_flash(response, no_=repo_file_add_url)

    @pytest.mark.parametrize('file_node', [
        b'archive/file.zip',
        b'diff/my-file.txt',
        b'render.py',
        b'render',
        b'remove_file',
        b'remove_file/to-delete.txt',
    ])
    def test_file_names_equal_to_routes_parts(self, backend, file_node):
        """Files named like route parts are still served with a 200."""
        backend.create_repo()
        backend.ensure_file(file_node)

        files_url = route_path(
            'repo_files',
            repo_name=backend.repo_name,
            commit_id='tip', f_path=safe_str(file_node))
        self.app.get(files_url, status=200)
class TestAdjustFilePathForSvn(object):
    """
    SVN specific adjustments of node history in RepoFilesView.
    """

    def test_returns_path_relative_to_matched_reference(self):
        """``trunk/file`` with branch ``trunk`` is adjusted to ``file``."""
        svn_repo = self._repo(branches=['trunk'])
        self.assert_file_adjustment('trunk/file', 'file', svn_repo)

    def test_does_not_modify_file_if_no_reference_matches(self):
        """A path outside every known branch is returned unchanged."""
        svn_repo = self._repo(branches=['trunk'])
        self.assert_file_adjustment('notes/file', 'notes/file', svn_repo)

    def test_does_not_adjust_partial_directory_names(self):
        """Branch ``trun`` must not match the ``trunk`` directory."""
        svn_repo = self._repo(branches=['trun'])
        self.assert_file_adjustment('trunk/file', 'trunk/file', svn_repo)

    def test_is_robust_to_patterns_which_prefix_other_patterns(self):
        """With ``trunk`` and ``trunk/new`` known, ``trunk/new/file`` -> ``file``."""
        svn_repo = self._repo(branches=['trunk', 'trunk/new', 'trunk/old'])
        self.assert_file_adjustment('trunk/new/file', 'file', svn_repo)

    def assert_file_adjustment(self, f_path, expected, repo):
        """Assert that adjusting ``f_path`` for ``repo`` gives ``expected``."""
        adjusted = RepoFilesView.adjust_file_path_for_svn(f_path, repo)
        assert adjusted == expected

    def _repo(self, branches=None):
        """
        Build a mock repository with the given branch names and no tags.

        Every branch name maps to the placeholder value ``'0'``.
        """
        fake_repo = mock.Mock()
        fake_repo.branches = OrderedDict.fromkeys(branches or [], '0')
        fake_repo.tags = {}
        return fake_repo