Show More
utils.py
97 lines
| 2.9 KiB
| text/x-python
|
PythonLexer
r2451 | """ | |||
Utilities for tests only. These are not or should not be used normally - | ||||
functions here are crafted as we don't want to use ``vcs`` to verify tests. | ||||
""" | ||||
import os | ||||
import re | ||||
import sys | ||||
from subprocess import Popen | ||||
class VCSTestError(Exception): | ||||
pass | ||||
def run_command(cmd, args): | ||||
""" | ||||
Runs command on the system with given ``args``. | ||||
""" | ||||
command = ' '.join((cmd, args)) | ||||
p = Popen(command, shell=True) | ||||
status = os.waitpid(p.pid, 0)[1] | ||||
return status | ||||
def eprint(msg): | ||||
""" | ||||
Prints given ``msg`` into sys.stderr as nose test runner hides all output | ||||
from sys.stdout by default and if we want to pipe stream somewhere we don't | ||||
need those verbose messages anyway. | ||||
Appends line break. | ||||
""" | ||||
sys.stderr.write(msg) | ||||
sys.stderr.write('\n') | ||||
class SCMFetcher(object): | ||||
def __init__(self, alias, test_repo_path, remote_repo, clone_cmd): | ||||
""" | ||||
:param clone_cmd: command which would clone remote repository; pass | ||||
only first bits - remote path and destination would be appended | ||||
using ``remote_repo`` and ``test_repo_path`` | ||||
""" | ||||
self.alias = alias | ||||
self.test_repo_path = test_repo_path | ||||
self.remote_repo = remote_repo | ||||
self.clone_cmd = clone_cmd | ||||
def setup(self): | ||||
if not os.path.isdir(self.test_repo_path): | ||||
self.fetch_repo() | ||||
def fetch_repo(self): | ||||
""" | ||||
Tries to fetch repository from remote path. | ||||
""" | ||||
remote = self.remote_repo | ||||
eprint("Fetching repository %s into %s" % (remote, self.test_repo_path)) | ||||
run_command(self.clone_cmd, '%s %s' % (remote, self.test_repo_path)) | ||||
def get_normalized_path(path): | ||||
""" | ||||
If given path exists, new path would be generated and returned. Otherwise | ||||
same whats given is returned. Assumes that there would be no more than | ||||
10000 same named files. | ||||
""" | ||||
if os.path.exists(path): | ||||
dir, basename = os.path.split(path) | ||||
splitted_name = basename.split('.') | ||||
if len(splitted_name) > 1: | ||||
ext = splitted_name[-1] | ||||
else: | ||||
ext = None | ||||
name = '.'.join(splitted_name[:-1]) | ||||
matcher = re.compile(r'^.*-(\d{5})$') | ||||
start = 0 | ||||
m = matcher.match(name) | ||||
if not m: | ||||
# Haven't append number yet so return first | ||||
newname = '%s-00000' % name | ||||
newpath = os.path.join(dir, newname) | ||||
if ext: | ||||
newpath = '.'.join((newpath, ext)) | ||||
return get_normalized_path(newpath) | ||||
else: | ||||
start = int(m.group(1)[-5:]) + 1 | ||||
for x in xrange(start, 10000): | ||||
newname = name[:-5] + str(x).rjust(5, '0') | ||||
newpath = os.path.join(dir, newname) | ||||
if ext: | ||||
newpath = '.'.join((newpath, ext)) | ||||
if not os.path.exists(newpath): | ||||
return newpath | ||||
raise VCSTestError("Couldn't compute new path for %s" % path) | ||||
return path | ||||