##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with Cache query for users. The old way of querying caused the user get query to be always cached, and returning old results even in 2fa forms. The new limited query doesn't cache the user object resolving issues

File last commit:

r5173:95a4b30f default
r5365:ae8a165b default
Show More
test_upload_file.py
246 lines | 9.6 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import pytest
from rhodecode.lib.ext_json import json
from rhodecode.model.auth_token import AuthTokenModel
from rhodecode.model.db import Session, FileStore, Repository, User
from rhodecode.apps.file_store import utils, config_keys
from rhodecode.tests import TestController
from rhodecode.tests.routes import route_path
class TestFileStoreViews(TestController):
@pytest.mark.parametrize("fid, content, exists", [
('abcde-0.jpg', "xxxxx", True),
('abcde-0.exe', "1234567", True),
('abcde-0.jpg', "xxxxx", False),
])
def test_get_files_from_store(self, fid, content, exists, tmpdir, user_util):
user = self.log_user()
user_id = user['user_id']
repo_id = user_util.create_repo().repo_id
store_path = self.app._pyramid_settings[config_keys.store_path]
store_uid = fid
if exists:
status = 200
store = utils.get_file_storage({config_keys.store_path: store_path})
filesystem_file = os.path.join(str(tmpdir), fid)
with open(filesystem_file, 'wt') as f:
f.write(content)
with open(filesystem_file, 'rb') as f:
store_uid, metadata = store.save_file(f, fid, extra_metadata={'filename': fid})
entry = FileStore.create(
file_uid=store_uid, filename=metadata["filename"],
file_hash=metadata["sha256"], file_size=metadata["size"],
file_display_name='file_display_name',
file_description='repo artifact `{}`'.format(metadata["filename"]),
check_acl=True, user_id=user_id,
scope_repo_id=repo_id
)
Session().add(entry)
Session().commit()
else:
status = 404
response = self.app.get(route_path('download_file', fid=store_uid), status=status)
if exists:
assert response.text == content
file_store_path = os.path.dirname(store.resolve_name(store_uid, store_path)[1])
metadata_file = os.path.join(file_store_path, store_uid + '.meta')
assert os.path.exists(metadata_file)
with open(metadata_file, 'rb') as f:
json_data = json.loads(f.read())
assert json_data
assert 'size' in json_data
def test_upload_files_without_content_to_store(self):
self.log_user()
response = self.app.post(
route_path('upload_file'),
params={'csrf_token': self.csrf_token},
status=200)
assert response.json == {
'error': 'store_file data field is missing',
'access_path': None,
'store_fid': None}
def test_upload_files_bogus_content_to_store(self):
self.log_user()
response = self.app.post(
route_path('upload_file'),
params={'csrf_token': self.csrf_token, 'store_file': 'bogus'},
status=200)
assert response.json == {
'error': 'filename cannot be read from the data field',
'access_path': None,
'store_fid': None}
def test_upload_content_to_store(self):
self.log_user()
response = self.app.post(
route_path('upload_file'),
upload_files=[('store_file', b'myfile.txt', b'SOME CONTENT')],
params={'csrf_token': self.csrf_token},
status=200)
assert response.json['store_fid']
@pytest.fixture()
def create_artifact_factory(self, tmpdir):
def factory(user_id, content):
store_path = self.app._pyramid_settings[config_keys.store_path]
store = utils.get_file_storage({config_keys.store_path: store_path})
fid = 'example.txt'
filesystem_file = os.path.join(str(tmpdir), fid)
with open(filesystem_file, 'wt') as f:
f.write(content)
with open(filesystem_file, 'rb') as f:
store_uid, metadata = store.save_file(f, fid, extra_metadata={'filename': fid})
entry = FileStore.create(
file_uid=store_uid, filename=metadata["filename"],
file_hash=metadata["sha256"], file_size=metadata["size"],
file_display_name='file_display_name',
file_description='repo artifact `{}`'.format(metadata["filename"]),
check_acl=True, user_id=user_id,
)
Session().add(entry)
Session().commit()
return entry
return factory
def test_download_file_non_scoped(self, user_util, create_artifact_factory):
user = self.log_user()
user_id = user['user_id']
content = 'HELLO MY NAME IS ARTIFACT !'
artifact = create_artifact_factory(user_id, content)
file_uid = artifact.file_uid
response = self.app.get(route_path('download_file', fid=file_uid), status=200)
assert response.text == content
# log-in to new user and test download again
user = user_util.create_user(password='qweqwe')
self.log_user(user.username, 'qweqwe')
response = self.app.get(route_path('download_file', fid=file_uid), status=200)
assert response.text == content
def test_download_file_scoped_to_repo(self, user_util, create_artifact_factory):
user = self.log_user()
user_id = user['user_id']
content = 'HELLO MY NAME IS ARTIFACT !'
artifact = create_artifact_factory(user_id, content)
# bind to repo
repo = user_util.create_repo()
repo_id = repo.repo_id
artifact.scope_repo_id = repo_id
Session().add(artifact)
Session().commit()
file_uid = artifact.file_uid
response = self.app.get(route_path('download_file', fid=file_uid), status=200)
assert response.text == content
# log-in to new user and test download again
user = user_util.create_user(password='qweqwe')
self.log_user(user.username, 'qweqwe')
response = self.app.get(route_path('download_file', fid=file_uid), status=200)
assert response.text == content
# forbid user the rights to repo
repo = Repository.get(repo_id)
user_util.grant_user_permission_to_repo(repo, user, 'repository.none')
self.app.get(route_path('download_file', fid=file_uid), status=404)
def test_download_file_scoped_to_user(self, user_util, create_artifact_factory):
user = self.log_user()
user_id = user['user_id']
content = 'HELLO MY NAME IS ARTIFACT !'
artifact = create_artifact_factory(user_id, content)
# bind to user
user = user_util.create_user(password='qweqwe')
artifact.scope_user_id = user.user_id
Session().add(artifact)
Session().commit()
# artifact creator doesn't have access since it's bind to another user
file_uid = artifact.file_uid
self.app.get(route_path('download_file', fid=file_uid), status=404)
# log-in to new user and test download again, should be ok since we're bind to this artifact
self.log_user(user.username, 'qweqwe')
response = self.app.get(route_path('download_file', fid=file_uid), status=200)
assert response.text == content
def test_download_file_scoped_to_repo_with_bad_token(self, user_util, create_artifact_factory):
user_id = User.get_first_super_admin().user_id
content = 'HELLO MY NAME IS ARTIFACT !'
artifact = create_artifact_factory(user_id, content)
# bind to repo
repo = user_util.create_repo()
repo_id = repo.repo_id
artifact.scope_repo_id = repo_id
Session().add(artifact)
Session().commit()
file_uid = artifact.file_uid
self.app.get(route_path('download_file_by_token',
_auth_token='bogus', fid=file_uid), status=302)
def test_download_file_scoped_to_repo_with_token(self, user_util, create_artifact_factory):
user = User.get_first_super_admin()
AuthTokenModel().create(user, 'test artifact token',
role=AuthTokenModel.cls.ROLE_ARTIFACT_DOWNLOAD)
user = User.get_first_super_admin()
artifact_token = user.artifact_token
user_id = User.get_first_super_admin().user_id
content = 'HELLO MY NAME IS ARTIFACT !'
artifact = create_artifact_factory(user_id, content)
# bind to repo
repo = user_util.create_repo()
repo_id = repo.repo_id
artifact.scope_repo_id = repo_id
Session().add(artifact)
Session().commit()
file_uid = artifact.file_uid
response = self.app.get(
route_path('download_file_by_token',
_auth_token=artifact_token, fid=file_uid), status=200)
assert response.text == content