# Copyright (C) 2010-2023 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import pytest from rhodecode.lib.ext_json import json from rhodecode.model.auth_token import AuthTokenModel from rhodecode.model.db import Session, FileStore, Repository, User from rhodecode.tests import TestController from rhodecode.apps.file_store import utils, config_keys def route_path(name, params=None, **kwargs): import urllib.request import urllib.parse import urllib.error base_url = { 'upload_file': '/_file_store/upload', 'download_file': '/_file_store/download/{fid}', 'download_file_by_token': '/_file_store/token-download/{_auth_token}/{fid}' }[name].format(**kwargs) if params: base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params)) return base_url class TestFileStoreViews(TestController): @pytest.mark.parametrize("fid, content, exists", [ ('abcde-0.jpg', "xxxxx", True), ('abcde-0.exe', "1234567", True), ('abcde-0.jpg', "xxxxx", False), ]) def test_get_files_from_store(self, fid, content, exists, tmpdir, user_util): user = self.log_user() user_id = user['user_id'] repo_id = user_util.create_repo().repo_id store_path = self.app._pyramid_settings[config_keys.store_path] store_uid = fid if exists: status = 200 store = utils.get_file_storage({config_keys.store_path: store_path}) filesystem_file = os.path.join(str(tmpdir), fid) with open(filesystem_file, 'wt') as f: f.write(content) with open(filesystem_file, 'rb') as f: store_uid, metadata = store.save_file(f, fid, extra_metadata={'filename': fid}) entry = FileStore.create( file_uid=store_uid, filename=metadata["filename"], file_hash=metadata["sha256"], file_size=metadata["size"], file_display_name='file_display_name', file_description='repo artifact `{}`'.format(metadata["filename"]), check_acl=True, user_id=user_id, scope_repo_id=repo_id ) Session().add(entry) Session().commit() else: status = 404 response = self.app.get(route_path('download_file', fid=store_uid), status=status) if exists: assert response.text == content file_store_path = os.path.dirname(store.resolve_name(store_uid, store_path)[1]) metadata_file = os.path.join(file_store_path, store_uid + '.meta') assert os.path.exists(metadata_file) with open(metadata_file, 'rb') as f: json_data = json.loads(f.read()) assert json_data assert 'size' in json_data def test_upload_files_without_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), params={'csrf_token': self.csrf_token}, status=200) assert response.json == { 'error': 'store_file data field is missing', 'access_path': None, 'store_fid': None} def test_upload_files_bogus_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), params={'csrf_token': self.csrf_token, 'store_file': 'bogus'}, status=200) assert response.json == { 'error': 'filename cannot be read from the data field', 'access_path': None, 'store_fid': None} def test_upload_content_to_store(self): self.log_user() response = self.app.post( route_path('upload_file'), upload_files=[('store_file', b'myfile.txt', b'SOME CONTENT')], params={'csrf_token': self.csrf_token}, status=200) assert response.json['store_fid'] @pytest.fixture() def create_artifact_factory(self, tmpdir): def factory(user_id, content): store_path = self.app._pyramid_settings[config_keys.store_path] store = utils.get_file_storage({config_keys.store_path: store_path}) fid = 'example.txt' filesystem_file = os.path.join(str(tmpdir), fid) with open(filesystem_file, 'wt') as f: f.write(content) with open(filesystem_file, 'rb') as f: store_uid, metadata = store.save_file(f, fid, extra_metadata={'filename': fid}) entry = FileStore.create( file_uid=store_uid, filename=metadata["filename"], file_hash=metadata["sha256"], file_size=metadata["size"], file_display_name='file_display_name', file_description='repo artifact `{}`'.format(metadata["filename"]), check_acl=True, user_id=user_id, ) Session().add(entry) Session().commit() return entry return factory def test_download_file_non_scoped(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) file_uid = artifact.file_uid response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # log-in to new user and test download again user = user_util.create_user(password='qweqwe') self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content def test_download_file_scoped_to_repo(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # log-in to new user and test download again user = user_util.create_user(password='qweqwe') self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content # forbid user the rights to repo repo = Repository.get(repo_id) user_util.grant_user_permission_to_repo(repo, user, 'repository.none') self.app.get(route_path('download_file', fid=file_uid), status=404) def test_download_file_scoped_to_user(self, user_util, create_artifact_factory): user = self.log_user() user_id = user['user_id'] content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to user user = user_util.create_user(password='qweqwe') artifact.scope_user_id = user.user_id Session().add(artifact) Session().commit() # artifact creator doesn't have access since it's bind to another user file_uid = artifact.file_uid self.app.get(route_path('download_file', fid=file_uid), status=404) # log-in to new user and test download again, should be ok since we're bind to this artifact self.log_user(user.username, 'qweqwe') response = self.app.get(route_path('download_file', fid=file_uid), status=200) assert response.text == content def test_download_file_scoped_to_repo_with_bad_token(self, user_util, create_artifact_factory): user_id = User.get_first_super_admin().user_id content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid self.app.get(route_path('download_file_by_token', _auth_token='bogus', fid=file_uid), status=302) def test_download_file_scoped_to_repo_with_token(self, user_util, create_artifact_factory): user = User.get_first_super_admin() AuthTokenModel().create(user, 'test artifact token', role=AuthTokenModel.cls.ROLE_ARTIFACT_DOWNLOAD) user = User.get_first_super_admin() artifact_token = user.artifact_token user_id = User.get_first_super_admin().user_id content = 'HELLO MY NAME IS ARTIFACT !' artifact = create_artifact_factory(user_id, content) # bind to repo repo = user_util.create_repo() repo_id = repo.repo_id artifact.scope_repo_id = repo_id Session().add(artifact) Session().commit() file_uid = artifact.file_uid response = self.app.get( route_path('download_file_by_token', _auth_token=artifact_token, fid=file_uid), status=200) assert response.text == content