# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import pytest from rhodecode.lib.ext_json import json from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.db import Session, FileStore, Repository, User from rhodecode.apps.file_store import utils as store_utils from rhodecode.apps.file_store import config_keys from rhodecode.tests import TestController from rhodecode.tests.routes import route_path class TestFileStoreViews(TestController): @pytest.fixture() def create_artifact_factory(self, tmpdir, ini_settings): def factory(user_id, content, f_name='example.txt'): config = ini_settings config[config_keys.backend_type] = config_keys.backend_legacy_filesystem f_store = store_utils.get_filestore_backend(config) filesystem_file = os.path.join(str(tmpdir), f_name) with open(filesystem_file, 'wt') as f: f.write(content) with open(filesystem_file, 'rb') as f: store_uid, metadata = f_store.store(f_name, f, metadata={'filename': f_name}) os.remove(filesystem_file) entry = FileStore.create( file_uid=store_uid, filename=metadata["filename"], file_hash=metadata["sha256"], file_size=metadata["size"], file_display_name='file_display_name', file_description='repo artifact `{}`'.format(metadata["filename"]), check_acl=True, user_id=user_id, ) Session().add(entry) Session().commit() return entry return factory @pytest.mark.parametrize("fid, content, exists", [ ('abcde-0.jpg', "xxxxx", True), ('abcde-0.exe', "1234567", True), ('abcde-0.jpg', "xxxxx", False), ]) def test_get_files_from_store(self, fid, content, exists, tmpdir, user_util, ini_settings): user = self.log_user() user_id = user['user_id'] repo_id = user_util.create_repo().repo_id config = ini_settings config[config_keys.backend_type] = config_keys.backend_legacy_filesystem store_uid = fid if exists: status = 200 f_store = store_utils.get_filestore_backend(config) filesystem_file = os.path.join(str(tmpdir), fid) with open(filesystem_file, 'wt') as f: f.write(content) with open(filesystem_file, 'rb') as f: store_uid, metadata = f_store.store(fid, f, metadata={'filename': fid}) os.remove(filesystem_file) entry = FileStore.create( file_uid=store_uid, filename=metadata["filename"], file_hash=metadata["sha256"], file_size=metadata["size"], file_display_name='file_display_name', file_description='repo artifact `{}`'.format(metadata["filename"]), check_acl=True, user_id=user_id, scope_repo_id=repo_id ) Session().add(entry) Session().commit() else: status = 404 response = self.app.get(route_path('download_file', fid=store_uid), status=status) if exists: assert response.text == content metadata = f_store.get_metadata(store_uid) assert 'size' in metadata def test_upload_files_without_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), params={'csrf_token': self.csrf_token}, status=200) assert response.json == { 'error': 'store_file data field is missing', 'access_path': None, 'store_fid': None} def test_upload_files_bogus_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), params={'csrf_token': self.csrf_token, 'store_file': 'bogus'}, status=200) assert response.json == { 'error': 'filename cannot be read from the data field', 'access_path': None, 'store_fid': None} def test_upload_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), upload_files=[('store_file', b'myfile.txt', b'SOME CONTENT')], params={'csrf_token': self.csrf_token}, status=200) assert response.json['store_fid'] def test_download_file_non_scoped(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) file_uid = artifact.file_uid response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # log-in to new user and test download again user = user_util.create_user(password='qweqwe') self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content def test_download_file_scoped_to_repo(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # log-in to new user and test download again user = user_util.create_user(password='qweqwe') self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # forbid user the rights to repo repo = Repository.get(repo_id) user_util.grant_user_permission_to_repo(repo, user, 'repository.none') self.app.get(route_path('download_file', fid=file_uid), status=404) def test_download_file_scoped_to_user(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to user user = user_util.create_user(password='qweqwe') artifact.scope_user_id = user.user_id Session().add(artifact) Session().commit() # artifact creator doesn't have access since it's bind to another user file_uid = artifact.file_uid self.app.get(route_path('download_file', fid=file_uid), status=404) # log-in to new user and test download again, should be ok since we're bind to this artifact self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content def test_download_file_scoped_to_repo_with_bad_token(self, user_util, create_artifact_factory): user_id = User.get_first_super_admin().user_id content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid self.app.get(route_path('download_file_by_token', _auth_token='bogus', fid=file_uid), status=302) def test_download_file_scoped_to_repo_with_token(self, user_util, create_artifact_factory): user = User.get_first_super_admin() AuthTokenModel().create(user, 'test artifact token', role=AuthTokenModel.cls.ROLE_ARTIFACT_DOWNLOAD) user = User.get_first_super_admin() artifact_token = user.artifact_token user_id = User.get_first_super_admin().user_id content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid response = self.app.get( route_path('download_file_by_token', _auth_token=artifact_token, fid=file_uid), status=200) assert response.text == content