##// END OF EJS Templates
fix(celery): signal startup failure with sys.exit to recycle celery worker on errors
fix(celery): signal startup failure with sys.exit to recycle celery worker on errors

File last commit:

r5173:95a4b30f default
r5408:fcfc4234 v5.0.1 stable
Show More
test_upload_file.py
246 lines | 9.6 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import pytest
from rhodecode.lib.ext_json import json
from rhodecode.model.auth_token import AuthTokenModel
from rhodecode.model.db import Session, FileStore, Repository, User
from rhodecode.apps.file_store import utils, config_keys
from rhodecode.tests import TestController
from rhodecode.tests.routes import route_path
class TestFileStoreViews(TestController):
    """Functional tests for the file store upload and download views."""

    @pytest.mark.parametrize("fid, content, exists", [
        ('abcde-0.jpg', "xxxxx", True),
        ('abcde-0.exe', "1234567", True),
        ('abcde-0.jpg', "xxxxx", False),
    ])
    def test_get_files_from_store(self, fid, content, exists, tmpdir, user_util):
        """Download a file by its store uid.

        When ``exists`` is true the file is first saved into the store and
        registered as a repo-scoped ``FileStore`` entry, and a 200 with the
        original content is expected. Otherwise nothing is stored and the
        raw ``fid`` is requested, expecting a 404.
        """
        logged_in = self.log_user()
        owner_id = logged_in['user_id']
        repo_id = user_util.create_repo().repo_id
        store_path = self.app._pyramid_settings[config_keys.store_path]

        expected_status = 200 if exists else 404
        # falls back to the plain fid when nothing gets stored
        store_uid = fid

        if exists:
            store = utils.get_file_storage({config_keys.store_path: store_path})
            source_file = os.path.join(str(tmpdir), fid)

            with open(source_file, 'wt') as writer:
                writer.write(content)

            with open(source_file, 'rb') as reader:
                store_uid, metadata = store.save_file(
                    reader, fid, extra_metadata={'filename': fid})

            description = 'repo artifact `{}`'.format(metadata["filename"])
            entry = FileStore.create(
                file_uid=store_uid,
                filename=metadata["filename"],
                file_hash=metadata["sha256"],
                file_size=metadata["size"],
                file_display_name='file_display_name',
                file_description=description,
                check_acl=True,
                user_id=owner_id,
                scope_repo_id=repo_id,
            )
            Session().add(entry)
            Session().commit()

        response = self.app.get(
            route_path('download_file', fid=store_uid), status=expected_status)

        if not exists:
            return

        assert response.text == content

        # a '.meta' file is expected next to the stored file
        stored_file = store.resolve_name(store_uid, store_path)[1]
        metadata_file = os.path.join(
            os.path.dirname(stored_file), store_uid + '.meta')
        assert os.path.exists(metadata_file)

        with open(metadata_file, 'rb') as meta:
            json_data = json.loads(meta.read())

        assert json_data
        assert 'size' in json_data

    def test_upload_files_without_content_to_store(self):
        """Posting without a ``store_file`` field returns an error payload."""
        self.log_user()
        form_data = {'csrf_token': self.csrf_token}

        response = self.app.post(
            route_path('upload_file'), params=form_data, status=200)

        expected = {
            'error': 'store_file data field is missing',
            'access_path': None,
            'store_fid': None}
        assert response.json == expected

    def test_upload_files_bogus_content_to_store(self):
        """Posting a plain string as ``store_file`` returns an error payload."""
        self.log_user()
        form_data = {'csrf_token': self.csrf_token, 'store_file': 'bogus'}

        response = self.app.post(
            route_path('upload_file'), params=form_data, status=200)

        expected = {
            'error': 'filename cannot be read from the data field',
            'access_path': None,
            'store_fid': None}
        assert response.json == expected

    def test_upload_content_to_store(self):
        """Uploading a real file yields a non-empty ``store_fid``."""
        self.log_user()
        attachment = ('store_file', b'myfile.txt', b'SOME CONTENT')

        response = self.app.post(
            route_path('upload_file'),
            upload_files=[attachment],
            params={'csrf_token': self.csrf_token},
            status=200)

        assert response.json['store_fid']

    @pytest.fixture()
    def create_artifact_factory(self, tmpdir):
        """Provide ``factory(user_id, content)`` that stores an artifact.

        The factory writes ``content`` to ``example.txt`` under ``tmpdir``,
        saves it into the file store, commits a ``FileStore`` entry owned by
        ``user_id`` (no scope set) and returns that entry.
        """

        def factory(user_id, content):
            store_path = self.app._pyramid_settings[config_keys.store_path]
            store = utils.get_file_storage({config_keys.store_path: store_path})

            fid = 'example.txt'
            source_file = os.path.join(str(tmpdir), fid)

            with open(source_file, 'wt') as writer:
                writer.write(content)

            with open(source_file, 'rb') as reader:
                store_uid, metadata = store.save_file(
                    reader, fid, extra_metadata={'filename': fid})

            description = 'repo artifact `{}`'.format(metadata["filename"])
            entry = FileStore.create(
                file_uid=store_uid,
                filename=metadata["filename"],
                file_hash=metadata["sha256"],
                file_size=metadata["size"],
                file_display_name='file_display_name',
                file_description=description,
                check_acl=True,
                user_id=user_id,
            )
            Session().add(entry)
            Session().commit()
            return entry

        return factory

    def test_download_file_non_scoped(self, user_util, create_artifact_factory):
        """An unscoped artifact downloads for its creator and for another user."""
        creator = self.log_user()
        content = 'HELLO MY NAME IS ARTIFACT !'
        artifact = create_artifact_factory(creator['user_id'], content)

        download_url = route_path('download_file', fid=artifact.file_uid)
        response = self.app.get(download_url, status=200)
        assert response.text == content

        # switch to a freshly created user and download again
        other_user = user_util.create_user(password='qweqwe')
        self.log_user(other_user.username, 'qweqwe')

        response = self.app.get(download_url, status=200)
        assert response.text == content

    def test_download_file_scoped_to_repo(self, user_util, create_artifact_factory):
        """A repo-scoped artifact returns 404 once the user gets 'repository.none'."""
        creator = self.log_user()
        content = 'HELLO MY NAME IS ARTIFACT !'
        artifact = create_artifact_factory(creator['user_id'], content)

        # bind the artifact to a repository
        repo_id = user_util.create_repo().repo_id
        artifact.scope_repo_id = repo_id
        Session().add(artifact)
        Session().commit()

        download_url = route_path('download_file', fid=artifact.file_uid)
        response = self.app.get(download_url, status=200)
        assert response.text == content

        # switch to a freshly created user and download again
        other_user = user_util.create_user(password='qweqwe')
        self.log_user(other_user.username, 'qweqwe')

        response = self.app.get(download_url, status=200)
        assert response.text == content

        # revoke that user's rights to the repo; download must now 404
        scoped_repo = Repository.get(repo_id)
        user_util.grant_user_permission_to_repo(
            scoped_repo, other_user, 'repository.none')
        self.app.get(download_url, status=404)

    def test_download_file_scoped_to_user(self, user_util, create_artifact_factory):
        """A user-scoped artifact is served only to the user it is bound to."""
        creator = self.log_user()
        content = 'HELLO MY NAME IS ARTIFACT !'
        artifact = create_artifact_factory(creator['user_id'], content)

        # bind the artifact to a different user
        bound_user = user_util.create_user(password='qweqwe')
        artifact.scope_user_id = bound_user.user_id
        Session().add(artifact)
        Session().commit()

        download_url = route_path('download_file', fid=artifact.file_uid)

        # the creator has no access since the artifact is bound to another user
        self.app.get(download_url, status=404)

        # the bound user can download it
        self.log_user(bound_user.username, 'qweqwe')
        response = self.app.get(download_url, status=200)
        assert response.text == content

    def test_download_file_scoped_to_repo_with_bad_token(self, user_util, create_artifact_factory):
        """Token download with a bogus auth token responds with 302."""
        owner_id = User.get_first_super_admin().user_id
        content = 'HELLO MY NAME IS ARTIFACT !'
        artifact = create_artifact_factory(owner_id, content)

        # bind the artifact to a repository
        artifact.scope_repo_id = user_util.create_repo().repo_id
        Session().add(artifact)
        Session().commit()

        token_url = route_path(
            'download_file_by_token',
            _auth_token='bogus', fid=artifact.file_uid)
        self.app.get(token_url, status=302)

    def test_download_file_scoped_to_repo_with_token(self, user_util, create_artifact_factory):
        """Token download with a valid artifact token returns the content."""
        admin = User.get_first_super_admin()
        AuthTokenModel().create(
            admin, 'test artifact token',
            role=AuthTokenModel.cls.ROLE_ARTIFACT_DOWNLOAD)

        # the admin is fetched again before reading the artifact token
        admin = User.get_first_super_admin()
        artifact_token = admin.artifact_token

        owner_id = User.get_first_super_admin().user_id
        content = 'HELLO MY NAME IS ARTIFACT !'
        artifact = create_artifact_factory(owner_id, content)

        # bind the artifact to a repository
        artifact.scope_repo_id = user_util.create_repo().repo_id
        Session().add(artifact)
        Session().commit()

        token_url = route_path(
            'download_file_by_token',
            _auth_token=artifact_token, fid=artifact.file_uid)
        response = self.app.get(token_url, status=200)
        assert response.text == content