test_files.py
1078 lines
| 39.7 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2010-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import mock | ||||
import pytest | ||||
from rhodecode.controllers.files import FilesController | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.compat import OrderedDict | ||||
from rhodecode.lib.ext_json import json | ||||
from rhodecode.lib.vcs import nodes | ||||
r622 | from rhodecode.lib.vcs.backends.base import EmptyCommit | |||
r1 | from rhodecode.lib.vcs.conf import settings | |||
r622 | from rhodecode.lib.vcs.nodes import FileNode | |||
from rhodecode.model.db import Repository | ||||
from rhodecode.model.scm import ScmModel | ||||
r1 | from rhodecode.tests import ( | |||
r622 | url, TEST_USER_ADMIN_LOGIN, assert_session_flash, assert_not_in_session_flash) | |||
r1 | from rhodecode.tests.fixture import Fixture | |||
from rhodecode.tests.utils import AssertResponse | ||||
fixture = Fixture() | ||||
NODE_HISTORY = { | ||||
'hg': json.loads(fixture.load_resource('hg_node_history_response.json')), | ||||
'git': json.loads(fixture.load_resource('git_node_history_response.json')), | ||||
'svn': json.loads(fixture.load_resource('svn_node_history_response.json')), | ||||
} | ||||
r622 | ||||
def _commit_change( | ||||
repo, filename, content, message, vcs_type, parent=None, | ||||
newfile=False): | ||||
repo = Repository.get_by_repo_name(repo) | ||||
_commit = parent | ||||
if not parent: | ||||
_commit = EmptyCommit(alias=vcs_type) | ||||
if newfile: | ||||
nodes = { | ||||
filename: { | ||||
'content': content | ||||
} | ||||
} | ||||
commit = ScmModel().create_nodes( | ||||
user=TEST_USER_ADMIN_LOGIN, repo=repo, | ||||
message=message, | ||||
nodes=nodes, | ||||
parent_commit=_commit, | ||||
author=TEST_USER_ADMIN_LOGIN, | ||||
) | ||||
else: | ||||
commit = ScmModel().commit_change( | ||||
repo=repo.scm_instance(), repo_name=repo.repo_name, | ||||
commit=parent, user=TEST_USER_ADMIN_LOGIN, | ||||
author=TEST_USER_ADMIN_LOGIN, | ||||
message=message, | ||||
content=content, | ||||
f_path=filename | ||||
) | ||||
return commit | ||||
r986 | ||||
r1 | @pytest.mark.usefixtures("app") | |||
class TestFilesController: | ||||
def test_index(self, backend): | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, revision='tip', f_path='/')) | ||||
commit = backend.repo.get_commit() | ||||
params = { | ||||
'repo_name': backend.repo_name, | ||||
r423 | 'commit_id': commit.raw_id, | |||
r1 | 'date': commit.date | |||
} | ||||
assert_dirs_in_response(response, ['docs', 'vcs'], params) | ||||
files = [ | ||||
'.gitignore', | ||||
'.hgignore', | ||||
'.hgtags', | ||||
# TODO: missing in Git | ||||
# '.travis.yml', | ||||
'MANIFEST.in', | ||||
'README.rst', | ||||
# TODO: File is missing in svn repository | ||||
# 'run_test_and_report.sh', | ||||
'setup.cfg', | ||||
'setup.py', | ||||
'test_and_report.sh', | ||||
'tox.ini', | ||||
] | ||||
assert_files_in_response(response, files, params) | ||||
assert_timeago_in_response(response, files, params) | ||||
def test_index_links_submodules_with_absolute_url(self, backend_hg): | ||||
repo = backend_hg['subrepos'] | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=repo.repo_name, revision='tip', f_path='/')) | ||||
assert_response = AssertResponse(response) | ||||
assert_response.contains_one_link( | ||||
'absolute-path @ 000000000000', 'http://example.com/absolute-path') | ||||
def test_index_links_submodules_with_absolute_url_subpaths( | ||||
self, backend_hg): | ||||
repo = backend_hg['subrepos'] | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=repo.repo_name, revision='tip', f_path='/')) | ||||
assert_response = AssertResponse(response) | ||||
assert_response.contains_one_link( | ||||
'subpaths-path @ 000000000000', | ||||
'http://sub-base.example.com/subpaths-path') | ||||
@pytest.mark.xfail_backends("svn", reason="Depends on branch support") | ||||
def test_files_menu(self, backend): | ||||
new_branch = "temp_branch_name" | ||||
commits = [ | ||||
{'message': 'a'}, | ||||
{'message': 'b', 'branch': new_branch} | ||||
] | ||||
backend.create_repo(commits) | ||||
backend.repo.landing_rev = "branch:%s" % new_branch | ||||
# get response based on tip and not new revision | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, revision='tip', f_path='/'), | ||||
status=200) | ||||
# make sure Files menu url is not tip but new revision | ||||
landing_rev = backend.repo.landing_rev[1] | ||||
files_url = url('files_home', repo_name=backend.repo_name, | ||||
revision=landing_rev) | ||||
assert landing_rev != 'tip' | ||||
response.mustcontain('<li class="active"><a class="menulink" href="%s">' % files_url) | ||||
def test_index_commit(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=32) | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path='/') | ||||
) | ||||
dirs = ['docs', 'tests'] | ||||
files = ['README.rst'] | ||||
params = { | ||||
'repo_name': backend.repo_name, | ||||
r423 | 'commit_id': commit.raw_id, | |||
r1 | } | |||
assert_dirs_in_response(response, dirs, params) | ||||
assert_files_in_response(response, files, params) | ||||
@pytest.mark.xfail_backends("git", reason="Missing branches in git repo") | ||||
@pytest.mark.xfail_backends("svn", reason="Depends on branch support") | ||||
def test_index_different_branch(self, backend): | ||||
# TODO: Git test repository does not contain branches | ||||
# TODO: Branch support in Subversion | ||||
commit = backend.repo.get_commit(commit_idx=150) | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path='/')) | ||||
assert_response = AssertResponse(response) | ||||
assert_response.element_contains( | ||||
'.tags .branchtag', 'git') | ||||
def test_index_paging(self, backend): | ||||
repo = backend.repo | ||||
indexes = [73, 92, 109, 1, 0] | ||||
idx_map = [(rev, repo.get_commit(commit_idx=rev).raw_id) | ||||
for rev in indexes] | ||||
for idx in idx_map: | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, | ||||
revision=idx[1], | ||||
f_path='/')) | ||||
response.mustcontain("""r%s:%s""" % (idx[0], idx[1][:8])) | ||||
def test_file_source(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=167) | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path='vcs/nodes.py')) | ||||
msgbox = """<div class="commit right-content">%s</div>""" | ||||
response.mustcontain(msgbox % (commit.message, )) | ||||
assert_response = AssertResponse(response) | ||||
if commit.branch: | ||||
assert_response.element_contains('.tags.tags-main .branchtag', commit.branch) | ||||
if commit.tags: | ||||
for tag in commit.tags: | ||||
assert_response.element_contains('.tags.tags-main .tagtag', tag) | ||||
def test_file_source_history(self, backend): | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', action='history', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', | ||||
f_path='vcs/nodes.py'), | ||||
extra_environ={'HTTP_X_PARTIAL_XHR': '1'}) | ||||
assert NODE_HISTORY[backend.alias] == json.loads(response.body) | ||||
def test_file_source_history_svn(self, backend_svn): | ||||
simple_repo = backend_svn['svn-simple-layout'] | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', action='history', | ||||
repo_name=simple_repo.repo_name, | ||||
revision='tip', | ||||
f_path='trunk/example.py'), | ||||
extra_environ={'HTTP_X_PARTIAL_XHR': '1'}) | ||||
expected_data = json.loads( | ||||
fixture.load_resource('svn_node_history_branches.json')) | ||||
assert expected_data == response.json | ||||
def test_file_annotation_history(self, backend): | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', action='history', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', | ||||
f_path='vcs/nodes.py', | ||||
annotate=True), | ||||
extra_environ={'HTTP_X_PARTIAL_XHR': '1'}) | ||||
assert NODE_HISTORY[backend.alias] == json.loads(response.body) | ||||
def test_file_annotation(self, backend): | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=backend.repo_name, revision='tip', f_path='vcs/nodes.py', | ||||
annotate=True)) | ||||
expected_revisions = { | ||||
r986 | 'hg': 'r356', | |||
'git': 'r345', | ||||
'svn': 'r208', | ||||
r1 | } | |||
response.mustcontain(expected_revisions[backend.alias]) | ||||
def test_file_authors(self, backend): | ||||
response = self.app.get(url( | ||||
controller='files', action='authors', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', | ||||
f_path='vcs/nodes.py', | ||||
annotate=True)) | ||||
expected_authors = { | ||||
'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'), | ||||
'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'), | ||||
'svn': ('marcin', 'lukasz'), | ||||
} | ||||
for author in expected_authors[backend.alias]: | ||||
response.mustcontain(author) | ||||
def test_tree_search_top_level(self, backend, xhr_header): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
revision=commit.raw_id, f_path='/'), | ||||
extra_environ=xhr_header) | ||||
assert 'nodes' in response.json | ||||
assert {'name': 'docs', 'type': 'dir'} in response.json['nodes'] | ||||
def test_tree_search_at_path(self, backend, xhr_header): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
revision=commit.raw_id, f_path='/docs'), | ||||
extra_environ=xhr_header) | ||||
assert 'nodes' in response.json | ||||
nodes = response.json['nodes'] | ||||
assert {'name': 'docs/api', 'type': 'dir'} in nodes | ||||
assert {'name': 'docs/index.rst', 'type': 'file'} in nodes | ||||
def test_tree_search_at_path_missing_xhr(self, backend): | ||||
self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
revision='tip', f_path=''), status=400) | ||||
def test_tree_view_list(self, backend, xhr_header): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
f_path='/', revision=commit.raw_id), | ||||
extra_environ=xhr_header, | ||||
) | ||||
response.mustcontain("vcs/web/simplevcs/views/repository.py") | ||||
def test_tree_view_list_at_path(self, backend, xhr_header): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
f_path='/docs', revision=commit.raw_id), | ||||
extra_environ=xhr_header, | ||||
) | ||||
response.mustcontain("docs/index.rst") | ||||
def test_tree_view_list_missing_xhr(self, backend): | ||||
self.app.get( | ||||
url('files_nodelist_home', repo_name=backend.repo_name, | ||||
f_path='/', revision='tip'), status=400) | ||||
r423 | def test_nodetree_full_success(self, backend, xhr_header): | |||
r1 | commit = backend.repo.get_commit(commit_idx=173) | |||
response = self.app.get( | ||||
r423 | url('files_nodetree_full', repo_name=backend.repo_name, | |||
f_path='/', commit_id=commit.raw_id), | ||||
r1 | extra_environ=xhr_header) | |||
r423 | assert_response = AssertResponse(response) | |||
r1 | ||||
r423 | for attr in ['data-commit-id', 'data-date', 'data-author']: | |||
elements = assert_response.get_elements('[{}]'.format(attr)) | ||||
assert len(elements) > 1 | ||||
for element in elements: | ||||
assert element.get(attr) | ||||
def test_nodetree_full_if_file(self, backend, xhr_header): | ||||
r1 | commit = backend.repo.get_commit(commit_idx=173) | |||
response = self.app.get( | ||||
r423 | url('files_nodetree_full', repo_name=backend.repo_name, | |||
f_path='README.rst', commit_id=commit.raw_id), | ||||
r1 | extra_environ=xhr_header) | |||
r423 | assert response.body == '' | |||
r1 | ||||
def test_tree_metadata_list_missing_xhr(self, backend): | ||||
self.app.get( | ||||
r423 | url('files_nodetree_full', repo_name=backend.repo_name, | |||
f_path='/', commit_id='tip'), status=400) | ||||
r1 | ||||
def test_access_empty_repo_redirect_to_summary_with_alert_write_perms( | ||||
self, app, backend_stub, autologin_regular_user, user_regular, | ||||
user_util): | ||||
repo = backend_stub.create_repo() | ||||
user_util.grant_user_permission_to_repo( | ||||
repo, user_regular, 'repository.write') | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=repo.repo_name, revision='tip', f_path='/')) | ||||
assert_session_flash( | ||||
response, | ||||
'There are no files yet. <a class="alert-link" ' | ||||
'href="/%s/add/0/#edit">Click here to add a new file.</a>' | ||||
% (repo.repo_name)) | ||||
def test_access_empty_repo_redirect_to_summary_with_alert_no_write_perms( | ||||
self, backend_stub, user_util): | ||||
repo = backend_stub.create_repo() | ||||
repo_file_url = url( | ||||
'files_add_home', | ||||
repo_name=repo.repo_name, | ||||
revision=0, f_path='', anchor='edit') | ||||
response = self.app.get(url( | ||||
controller='files', action='index', | ||||
repo_name=repo.repo_name, revision='tip', f_path='/')) | ||||
assert_not_in_session_flash(response, repo_file_url) | ||||
# TODO: johbo: Think about a better place for these tests. Either controller | ||||
# specific unit tests or we move down the whole logic further towards the vcs | ||||
# layer | ||||
class TestAdjustFilePathForSvn: | ||||
"""SVN specific adjustments of node history in FileController.""" | ||||
def test_returns_path_relative_to_matched_reference(self): | ||||
repo = self._repo(branches=['trunk']) | ||||
self.assert_file_adjustment('trunk/file', 'file', repo) | ||||
def test_does_not_modify_file_if_no_reference_matches(self): | ||||
repo = self._repo(branches=['trunk']) | ||||
self.assert_file_adjustment('notes/file', 'notes/file', repo) | ||||
def test_does_not_adjust_partial_directory_names(self): | ||||
repo = self._repo(branches=['trun']) | ||||
self.assert_file_adjustment('trunk/file', 'trunk/file', repo) | ||||
def test_is_robust_to_patterns_which_prefix_other_patterns(self): | ||||
repo = self._repo(branches=['trunk', 'trunk/new', 'trunk/old']) | ||||
self.assert_file_adjustment('trunk/new/file', 'file', repo) | ||||
def assert_file_adjustment(self, f_path, expected, repo): | ||||
controller = FilesController() | ||||
result = controller._adjust_file_path_for_svn(f_path, repo) | ||||
assert result == expected | ||||
def _repo(self, branches=None): | ||||
repo = mock.Mock() | ||||
repo.branches = OrderedDict((name, '0') for name in branches or []) | ||||
repo.tags = {} | ||||
return repo | ||||
@pytest.mark.usefixtures("app") | ||||
class TestRepositoryArchival: | ||||
def test_archival(self, backend): | ||||
backend.enable_downloads() | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
for archive, info in settings.ARCHIVE_SPECS.items(): | ||||
mime_type, arch_ext = info | ||||
short = commit.short_id + arch_ext | ||||
fname = commit.raw_id + arch_ext | ||||
filename = '%s-%s' % (backend.repo_name, short) | ||||
response = self.app.get(url(controller='files', | ||||
action='archivefile', | ||||
repo_name=backend.repo_name, | ||||
fname=fname)) | ||||
assert response.status == '200 OK' | ||||
headers = { | ||||
'Pragma': 'no-cache', | ||||
'Cache-Control': 'no-cache', | ||||
'Content-Disposition': 'attachment; filename=%s' % filename, | ||||
'Content-Type': '%s; charset=utf-8' % mime_type, | ||||
} | ||||
if 'Set-Cookie' in response.response.headers: | ||||
del response.response.headers['Set-Cookie'] | ||||
assert response.response.headers == headers | ||||
def test_archival_wrong_ext(self, backend): | ||||
backend.enable_downloads() | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']: | ||||
fname = commit.raw_id + arch_ext | ||||
response = self.app.get(url(controller='files', | ||||
action='archivefile', | ||||
repo_name=backend.repo_name, | ||||
fname=fname)) | ||||
response.mustcontain('Unknown archive type') | ||||
def test_archival_wrong_commit_id(self, backend): | ||||
backend.enable_downloads() | ||||
for commit_id in ['00x000000', 'tar', 'wrong', '@##$@$42413232', | ||||
'232dffcd']: | ||||
fname = '%s.zip' % commit_id | ||||
response = self.app.get(url(controller='files', | ||||
action='archivefile', | ||||
repo_name=backend.repo_name, | ||||
fname=fname)) | ||||
response.mustcontain('Unknown revision') | ||||
@pytest.mark.usefixtures("app", "autologin_user") | ||||
class TestRawFileHandling: | ||||
def test_raw_file_ok(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get(url(controller='files', action='rawfile', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path='vcs/nodes.py')) | ||||
assert response.content_disposition == "attachment; filename=nodes.py" | ||||
assert response.content_type == "text/x-python" | ||||
def test_raw_file_wrong_cs(self, backend): | ||||
commit_id = u'ERRORce30c96924232dffcd24178a07ffeb5dfc' | ||||
f_path = 'vcs/nodes.py' | ||||
response = self.app.get(url(controller='files', action='rawfile', | ||||
repo_name=backend.repo_name, | ||||
revision=commit_id, | ||||
f_path=f_path), status=404) | ||||
msg = """No such commit exists for this repository""" | ||||
response.mustcontain(msg) | ||||
def test_raw_file_wrong_f_path(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
f_path = 'vcs/ERRORnodes.py' | ||||
response = self.app.get(url(controller='files', action='rawfile', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path=f_path), status=404) | ||||
msg = ( | ||||
"There is no file nor directory at the given path: " | ||||
"'%s' at commit %s" % (f_path, commit.short_id)) | ||||
response.mustcontain(msg) | ||||
def test_raw_ok(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
response = self.app.get(url(controller='files', action='raw', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path='vcs/nodes.py')) | ||||
assert response.content_type == "text/plain" | ||||
def test_raw_wrong_cs(self, backend): | ||||
commit_id = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc' | ||||
f_path = 'vcs/nodes.py' | ||||
response = self.app.get(url(controller='files', action='raw', | ||||
repo_name=backend.repo_name, | ||||
revision=commit_id, | ||||
f_path=f_path), status=404) | ||||
msg = """No such commit exists for this repository""" | ||||
response.mustcontain(msg) | ||||
def test_raw_wrong_f_path(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=173) | ||||
f_path = 'vcs/ERRORnodes.py' | ||||
response = self.app.get(url(controller='files', action='raw', | ||||
repo_name=backend.repo_name, | ||||
revision=commit.raw_id, | ||||
f_path=f_path), status=404) | ||||
msg = ( | ||||
"There is no file nor directory at the given path: " | ||||
"'%s' at commit %s" % (f_path, commit.short_id)) | ||||
response.mustcontain(msg) | ||||
def test_raw_svg_should_not_be_rendered(self, backend): | ||||
backend.create_repo() | ||||
backend.ensure_file("xss.svg") | ||||
response = self.app.get(url(controller='files', action='raw', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', | ||||
f_path='xss.svg')) | ||||
# If the content type is image/svg+xml then it allows to render HTML | ||||
# and malicious SVG. | ||||
assert response.content_type == "text/plain" | ||||
@pytest.mark.usefixtures("app") | ||||
class TestFilesDiff: | ||||
@pytest.mark.parametrize("diff", ['diff', 'download', 'raw']) | ||||
def test_file_full_diff(self, backend, diff): | ||||
commit1 = backend.repo.get_commit(commit_idx=-1) | ||||
commit2 = backend.repo.get_commit(commit_idx=-2) | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=backend.repo_name, | ||||
f_path='README'), | ||||
params={ | ||||
'diff1': commit1.raw_id, | ||||
'diff2': commit2.raw_id, | ||||
'fulldiff': '1', | ||||
'diff': diff, | ||||
}) | ||||
response.mustcontain('README.rst') | ||||
response.mustcontain('No newline at end of file') | ||||
def test_file_binary_diff(self, backend): | ||||
commits = [ | ||||
{'message': 'First commit'}, | ||||
{'message': 'Commit with binary', | ||||
'added': [nodes.FileNode('file.bin', content='\0BINARY\0')]}, | ||||
] | ||||
repo = backend.create_repo(commits=commits) | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=backend.repo_name, | ||||
f_path='file.bin'), | ||||
params={ | ||||
'diff1': repo.get_commit(commit_idx=0).raw_id, | ||||
'diff2': repo.get_commit(commit_idx=1).raw_id, | ||||
'fulldiff': '1', | ||||
'diff': 'diff', | ||||
}) | ||||
response.mustcontain('Cannot diff binary files') | ||||
def test_diff_2way(self, backend): | ||||
commit1 = backend.repo.get_commit(commit_idx=-1) | ||||
commit2 = backend.repo.get_commit(commit_idx=-2) | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff_2way', | ||||
repo_name=backend.repo_name, | ||||
f_path='README'), | ||||
params={ | ||||
'diff1': commit1.raw_id, | ||||
'diff2': commit2.raw_id, | ||||
}) | ||||
# Expecting links to both variants of the file. Links are used | ||||
# to load the content dynamically. | ||||
response.mustcontain('/%s/README' % commit1.raw_id) | ||||
response.mustcontain('/%s/README' % commit2.raw_id) | ||||
def test_requires_one_commit_id(self, backend, autologin_user): | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=backend.repo_name, | ||||
f_path='README.rst'), | ||||
status=400) | ||||
response.mustcontain( | ||||
'Need query parameter', 'diff1', 'diff2', 'to generate a diff.') | ||||
def test_returns_not_found_if_file_does_not_exist(self, vcsbackend): | ||||
repo = vcsbackend.repo | ||||
self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=repo.name, | ||||
f_path='does-not-exist-in-any-commit', | ||||
diff1=repo[0].raw_id, | ||||
diff2=repo[1].raw_id), | ||||
status=404) | ||||
def test_returns_redirect_if_file_not_changed(self, backend): | ||||
commit = backend.repo.get_commit(commit_idx=-1) | ||||
f_path= 'README' | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff_2way', | ||||
repo_name=backend.repo_name, | ||||
f_path=f_path, | ||||
diff1=commit.raw_id, | ||||
diff2=commit.raw_id, | ||||
), | ||||
status=302 | ||||
) | ||||
assert response.headers['Location'].endswith(f_path) | ||||
redirected = response.follow() | ||||
redirected.mustcontain('has not changed between') | ||||
def test_supports_diff_to_different_path_svn(self, backend_svn): | ||||
repo = backend_svn['svn-simple-layout'].scm_instance() | ||||
commit_id = repo[-1].raw_id | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=repo.name, | ||||
f_path='trunk/example.py', | ||||
diff1='tags/v0.2/example.py@' + commit_id, | ||||
diff2=commit_id), | ||||
status=200) | ||||
response.mustcontain( | ||||
"Will print out a useful message on invocation.") | ||||
# Note: Expecting that we indicate the user what's being compared | ||||
response.mustcontain("trunk/example.py") | ||||
response.mustcontain("tags/v0.2/example.py") | ||||
def test_show_rev_redirects_to_svn_path(self, backend_svn): | ||||
repo = backend_svn['svn-simple-layout'].scm_instance() | ||||
commit_id = repo[-1].raw_id | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=repo.name, | ||||
f_path='trunk/example.py', | ||||
diff1='branches/argparse/example.py@' + commit_id, | ||||
diff2=commit_id), | ||||
params={'show_rev': 'Show at Revision'}, | ||||
status=302) | ||||
assert response.headers['Location'].endswith( | ||||
'svn-svn-simple-layout/files/26/branches/argparse/example.py') | ||||
def test_show_rev_and_annotate_redirects_to_svn_path(self, backend_svn): | ||||
repo = backend_svn['svn-simple-layout'].scm_instance() | ||||
commit_id = repo[-1].raw_id | ||||
response = self.app.get( | ||||
url( | ||||
controller='files', | ||||
action='diff', | ||||
repo_name=repo.name, | ||||
f_path='trunk/example.py', | ||||
diff1='branches/argparse/example.py@' + commit_id, | ||||
diff2=commit_id), | ||||
params={ | ||||
'show_rev': 'Show at Revision', | ||||
'annotate': 'true', | ||||
}, | ||||
status=302) | ||||
assert response.headers['Location'].endswith( | ||||
'svn-svn-simple-layout/annotate/26/branches/argparse/example.py') | ||||
@pytest.mark.usefixtures("app", "autologin_user") | ||||
class TestChangingFiles: | ||||
def test_add_file_view(self, backend): | ||||
self.app.get(url( | ||||
'files_add_home', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', f_path='/')) | ||||
@pytest.mark.xfail_backends("svn", reason="Depends on online editing") | ||||
def test_add_file_into_repo_missing_content(self, backend, csrf_token): | ||||
repo = backend.create_repo() | ||||
filename = 'init.py' | ||||
response = self.app.post( | ||||
url( | ||||
'files_add', | ||||
repo_name=repo.repo_name, | ||||
revision='tip', f_path='/'), | ||||
params={ | ||||
'content': "", | ||||
'filename': filename, | ||||
'location': "", | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, 'Successfully committed to %s' | ||||
% os.path.join(filename)) | ||||
def test_add_file_into_repo_missing_filename(self, backend, csrf_token): | ||||
response = self.app.post( | ||||
url( | ||||
'files_add', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', f_path='/'), | ||||
params={ | ||||
'content': "foo", | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash(response, 'No filename') | ||||
def test_add_file_into_repo_errors_and_no_commits( | ||||
self, backend, csrf_token): | ||||
repo = backend.create_repo() | ||||
# Create a file with no filename, it will display an error but | ||||
# the repo has no commits yet | ||||
response = self.app.post( | ||||
url( | ||||
'files_add', | ||||
repo_name=repo.repo_name, | ||||
revision='tip', f_path='/'), | ||||
params={ | ||||
'content': "foo", | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash(response, 'No filename') | ||||
# Not allowed, redirect to the summary | ||||
redirected = response.follow() | ||||
summary_url = url('summary_home', repo_name=repo.repo_name) | ||||
# As there are no commits, displays the summary page with the error of | ||||
# creating a file with no filename | ||||
assert redirected.req.path == summary_url | ||||
@pytest.mark.parametrize("location, filename", [ | ||||
('/abs', 'foo'), | ||||
('../rel', 'foo'), | ||||
('file/../foo', 'foo'), | ||||
]) | ||||
def test_add_file_into_repo_bad_filenames( | ||||
self, location, filename, backend, csrf_token): | ||||
response = self.app.post( | ||||
url( | ||||
'files_add', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', f_path='/'), | ||||
params={ | ||||
'content': "foo", | ||||
'filename': filename, | ||||
'location': location, | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, | ||||
'The location specified must be a relative path and must not ' | ||||
'contain .. in the path') | ||||
@pytest.mark.parametrize("cnt, location, filename", [ | ||||
(1, '', 'foo.txt'), | ||||
(2, 'dir', 'foo.rst'), | ||||
(3, 'rel/dir', 'foo.bar'), | ||||
]) | ||||
def test_add_file_into_repo(self, cnt, location, filename, backend, | ||||
csrf_token): | ||||
repo = backend.create_repo() | ||||
response = self.app.post( | ||||
url( | ||||
'files_add', | ||||
repo_name=repo.repo_name, | ||||
revision='tip', f_path='/'), | ||||
params={ | ||||
'content': "foo", | ||||
'filename': filename, | ||||
'location': location, | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, 'Successfully committed to %s' | ||||
% os.path.join(location, filename)) | ||||
def test_edit_file_view(self, backend): | ||||
response = self.app.get( | ||||
url( | ||||
'files_edit_home', | ||||
repo_name=backend.repo_name, | ||||
revision=backend.default_head_id, | ||||
f_path='vcs/nodes.py'), | ||||
status=200) | ||||
response.mustcontain("Module holding everything related to vcs nodes.") | ||||
def test_edit_file_view_not_on_branch(self, backend): | ||||
repo = backend.create_repo() | ||||
backend.ensure_file("vcs/nodes.py") | ||||
response = self.app.get( | ||||
url( | ||||
'files_edit_home', | ||||
repo_name=repo.repo_name, | ||||
revision='tip', f_path='vcs/nodes.py'), | ||||
status=302) | ||||
assert_session_flash( | ||||
response, | ||||
'You can only edit files with revision being a valid branch') | ||||
def test_edit_file_view_commit_changes(self, backend, csrf_token): | ||||
repo = backend.create_repo() | ||||
backend.ensure_file("vcs/nodes.py", content="print 'hello'") | ||||
response = self.app.post( | ||||
url( | ||||
'files_edit', | ||||
repo_name=repo.repo_name, | ||||
revision=backend.default_head_id, | ||||
f_path='vcs/nodes.py'), | ||||
params={ | ||||
'content': "print 'hello world'", | ||||
'message': 'I committed', | ||||
'filename': "vcs/nodes.py", | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, 'Successfully committed to vcs/nodes.py') | ||||
tip = repo.get_commit(commit_idx=-1) | ||||
assert tip.message == 'I committed' | ||||
def test_edit_file_view_commit_changes_default_message(self, backend, | ||||
csrf_token): | ||||
repo = backend.create_repo() | ||||
backend.ensure_file("vcs/nodes.py", content="print 'hello'") | ||||
commit_id = ( | ||||
backend.default_branch_name or | ||||
backend.repo.scm_instance().commit_ids[-1]) | ||||
response = self.app.post( | ||||
url( | ||||
'files_edit', | ||||
repo_name=repo.repo_name, | ||||
revision=commit_id, | ||||
f_path='vcs/nodes.py'), | ||||
params={ | ||||
'content': "print 'hello world'", | ||||
'message': '', | ||||
'filename': "vcs/nodes.py", | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, 'Successfully committed to vcs/nodes.py') | ||||
tip = repo.get_commit(commit_idx=-1) | ||||
assert tip.message == 'Edited file vcs/nodes.py via RhodeCode Enterprise' | ||||
def test_delete_file_view(self, backend): | ||||
self.app.get(url( | ||||
'files_delete_home', | ||||
repo_name=backend.repo_name, | ||||
revision='tip', f_path='vcs/nodes.py')) | ||||
def test_delete_file_view_not_on_branch(self, backend): | ||||
repo = backend.create_repo() | ||||
backend.ensure_file('vcs/nodes.py') | ||||
response = self.app.get( | ||||
url( | ||||
'files_delete_home', | ||||
repo_name=repo.repo_name, | ||||
revision='tip', f_path='vcs/nodes.py'), | ||||
status=302) | ||||
assert_session_flash( | ||||
response, | ||||
'You can only delete files with revision being a valid branch') | ||||
def test_delete_file_view_commit_changes(self, backend, csrf_token): | ||||
repo = backend.create_repo() | ||||
backend.ensure_file("vcs/nodes.py") | ||||
response = self.app.post( | ||||
url( | ||||
'files_delete_home', | ||||
repo_name=repo.repo_name, | ||||
revision=backend.default_head_id, | ||||
f_path='vcs/nodes.py'), | ||||
params={ | ||||
'message': 'i commited', | ||||
'csrf_token': csrf_token, | ||||
}, | ||||
status=302) | ||||
assert_session_flash( | ||||
response, 'Successfully deleted file vcs/nodes.py') | ||||
def assert_files_in_response(response, files, params): | ||||
template = ( | ||||
r423 | 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"') | |||
r1 | _assert_items_in_response(response, files, template, params) | |||
def assert_dirs_in_response(response, dirs, params): | ||||
template = ( | ||||
r423 | 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"') | |||
r1 | _assert_items_in_response(response, dirs, template, params) | |||
def _assert_items_in_response(response, items, template, params): | ||||
for item in items: | ||||
item_params = {'name': item} | ||||
item_params.update(params) | ||||
response.mustcontain(template % item_params) | ||||
def assert_timeago_in_response(response, items, params): | ||||
for item in items: | ||||
response.mustcontain(h.age_component(params['date'])) | ||||
r622 | ||||
@pytest.mark.usefixtures("autologin_user", "app") | ||||
class TestSideBySideDiff: | ||||
def test_diff2way(self, app, backend, backend_stub): | ||||
f_path = 'content' | ||||
commit1_content = 'content-25d7e49c18b159446c' | ||||
commit2_content = 'content-603d6c72c46d953420' | ||||
repo = backend.create_repo() | ||||
commit1 = _commit_change( | ||||
repo.repo_name, filename=f_path, content=commit1_content, | ||||
message='A', vcs_type=backend.alias, parent=None, newfile=True) | ||||
commit2 = _commit_change( | ||||
repo.repo_name, filename=f_path, content=commit2_content, | ||||
message='B, child of A', vcs_type=backend.alias, parent=commit1) | ||||
response = self.app.get(url( | ||||
controller='files', action='diff_2way', | ||||
repo_name=repo.repo_name, | ||||
diff1=commit1.raw_id, | ||||
diff2=commit2.raw_id, | ||||
f_path=f_path)) | ||||
assert_response = AssertResponse(response) | ||||
response.mustcontain( | ||||
('Side-by-side Diff r0:%s ... r1:%s') % ( commit1.short_id, commit2.short_id )) | ||||
response.mustcontain('id="compare"') | ||||
response.mustcontain(( | ||||
"var orig1_url = '/%s/raw/%s/%s';\n" | ||||
"var orig2_url = '/%s/raw/%s/%s';") % | ||||
( repo.repo_name, commit1.raw_id, f_path, | ||||
repo.repo_name, commit2.raw_id, f_path)) | ||||
def test_diff2way_with_empty_file(self, app, backend, backend_stub): | ||||
commits = [ | ||||
{'message': 'First commit'}, | ||||
{'message': 'Commit with binary', | ||||
'added': [nodes.FileNode('file.empty', content='')]}, | ||||
] | ||||
f_path='file.empty' | ||||
repo = backend.create_repo(commits=commits) | ||||
commit_id1 = repo.get_commit(commit_idx=0).raw_id | ||||
commit_id2 = repo.get_commit(commit_idx=1).raw_id | ||||
response = self.app.get(url( | ||||
controller='files', action='diff_2way', | ||||
repo_name=repo.repo_name, | ||||
diff1=commit_id1, | ||||
diff2=commit_id2, | ||||
f_path=f_path)) | ||||
assert_response = AssertResponse(response) | ||||
if backend.alias == 'svn': | ||||
assert_session_flash( response, | ||||
('%(file_path)s has not changed') % { 'file_path': 'file.empty' }) | ||||
else: | ||||
response.mustcontain( | ||||
('Side-by-side Diff r0:%s ... r1:%s') % ( repo.get_commit(commit_idx=0).short_id, repo.get_commit(commit_idx=1).short_id )) | ||||
response.mustcontain('id="compare"') | ||||
response.mustcontain(( | ||||
"var orig1_url = '/%s/raw/%s/%s';\n" | ||||
"var orig2_url = '/%s/raw/%s/%s';") % | ||||
( repo.repo_name, commit_id1, f_path, | ||||
repo.repo_name, commit_id2, f_path)) | ||||
def test_empty_diff_2way_redirect_to_summary_with_alert(self, app, backend): | ||||
commit_id_range = { | ||||
'hg': ( | ||||
'25d7e49c18b159446cadfa506a5cf8ad1cb04067', | ||||
'603d6c72c46d953420c89d36372f08d9f305f5dd'), | ||||
'git': ( | ||||
'6fc9270775aaf5544c1deb014f4ddd60c952fcbb', | ||||
'03fa803d7e9fb14daa9a3089e0d1494eda75d986'), | ||||
'svn': ( | ||||
'335', | ||||
'337'), | ||||
} | ||||
f_path = 'setup.py' | ||||
commit_ids = commit_id_range[backend.alias] | ||||
response = self.app.get(url( | ||||
controller='files', action='diff_2way', | ||||
repo_name=backend.repo_name, | ||||
diff2=commit_ids[0], | ||||
diff1=commit_ids[1], | ||||
f_path=f_path)) | ||||
assert_response = AssertResponse(response) | ||||
assert_session_flash( response, | ||||
('%(file_path)s has not changed') % { 'file_path': f_path }) | ||||