##// END OF EJS Templates
deps: bumped zope.interface==7.1.1
deps: bumped zope.interface==7.1.1

File last commit:

r5516:3496180b default
r5595:e7e5db4e default
Show More
test_filestore_backends.py
128 lines | 5.9 KiB | text/x-python | PythonLexer
/ rhodecode / apps / file_store / tests / test_filestore_backends.py
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import pytest
from rhodecode.apps import file_store
from rhodecode.apps.file_store import config_keys
from rhodecode.apps.file_store.backends.filesystem_legacy import LegacyFileSystemBackend
from rhodecode.apps.file_store.backends.filesystem import FileSystemBackend
from rhodecode.apps.file_store.backends.objectstore import ObjectStoreBackend
from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException
from rhodecode.apps.file_store import utils as store_utils
from rhodecode.apps.file_store.tests import random_binary_file, file_store_instance
class TestFileStoreBackends:
    """Run the same store/fetch/validation checks against each file-store backend type."""

    @pytest.mark.parametrize('backend_type, expected_instance', [
        (config_keys.backend_legacy_filesystem, LegacyFileSystemBackend),
        (config_keys.backend_filesystem, FileSystemBackend),
        (config_keys.backend_objectstore, ObjectStoreBackend),
    ])
    def test_get_backend(self, backend_type, expected_instance, ini_settings):
        """The backend factory yields an instance of the class matching the configured type."""
        # the fixture mapping is modified in place and handed straight to the factory
        ini_settings[config_keys.backend_type] = backend_type
        backend = store_utils.get_filestore_backend(config=ini_settings, always_init=True)

        assert isinstance(backend, expected_instance)

    @pytest.mark.parametrize('backend_type, expected_instance', [
        (config_keys.backend_legacy_filesystem, LegacyFileSystemBackend),
        (config_keys.backend_filesystem, FileSystemBackend),
        (config_keys.backend_objectstore, ObjectStoreBackend),
    ])
    def test_store_and_read(self, backend_type, expected_instance, ini_settings, random_binary_file):
        """A stored file can be fetched back by its id, and the fetched metadata carries its filename."""
        filename, temp_file = random_binary_file

        ini_settings[config_keys.backend_type] = backend_type
        backend = store_utils.get_filestore_backend(config=ini_settings, always_init=True)

        upload_info = {
            'user_uploaded': {
                'username': 'user1',
                'user_id': 10,
                'ip': '10.20.30.40'
            }
        }
        store_fid, stored_meta = backend.store(filename, temp_file, extra_metadata=upload_info)
        assert store_fid
        assert stored_meta

        # read back immediately after the write
        reader, fetched_meta = backend.fetch(store_fid)
        assert reader
        assert fetched_meta['filename'] == filename

    @pytest.mark.parametrize('backend_type, expected_instance', [
        (config_keys.backend_legacy_filesystem, LegacyFileSystemBackend),
        (config_keys.backend_filesystem, FileSystemBackend),
        (config_keys.backend_objectstore, ObjectStoreBackend),
    ])
    def test_store_file_not_allowed(self, backend_type, expected_instance, ini_settings, random_binary_file):
        """Storing a '.exe' name while only '.txt' is allowed raises FileNotAllowedException."""
        # only the file object is needed; the generated name is replaced by a fixed one below
        _filename, temp_file = random_binary_file

        ini_settings[config_keys.backend_type] = backend_type
        backend = store_utils.get_filestore_backend(config=ini_settings, always_init=True)

        with pytest.raises(FileNotAllowedException):
            backend.store('notallowed.exe', temp_file, extensions=['.txt'])

    @pytest.mark.parametrize('backend_type, expected_instance', [
        (config_keys.backend_legacy_filesystem, LegacyFileSystemBackend),
        (config_keys.backend_filesystem, FileSystemBackend),
        (config_keys.backend_objectstore, ObjectStoreBackend),
    ])
    def test_store_file_over_size(self, backend_type, expected_instance, ini_settings, random_binary_file):
        """Storing the fixture file with max_filesize=124 raises FileOverSizeException."""
        _filename, temp_file = random_binary_file

        ini_settings[config_keys.backend_type] = backend_type
        backend = store_utils.get_filestore_backend(config=ini_settings, always_init=True)

        with pytest.raises(FileOverSizeException):
            backend.store('toobig.exe', temp_file, extensions=['.exe'], max_filesize=124)

    @pytest.mark.parametrize('backend_type, expected_instance, extra_conf', [
        (config_keys.backend_legacy_filesystem, LegacyFileSystemBackend, {}),
        (config_keys.backend_filesystem, FileSystemBackend, {config_keys.filesystem_storage_path: '/tmp/test-fs-store'}),
        (config_keys.backend_objectstore, ObjectStoreBackend, {config_keys.objectstore_bucket: 'test-bucket'}),
    ])
    def test_store_stats_and_keys(self, backend_type, expected_instance, extra_conf, ini_settings, random_binary_file):
        """After emptying the store and adding ten files, statistics and key iteration both report ten."""
        ini_settings[config_keys.backend_type] = backend_type
        ini_settings.update(extra_conf)
        backend = store_utils.get_filestore_backend(config=ini_settings, always_init=True)

        # start from an empty store so the counts below are exact
        for _shard, artifact_key in backend.iter_artifacts():
            backend.delete(artifact_key)

        for _ in range(10):
            filename, temp_file = random_binary_file
            upload_info = {
                'user_uploaded': {
                    'username': 'user1',
                    'user_id': 10,
                    'ip': '10.20.30.40'
                }
            }
            store_fid, stored_meta = backend.store(filename, temp_file, extra_metadata=upload_info)
            assert store_fid
            assert stored_meta

        # only the item count is checked; the other two statistics values are ignored
        cnt, _size, _meta = backend.get_statistics()
        assert cnt == 10
        assert len(list(backend.iter_keys())) == 10