# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import mock import pytest from collections import OrderedDict from rhodecode.apps.repository.tests.test_repo_compare import ComparePage from rhodecode.apps.repository.views.repo_files import RepoFilesView, get_archive_name, get_path_sha from rhodecode.lib import helpers as h from rhodecode.lib.ext_json import json from rhodecode.lib.str_utils import safe_str from rhodecode.lib.vcs import nodes from rhodecode.lib.vcs.conf import settings from rhodecode.model.db import Session, Repository from rhodecode.tests import assert_session_flash from rhodecode.tests.fixture import Fixture from rhodecode.tests.routes import route_path fixture = Fixture() def get_node_history(backend_type): return { 'hg': json.loads(fixture.load_resource('hg_node_history_response.json')), 'git': json.loads(fixture.load_resource('git_node_history_response.json')), 'svn': json.loads(fixture.load_resource('svn_node_history_response.json')), }[backend_type] def assert_files_in_response(response, files, params): template = ( 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"') _assert_items_in_response(response, files, template, params) def assert_dirs_in_response(response, dirs, params): template = ( 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"') _assert_items_in_response(response, dirs, template, params) def _assert_items_in_response(response, items, template, params): for item in items: item_params = {'name': item} item_params.update(params) response.mustcontain(template % item_params) def assert_timeago_in_response(response, items, params): for item in items: response.mustcontain(h.age_component(params['date'])) @pytest.mark.usefixtures("app") class TestFilesViews(object): def test_show_files(self, backend): response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id='tip', f_path='/')) commit = backend.repo.get_commit() params = { 'repo_name': backend.repo_name, 'commit_id': commit.raw_id, 'date': commit.date } assert_dirs_in_response(response, ['docs', 'vcs'], params) files = [ '.gitignore', '.hgignore', '.hgtags', # TODO: missing in Git # '.travis.yml', 'MANIFEST.in', 'README.rst', # TODO: File is missing in svn repository # 'run_test_and_report.sh', 'setup.cfg', 'setup.py', 'test_and_report.sh', 'tox.ini', ] assert_files_in_response(response, files, params) assert_timeago_in_response(response, files, params) def test_show_files_links_submodules_with_absolute_url(self, backend_hg): repo = backend_hg['subrepos'] response = self.app.get( route_path('repo_files', repo_name=repo.repo_name, commit_id='tip', f_path='/')) assert_response = response.assert_response() assert_response.contains_one_link( 'absolute-path @ 000000000000', 'http://example.com/absolute-path') def test_show_files_links_submodules_with_absolute_url_subpaths( self, backend_hg): repo = backend_hg['subrepos'] response = self.app.get( route_path('repo_files', repo_name=repo.repo_name, commit_id='tip', f_path='/')) assert_response = response.assert_response() assert_response.contains_one_link( 'subpaths-path @ 000000000000', 'http://sub-base.example.com/subpaths-path') @pytest.mark.xfail_backends("svn", reason="Depends on branch support") def test_files_menu(self, backend): new_branch = "temp_branch_name" commits = [ {'message': 'a'}, {'message': 'b', 'branch': new_branch} ] backend.create_repo(commits) backend.repo.landing_rev = f"branch:{new_branch}" Session().commit() # get response based on tip and not new commit response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id='tip', f_path='/')) # make sure Files menu url is not tip but new commit landing_rev = backend.repo.landing_ref_name files_url = route_path('repo_files:default_path', repo_name=backend.repo_name, commit_id=landing_rev, params={'at': landing_rev}) assert landing_rev != 'tip' response.mustcontain(f'<li class="active"><a class="menulink" href="{files_url}">') def test_show_files_commit(self, backend): commit = backend.repo.get_commit(commit_idx=32) response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/')) dirs = ['docs', 'tests'] files = ['README.rst'] params = { 'repo_name': backend.repo_name, 'commit_id': commit.raw_id, } assert_dirs_in_response(response, dirs, params) assert_files_in_response(response, files, params) def test_show_files_different_branch(self, backend): branches = dict( hg=(150, ['git']), # TODO: Git test repository does not contain other branches git=(633, ['master']), # TODO: Branch support in Subversion svn=(150, []) ) idx, branches = branches[backend.alias] commit = backend.repo.get_commit(commit_idx=idx) response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/')) assert_response = response.assert_response() for branch in branches: assert_response.element_contains('.tags .branchtag', branch) def test_show_files_paging(self, backend): repo = backend.repo indexes = [73, 92, 109, 1, 0] idx_map = [(rev, repo.get_commit(commit_idx=rev).raw_id) for rev in indexes] for idx in idx_map: response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id=idx[1], f_path='/')) response.mustcontain("""r%s:%s""" % (idx[0], idx[1][:8])) def test_file_source(self, backend): commit = backend.repo.get_commit(commit_idx=167) response = self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='vcs/nodes.py')) msgbox = """<div class="commit">%s</div>""" response.mustcontain(msgbox % (commit.message, )) assert_response = response.assert_response() if commit.branch: assert_response.element_contains( '.tags.tags-main .branchtag', commit.branch) if commit.tags: for tag in commit.tags: assert_response.element_contains('.tags.tags-main .tagtag', tag) def test_file_source_annotated(self, backend): response = self.app.get( route_path('repo_files:annotated', repo_name=backend.repo_name, commit_id='tip', f_path='vcs/nodes.py')) expected_commits = { 'hg': 'r356', 'git': 'r345', 'svn': 'r208', } response.mustcontain(expected_commits[backend.alias]) def test_file_source_authors(self, backend): response = self.app.get( route_path('repo_file_authors', repo_name=backend.repo_name, commit_id='tip', f_path='vcs/nodes.py')) expected_authors = { 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'), 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'), 'svn': ('marcin', 'lukasz'), } for author in expected_authors[backend.alias]: response.mustcontain(author) def test_file_source_authors_with_annotation(self, backend): response = self.app.get( route_path('repo_file_authors', repo_name=backend.repo_name, commit_id='tip', f_path='vcs/nodes.py', params=dict(annotate=1))) expected_authors = { 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'), 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'), 'svn': ('marcin', 'lukasz'), } for author in expected_authors[backend.alias]: response.mustcontain(author) def test_file_source_history(self, backend, xhr_header): response = self.app.get( route_path('repo_file_history', repo_name=backend.repo_name, commit_id='tip', f_path='vcs/nodes.py'), extra_environ=xhr_header) assert get_node_history(backend.alias) == json.loads(response.body) def test_file_source_history_svn(self, backend_svn, xhr_header): simple_repo = backend_svn['svn-simple-layout'] response = self.app.get( route_path('repo_file_history', repo_name=simple_repo.repo_name, commit_id='tip', f_path='trunk/example.py'), extra_environ=xhr_header) expected_data = json.loads( fixture.load_resource('svn_node_history_branches.json')) assert expected_data == response.json def test_file_source_history_with_annotation(self, backend, xhr_header): response = self.app.get( route_path('repo_file_history', repo_name=backend.repo_name, commit_id='tip', f_path='vcs/nodes.py', params=dict(annotate=1)), extra_environ=xhr_header) assert get_node_history(backend.alias) == json.loads(response.body) def test_tree_search_top_level(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_files_nodelist', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/'), extra_environ=xhr_header) assert 'nodes' in response.json assert {'name': 'docs', 'type': 'dir'} in response.json['nodes'] def test_tree_search_missing_xhr(self, backend): self.app.get( route_path('repo_files_nodelist', repo_name=backend.repo_name, commit_id='tip', f_path='/'), status=404) def test_tree_search_at_path(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_files_nodelist', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/docs'), extra_environ=xhr_header) assert 'nodes' in response.json nodes = response.json['nodes'] assert {'name': 'docs/api', 'type': 'dir'} in nodes assert {'name': 'docs/index.rst', 'type': 'file'} in nodes def test_tree_search_at_path_2nd_level(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_files_nodelist', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/docs/api'), extra_environ=xhr_header) assert 'nodes' in response.json nodes = response.json['nodes'] assert {'name': 'docs/api/index.rst', 'type': 'file'} in nodes def test_tree_search_at_path_missing_xhr(self, backend): self.app.get( route_path('repo_files_nodelist', repo_name=backend.repo_name, commit_id='tip', f_path='/docs'), status=404) def test_nodetree(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_nodetree_full', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/'), extra_environ=xhr_header) assert_response = response.assert_response() for attr in ['data-commit-id', 'data-date', 'data-author']: elements = assert_response.get_elements('[{}]'.format(attr)) assert len(elements) > 1 for element in elements: assert element.get(attr) def test_nodetree_if_file(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_nodetree_full', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='README.rst'), extra_environ=xhr_header) assert response.text == '' def test_nodetree_wrong_path(self, backend, xhr_header): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_nodetree_full', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='/dont-exist'), extra_environ=xhr_header) err = 'error: There is no file nor ' \ 'directory at the given path' assert err in response.text def test_nodetree_missing_xhr(self, backend): self.app.get( route_path('repo_nodetree_full', repo_name=backend.repo_name, commit_id='tip', f_path='/'), status=404) @pytest.mark.usefixtures("app", "autologin_user") class TestRawFileHandling(object): def test_download_file(self, backend): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_file_download', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='vcs/nodes.py'),) assert response.content_disposition == 'attachment; filename="nodes.py"; filename*=UTF-8\'\'nodes.py' assert response.content_type == "text/x-python" def test_download_file_wrong_cs(self, backend): raw_id = u'ERRORce30c96924232dffcd24178a07ffeb5dfc' response = self.app.get( route_path('repo_file_download', repo_name=backend.repo_name, commit_id=raw_id, f_path='vcs/nodes.svg'), status=404) msg = """No such commit exists for this repository""" response.mustcontain(msg) def test_download_file_wrong_f_path(self, backend): commit = backend.repo.get_commit(commit_idx=173) f_path = 'vcs/ERRORnodes.py' response = self.app.get( route_path('repo_file_download', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path=f_path), status=404) msg = ( "There is no file nor directory at the given path: " "`%s` at commit %s" % (f_path, commit.short_id)) response.mustcontain(msg) def test_file_raw(self, backend): commit = backend.repo.get_commit(commit_idx=173) response = self.app.get( route_path('repo_file_raw', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='vcs/nodes.py'),) assert response.content_type == "text/plain" def test_file_raw_binary(self, backend): commit = backend.repo.get_commit() response = self.app.get( route_path('repo_file_raw', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path='docs/theme/ADC/static/breadcrumb_background.png'),) assert response.content_disposition == 'inline' def test_raw_file_wrong_cs(self, backend): raw_id = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc' response = self.app.get( route_path('repo_file_raw', repo_name=backend.repo_name, commit_id=raw_id, f_path='vcs/nodes.svg'), status=404) msg = """No such commit exists for this repository""" response.mustcontain(msg) def test_raw_wrong_f_path(self, backend): commit = backend.repo.get_commit(commit_idx=173) f_path = 'vcs/ERRORnodes.py' response = self.app.get( route_path('repo_file_raw', repo_name=backend.repo_name, commit_id=commit.raw_id, f_path=f_path), status=404) msg = ( "There is no file nor directory at the given path: " "`%s` at commit %s" % (f_path, commit.short_id)) response.mustcontain(msg) def test_raw_svg_should_not_be_rendered(self, backend): backend.create_repo() backend.ensure_file(b"xss.svg") response = self.app.get( route_path('repo_file_raw', repo_name=backend.repo_name, commit_id='tip', f_path='xss.svg'),) # If the content type is image/svg+xml then it allows to render HTML # and malicious SVG. assert response.content_type == "text/plain" @pytest.mark.usefixtures("app") class TestRepositoryArchival(object): def test_archival(self, backend): backend.enable_downloads() commit = backend.repo.get_commit(commit_idx=173) for a_type, content_type, extension in settings.ARCHIVE_SPECS: path_sha = get_path_sha('/') filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha) fname = commit.raw_id + extension response = self.app.get( route_path('repo_archivefile', repo_name=backend.repo_name, fname=fname)) assert response.status == '200 OK' headers = [ ('Content-Disposition', f'attachment; filename={filename}'), ('Content-Type', content_type), ] for header in headers: assert header in list(response.headers.items()) def test_archival_no_hash(self, backend): backend.enable_downloads() commit = backend.repo.get_commit(commit_idx=173) for a_type, content_type, extension in settings.ARCHIVE_SPECS: path_sha = get_path_sha('/') filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha, with_hash=False) fname = commit.raw_id + extension response = self.app.get( route_path('repo_archivefile', repo_name=backend.repo_name, fname=fname, params={'with_hash': 0})) assert response.status == '200 OK' headers = [ ('Content-Disposition', f'attachment; filename={filename}'), ('Content-Type', content_type), ] for header in headers: assert header in list(response.headers.items()) def test_archival_at_path(self, backend): backend.enable_downloads() commit = backend.repo.get_commit(commit_idx=190) at_path = 'vcs' for a_type, content_type, extension in settings.ARCHIVE_SPECS: path_sha = get_path_sha(at_path) filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha) fname = commit.raw_id + extension response = self.app.get( route_path('repo_archivefile', repo_name=backend.repo_name, fname=fname, params={'at_path': at_path})) assert response.status == '200 OK' headers = [ ('Content-Disposition', f'attachment; filename={filename}'), ('Content-Type', content_type), ] for header in headers: assert header in list(response.headers.items()) @pytest.mark.parametrize('arch_ext',[ 'tar', 'rar', 'x', '..ax', '.zipz', 'tar.gz.tar']) def test_archival_wrong_ext(self, backend, arch_ext): backend.enable_downloads() commit = backend.repo.get_commit(commit_idx=173) fname = commit.raw_id + '.' + arch_ext response = self.app.get( route_path('repo_archivefile', repo_name=backend.repo_name, fname=fname)) response.mustcontain( 'Unknown archive type for: `{}`'.format(fname)) @pytest.mark.parametrize('commit_id', [ '00x000000', 'tar', 'wrong', '@$@$42413232', '232dffcd']) def test_archival_wrong_commit_id(self, backend, commit_id): backend.enable_downloads() fname = f'{commit_id}.zip' response = self.app.get( route_path('repo_archivefile', repo_name=backend.repo_name, fname=fname)) response.mustcontain('Unknown commit_id') @pytest.mark.usefixtures("app") class TestFilesDiff(object): @pytest.mark.parametrize("diff", ['diff', 'download', 'raw']) def test_file_full_diff(self, backend, diff): commit1 = backend.repo.get_commit(commit_idx=-1) commit2 = backend.repo.get_commit(commit_idx=-2) response = self.app.get( route_path('repo_files_diff', repo_name=backend.repo_name, f_path='README'), params={ 'diff1': commit2.raw_id, 'diff2': commit1.raw_id, 'fulldiff': '1', 'diff': diff, }) if diff == 'diff': # use redirect since this is OLD view redirecting to compare page response = response.follow() # It's a symlink to README.rst response.mustcontain('README.rst') response.mustcontain('No newline at end of file') def test_file_binary_diff(self, backend): commits = [ {'message': 'First commit'}, {'message': 'Commit with binary', 'added': [nodes.FileNode(b'file.bin', content='\0BINARY\0')]}, ] repo = backend.create_repo(commits=commits) response = self.app.get( route_path('repo_files_diff', repo_name=backend.repo_name, f_path='file.bin'), params={ 'diff1': repo.get_commit(commit_idx=0).raw_id, 'diff2': repo.get_commit(commit_idx=1).raw_id, 'fulldiff': '1', 'diff': 'diff', }) # use redirect since this is OLD view redirecting to compare page response = response.follow() response.mustcontain('Collapse 1 commit') file_changes = (1, 0, 0) compare_page = ComparePage(response) compare_page.contains_change_summary(*file_changes) if backend.alias == 'svn': response.mustcontain('new file 10644') # TODO(marcink): SVN doesn't yet detect binary changes else: response.mustcontain('new file 100644') response.mustcontain('binary diff hidden') def test_diff_2way(self, backend): commit1 = backend.repo.get_commit(commit_idx=-1) commit2 = backend.repo.get_commit(commit_idx=-2) response = self.app.get( route_path('repo_files_diff_2way_redirect', repo_name=backend.repo_name, f_path='README'), params={ 'diff1': commit2.raw_id, 'diff2': commit1.raw_id, }) # use redirect since this is OLD view redirecting to compare page response = response.follow() # It's a symlink to README.rst response.mustcontain('README.rst') response.mustcontain('No newline at end of file') def test_requires_one_commit_id(self, backend, autologin_user): response = self.app.get( route_path('repo_files_diff', repo_name=backend.repo_name, f_path='README.rst'), status=400) response.mustcontain( 'Need query parameter', 'diff1', 'diff2', 'to generate a diff.') def test_returns_no_files_if_file_does_not_exist(self, vcsbackend): repo = vcsbackend.repo response = self.app.get( route_path('repo_files_diff', repo_name=repo.name, f_path='does-not-exist-in-any-commit'), params={ 'diff1': repo[0].raw_id, 'diff2': repo[1].raw_id }) response = response.follow() response.mustcontain('No files') def test_returns_redirect_if_file_not_changed(self, backend): commit = backend.repo.get_commit(commit_idx=-1) response = self.app.get( route_path('repo_files_diff_2way_redirect', repo_name=backend.repo_name, f_path='README'), params={ 'diff1': commit.raw_id, 'diff2': commit.raw_id, }) response = response.follow() response.mustcontain('No files') response.mustcontain('No commits in this compare') def test_supports_diff_to_different_path_svn(self, backend_svn): #TODO: check this case return repo = backend_svn['svn-simple-layout'].scm_instance() commit_id_1 = '24' commit_id_2 = '26' response = self.app.get( route_path('repo_files_diff', repo_name=backend_svn.repo_name, f_path='trunk/example.py'), params={ 'diff1': 'tags/v0.2/example.py@' + commit_id_1, 'diff2': commit_id_2, }) response = response.follow() response.mustcontain( # diff contains this "Will print out a useful message on invocation.") # Note: Expecting that we indicate the user what's being compared response.mustcontain("trunk/example.py") response.mustcontain("tags/v0.2/example.py") def test_show_rev_redirects_to_svn_path(self, backend_svn): #TODO: check this case return repo = backend_svn['svn-simple-layout'].scm_instance() commit_id = repo[-1].raw_id response = self.app.get( route_path('repo_files_diff', repo_name=backend_svn.repo_name, f_path='trunk/example.py'), params={ 'diff1': 'branches/argparse/example.py@' + commit_id, 'diff2': commit_id, }, status=302) response = response.follow() assert response.headers['Location'].endswith( 'svn-svn-simple-layout/files/26/branches/argparse/example.py') def test_show_rev_and_annotate_redirects_to_svn_path(self, backend_svn): #TODO: check this case return repo = backend_svn['svn-simple-layout'].scm_instance() commit_id = repo[-1].raw_id response = self.app.get( route_path('repo_files_diff', repo_name=backend_svn.repo_name, f_path='trunk/example.py'), params={ 'diff1': 'branches/argparse/example.py@' + commit_id, 'diff2': commit_id, 'show_rev': 'Show at Revision', 'annotate': 'true', }, status=302) response = response.follow() assert response.headers['Location'].endswith( 'svn-svn-simple-layout/annotate/26/branches/argparse/example.py') @pytest.mark.usefixtures("app", "autologin_user") class TestModifyFilesWithWebInterface(object): def test_add_file_view(self, backend): self.app.get( route_path('repo_files_add_file', repo_name=backend.repo_name, commit_id='tip', f_path='/') ) @pytest.mark.xfail_backends("svn", reason="Depends on online editing") def test_add_file_into_repo_missing_content(self, backend, csrf_token): backend.create_repo() filename = 'init.py' response = self.app.post( route_path('repo_files_create_file', repo_name=backend.repo_name, commit_id='tip', f_path='/'), params={ 'content': "", 'filename': filename, 'csrf_token': csrf_token, }, status=302) expected_msg = 'Successfully committed new file `{}`'.format(os.path.join(filename)) assert_session_flash(response, expected_msg) def test_add_file_into_repo_missing_filename(self, backend, csrf_token): commit_id = backend.repo.get_commit().raw_id response = self.app.post( route_path('repo_files_create_file', repo_name=backend.repo_name, commit_id=commit_id, f_path='/'), params={ 'content': "foo", 'csrf_token': csrf_token, }, status=302) assert_session_flash(response, 'No filename specified') def test_add_file_into_repo_errors_and_no_commits( self, backend, csrf_token): repo = backend.create_repo() # Create a file with no filename, it will display an error but # the repo has no commits yet response = self.app.post( route_path('repo_files_create_file', repo_name=repo.repo_name, commit_id='tip', f_path='/'), params={ 'content': "foo", 'csrf_token': csrf_token, }, status=302) assert_session_flash(response, 'No filename specified') # Not allowed, redirect to the summary redirected = response.follow() summary_url = h.route_path('repo_summary', repo_name=repo.repo_name) # As there are no commits, displays the summary page with the error of # creating a file with no filename assert redirected.request.path == summary_url @pytest.mark.parametrize("filename, clean_filename", [ ('/abs/foo', 'abs/foo'), ('../rel/foo', 'rel/foo'), ('file/../foo/foo', 'file/foo/foo'), ]) def test_add_file_into_repo_bad_filenames(self, filename, clean_filename, backend, csrf_token): repo = backend.create_repo() commit_id = repo.get_commit().raw_id response = self.app.post( route_path('repo_files_create_file', repo_name=repo.repo_name, commit_id=commit_id, f_path='/'), params={ 'content': "foo", 'filename': filename, 'csrf_token': csrf_token, }, status=302) expected_msg = 'Successfully committed new file `{}`'.format(clean_filename) assert_session_flash(response, expected_msg) @pytest.mark.parametrize("cnt, filename, content", [ (1, 'foo.txt', "Content"), (2, 'dir/foo.rst', "Content"), (3, 'dir/foo-second.rst', "Content"), (4, 'rel/dir/foo.bar', "Content"), ]) def test_add_file_into_empty_repo(self, cnt, filename, content, backend, csrf_token): repo = backend.create_repo() commit_id = repo.get_commit().raw_id response = self.app.post( route_path('repo_files_create_file', repo_name=repo.repo_name, commit_id=commit_id, f_path='/'), params={ 'content': content, 'filename': filename, 'csrf_token': csrf_token, }, status=302) expected_msg = 'Successfully committed new file `{}`'.format(filename) assert_session_flash(response, expected_msg) def test_edit_file_view(self, backend): response = self.app.get( route_path('repo_files_edit_file', repo_name=backend.repo_name, commit_id=backend.default_head_id, f_path='vcs/nodes.py'), status=200) response.mustcontain("Module holding everything related to vcs nodes.") def test_edit_file_view_not_on_branch(self, backend): repo = backend.create_repo() backend.ensure_file(b"vcs/nodes.py") response = self.app.get( route_path('repo_files_edit_file', repo_name=repo.repo_name, commit_id='tip', f_path='vcs/nodes.py'), status=302) assert_session_flash( response, 'Cannot modify file. Given commit `tip` is not head of a branch.') def test_edit_file_view_commit_changes(self, backend, csrf_token): repo = backend.create_repo() backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'") response = self.app.post( route_path('repo_files_update_file', repo_name=repo.repo_name, commit_id=backend.default_head_id, f_path='vcs/nodes.py'), params={ 'content': "print 'hello world'", 'message': 'I committed', 'filename': "vcs/nodes.py", 'csrf_token': csrf_token, }, status=302) assert_session_flash( response, 'Successfully committed changes to file `vcs/nodes.py`') tip = repo.get_commit(commit_idx=-1) assert tip.message == 'I committed' def test_edit_file_view_commit_changes_default_message(self, backend, csrf_token): repo = backend.create_repo() backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'") commit_id = ( backend.default_branch_name or backend.repo.scm_instance().commit_ids[-1]) response = self.app.post( route_path('repo_files_update_file', repo_name=repo.repo_name, commit_id=commit_id, f_path='vcs/nodes.py'), params={ 'content': "print 'hello world'", 'message': '', 'filename': "vcs/nodes.py", 'csrf_token': csrf_token, }, status=302) assert_session_flash( response, 'Successfully committed changes to file `vcs/nodes.py`') tip = repo.get_commit(commit_idx=-1) assert tip.message == 'Edited file vcs/nodes.py via RhodeCode Enterprise' def test_delete_file_view(self, backend): self.app.get( route_path('repo_files_remove_file', repo_name=backend.repo_name, commit_id=backend.default_head_id, f_path='vcs/nodes.py'), status=200) def test_delete_file_view_not_on_branch(self, backend): repo = backend.create_repo() backend.ensure_file(b'vcs/nodes.py') response = self.app.get( route_path('repo_files_remove_file', repo_name=repo.repo_name, commit_id='tip', f_path='vcs/nodes.py'), status=302) assert_session_flash( response, 'Cannot modify file. Given commit `tip` is not head of a branch.') def test_delete_file_view_commit_changes(self, backend, csrf_token): repo = backend.create_repo() backend.ensure_file(b"vcs/nodes.py") response = self.app.post( route_path('repo_files_delete_file', repo_name=repo.repo_name, commit_id=backend.default_head_id, f_path='vcs/nodes.py'), params={ 'message': 'i committed', 'csrf_token': csrf_token, }, status=302) assert_session_flash( response, 'Successfully deleted file `vcs/nodes.py`') @pytest.mark.usefixtures("app") class TestFilesViewOtherCases(object): def test_access_empty_repo_redirect_to_summary_with_alert_write_perms( self, backend_stub, autologin_regular_user, user_regular, user_util): repo = backend_stub.create_repo() user_util.grant_user_permission_to_repo( repo, user_regular, 'repository.write') response = self.app.get( route_path('repo_files', repo_name=repo.repo_name, commit_id='tip', f_path='/')) repo_file_add_url = route_path( 'repo_files_add_file', repo_name=repo.repo_name, commit_id=0, f_path='') add_new = f'<a class="alert-link" href="{repo_file_add_url}">add a new file</a>' repo_file_upload_url = route_path( 'repo_files_upload_file', repo_name=repo.repo_name, commit_id=0, f_path='') upload_new = f'<a class="alert-link" href="{repo_file_upload_url}">upload a new file</a>' assert_session_flash( response, 'There are no files yet. Click here to %s or %s.' % (add_new, upload_new) ) def test_access_empty_repo_redirect_to_summary_with_alert_no_write_perms( self, backend_stub, autologin_regular_user): repo = backend_stub.create_repo() # init session for anon user route_path('repo_summary', repo_name=repo.repo_name) repo_file_add_url = route_path( 'repo_files_add_file', repo_name=repo.repo_name, commit_id=0, f_path='') response = self.app.get( route_path('repo_files', repo_name=repo.repo_name, commit_id='tip', f_path='/')) assert_session_flash(response, no_=repo_file_add_url) @pytest.mark.parametrize('file_node', [ b'archive/file.zip', b'diff/my-file.txt', b'render.py', b'render', b'remove_file', b'remove_file/to-delete.txt', ]) def test_file_names_equal_to_routes_parts(self, backend, file_node): backend.create_repo() backend.ensure_file(file_node) self.app.get( route_path('repo_files', repo_name=backend.repo_name, commit_id='tip', f_path=safe_str(file_node)), status=200) class TestAdjustFilePathForSvn(object): """ SVN specific adjustments of node history in RepoFilesView. """ def test_returns_path_relative_to_matched_reference(self): repo = self._repo(branches=['trunk']) self.assert_file_adjustment('trunk/file', 'file', repo) def test_does_not_modify_file_if_no_reference_matches(self): repo = self._repo(branches=['trunk']) self.assert_file_adjustment('notes/file', 'notes/file', repo) def test_does_not_adjust_partial_directory_names(self): repo = self._repo(branches=['trun']) self.assert_file_adjustment('trunk/file', 'trunk/file', repo) def test_is_robust_to_patterns_which_prefix_other_patterns(self): repo = self._repo(branches=['trunk', 'trunk/new', 'trunk/old']) self.assert_file_adjustment('trunk/new/file', 'file', repo) def assert_file_adjustment(self, f_path, expected, repo): result = RepoFilesView.adjust_file_path_for_svn(f_path, repo) assert result == expected def _repo(self, branches=None): repo = mock.Mock() repo.branches = OrderedDict((name, '0') for name in branches or []) repo.tags = {} return repo