utils.py
114 lines
| 3.8 KiB
| text/x-python
|
PythonLexer
r5608 | # Copyright (C) 2010-2024 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
Utilities for tests only. These are not or should not be used normally - | ||||
functions here are crafted as we don't want to use ``vcs`` to verify tests. | ||||
""" | ||||
import os | ||||
import re | ||||
import sys | ||||
r4926 | from subprocess import Popen | |||
r1 | ||||
class VCSTestError(Exception): | ||||
pass | ||||
def run_command(cmd, args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
r5607 | command = " ".join((cmd, args)) | |||
r1 | p = Popen(command, shell=True) | |||
status = os.waitpid(p.pid, 0)[1] | ||||
return status | ||||
def eprint(msg): | ||||
""" | ||||
Prints given ``msg`` into sys.stderr as nose test runner hides all output | ||||
from sys.stdout by default and if we want to pipe stream somewhere we don't | ||||
need those verbose messages anyway. | ||||
Appends line break. | ||||
""" | ||||
sys.stderr.write(msg) | ||||
r5607 | sys.stderr.write("\n") | |||
r1 | ||||
# TODO: Revisit once we have CI running, if this is not helping us, remove it | ||||
class SCMFetcher(object): | ||||
def __init__(self, alias, test_repo_path): | ||||
""" | ||||
:param clone_cmd: command which would clone remote repository; pass | ||||
only first bits - remote path and destination would be appended | ||||
using ``remote_repo`` and ``test_repo_path`` | ||||
""" | ||||
self.alias = alias | ||||
self.test_repo_path = test_repo_path | ||||
r5087 | def setup_method(self): | |||
r1 | if not os.path.isdir(self.test_repo_path): | |||
self.fetch_repo() | ||||
def fetch_repo(self): | ||||
""" | ||||
Tries to fetch repository from remote path. | ||||
""" | ||||
remote = self.remote_repo | ||||
r5607 | eprint("Fetching repository %s into %s" % (remote, self.test_repo_path)) | |||
run_command(self.clone_cmd, "%s %s" % (remote, self.test_repo_path)) | ||||
r1 | ||||
def get_normalized_path(path): | ||||
""" | ||||
If given path exists, new path would be generated and returned. Otherwise | ||||
same whats given is returned. Assumes that there would be no more than | ||||
10000 same named files. | ||||
""" | ||||
if os.path.exists(path): | ||||
dir, basename = os.path.split(path) | ||||
r5607 | splitted_name = basename.split(".") | |||
r1 | if len(splitted_name) > 1: | |||
ext = splitted_name[-1] | ||||
else: | ||||
ext = None | ||||
r5607 | name = ".".join(splitted_name[:-1]) | |||
matcher = re.compile(r"^.*-(\d{5})$") | ||||
r1 | start = 0 | |||
m = matcher.match(name) | ||||
if not m: | ||||
# Haven't append number yet so return first | ||||
r5607 | newname = f"{name}-00000" | |||
r1 | newpath = os.path.join(dir, newname) | |||
if ext: | ||||
r5607 | newpath = ".".join((newpath, ext)) | |||
r1 | return get_normalized_path(newpath) | |||
else: | ||||
start = int(m.group(1)[-5:]) + 1 | ||||
r4906 | for x in range(start, 10000): | |||
r5607 | newname = name[:-5] + str(x).rjust(5, "0") | |||
r1 | newpath = os.path.join(dir, newname) | |||
if ext: | ||||
r5607 | newpath = ".".join((newpath, ext)) | |||
r1 | if not os.path.exists(newpath): | |||
return newpath | ||||
raise VCSTestError("Couldn't compute new path for %s" % path) | ||||
return path | ||||