# -*- coding: utf-8 -*-
# Copyright (C) 2010-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import datetime
import os
import shutil
import tarfile
import tempfile
import zipfile
import StringIO
import mock
import pytest
from rhodecode.lib.vcs.backends import base
from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError, VCSError
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode.tests.vcs.conftest import BackendTestMixin
@pytest.mark.usefixtures("vcs_repository_support")
class TestArchives(BackendTestMixin):
@pytest.fixture(autouse=True)
def tempfile(self, request):
self.temp_file = tempfile.mkstemp()[1]
@request.addfinalizer
def cleanup():
os.remove(self.temp_file)
@classmethod
def _get_commits(cls):
start_date = datetime.datetime(2010, 1, 1, 20)
yield {
'message': 'Initial Commit',
'author': 'Joe Doe ',
'date': start_date + datetime.timedelta(hours=12),
'added': [
FileNode('executable_0o100755', '...', mode=0o100755),
FileNode('executable_0o100500', '...', mode=0o100500),
FileNode('not_executable', '...', mode=0o100644),
],
}
for x in range(5):
yield {
'message': 'Commit %d' % x,
'author': 'Joe Doe ',
'date': start_date + datetime.timedelta(hours=12 * x),
'added': [
FileNode('%d/file_%d.txt' % (x, x), content='Foobar %d' % x),
],
}
@pytest.mark.parametrize('compressor', ['gz', 'bz2'])
def test_archive_tar(self, compressor):
self.tip.archive_repo(
self.temp_file, kind='t{}'.format(compressor), archive_dir_name='repo')
out_dir = tempfile.mkdtemp()
out_file = tarfile.open(self.temp_file, 'r|{}'.format(compressor))
out_file.extractall(out_dir)
out_file.close()
for x in range(5):
node_path = '%d/file_%d.txt' % (x, x)
with open(os.path.join(out_dir, 'repo/' + node_path)) as f:
file_content = f.read()
assert file_content == self.tip.get_node(node_path).content
shutil.rmtree(out_dir)
@pytest.mark.parametrize('compressor', ['gz', 'bz2'])
def test_archive_tar_symlink(self, compressor):
return False
@pytest.mark.parametrize('compressor', ['gz', 'bz2'])
def test_archive_tar_file_modes(self, compressor):
self.tip.archive_repo(
self.temp_file, kind='t{}'.format(compressor), archive_dir_name='repo')
out_dir = tempfile.mkdtemp()
out_file = tarfile.open(self.temp_file, 'r|{}'.format(compressor))
out_file.extractall(out_dir)
out_file.close()
dest = lambda inp: os.path.join(out_dir, 'repo/' + inp)
assert oct(os.stat(dest('not_executable')).st_mode) == '0100644'
def test_archive_zip(self):
self.tip.archive_repo(self.temp_file, kind='zip', archive_dir_name='repo')
out = zipfile.ZipFile(self.temp_file)
for x in range(5):
node_path = '%d/file_%d.txt' % (x, x)
decompressed = StringIO.StringIO()
decompressed.write(out.read('repo/' + node_path))
assert decompressed.getvalue() == \
self.tip.get_node(node_path).content
decompressed.close()
def test_archive_zip_with_metadata(self):
self.tip.archive_repo(self.temp_file, kind='zip',
archive_dir_name='repo', write_metadata=True)
out = zipfile.ZipFile(self.temp_file)
metafile = out.read('repo/.archival.txt')
raw_id = self.tip.raw_id
assert 'commit_id:%s' % raw_id in metafile
for x in range(5):
node_path = '%d/file_%d.txt' % (x, x)
decompressed = StringIO.StringIO()
decompressed.write(out.read('repo/' + node_path))
assert decompressed.getvalue() == \
self.tip.get_node(node_path).content
decompressed.close()
def test_archive_wrong_kind(self):
with pytest.raises(ImproperArchiveTypeError):
self.tip.archive_repo(self.temp_file, kind='wrong kind')
@pytest.fixture()
def base_commit():
"""
Prepare a `base.BaseCommit` just enough for `_validate_archive_prefix`.
"""
commit = base.BaseCommit()
commit.repository = mock.Mock()
commit.repository.name = u'fake_repo'
commit.short_id = 'fake_id'
return commit
@pytest.mark.parametrize("prefix", [u"unicode-prefix", u"Ünïcödë"])
def test_validate_archive_prefix_enforces_bytes_as_prefix(prefix, base_commit):
with pytest.raises(ValueError):
base_commit._validate_archive_prefix(prefix)
def test_validate_archive_prefix_empty_prefix(base_commit):
# TODO: johbo: Should raise a ValueError here.
with pytest.raises(VCSError):
base_commit._validate_archive_prefix('')
def test_validate_archive_prefix_with_leading_slash(base_commit):
# TODO: johbo: Should raise a ValueError here.
with pytest.raises(VCSError):
base_commit._validate_archive_prefix('/any')
def test_validate_archive_prefix_falls_back_to_repository_name(base_commit):
prefix = base_commit._validate_archive_prefix(None)
expected_prefix = base_commit._ARCHIVE_PREFIX_TEMPLATE.format(
repo_name='fake_repo',
short_id='fake_id')
assert isinstance(prefix, str)
assert prefix == expected_prefix