|
|
"""
|
|
|
Utilities for tests only. These are not or should not be used normally -
|
|
|
functions here are crafted as we don't want to use ``vcs`` to verify tests.
|
|
|
"""
|
|
|
import os
|
|
|
import re
|
|
|
import sys
|
|
|
|
|
|
from subprocess import Popen
|
|
|
|
|
|
|
|
|
class VCSTestError(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
def run_command(cmd, args):
|
|
|
"""
|
|
|
Runs command on the system with given ``args``.
|
|
|
"""
|
|
|
command = ' '.join((cmd, args))
|
|
|
p = Popen(command, shell=True)
|
|
|
status = os.waitpid(p.pid, 0)[1]
|
|
|
return status
|
|
|
|
|
|
|
|
|
def eprint(msg):
|
|
|
"""
|
|
|
Prints given ``msg`` into sys.stderr as nose test runner hides all output
|
|
|
from sys.stdout by default and if we want to pipe stream somewhere we don't
|
|
|
need those verbose messages anyway.
|
|
|
Appends line break.
|
|
|
"""
|
|
|
sys.stderr.write(msg)
|
|
|
sys.stderr.write('\n')
|
|
|
|
|
|
|
|
|
class SCMFetcher(object):
|
|
|
|
|
|
def __init__(self, alias, test_repo_path, remote_repo, clone_cmd):
|
|
|
"""
|
|
|
:param clone_cmd: command which would clone remote repository; pass
|
|
|
only first bits - remote path and destination would be appended
|
|
|
using ``remote_repo`` and ``test_repo_path``
|
|
|
"""
|
|
|
self.alias = alias
|
|
|
self.test_repo_path = test_repo_path
|
|
|
self.remote_repo = remote_repo
|
|
|
self.clone_cmd = clone_cmd
|
|
|
|
|
|
def setup(self):
|
|
|
if not os.path.isdir(self.test_repo_path):
|
|
|
self.fetch_repo()
|
|
|
|
|
|
def fetch_repo(self):
|
|
|
"""
|
|
|
Tries to fetch repository from remote path.
|
|
|
"""
|
|
|
remote = self.remote_repo
|
|
|
eprint("Fetching repository %s into %s" % (remote, self.test_repo_path))
|
|
|
run_command(self.clone_cmd, '%s %s' % (remote, self.test_repo_path))
|
|
|
|
|
|
|
|
|
def get_normalized_path(path):
|
|
|
"""
|
|
|
If given path exists, new path would be generated and returned. Otherwise
|
|
|
same whats given is returned. Assumes that there would be no more than
|
|
|
10000 same named files.
|
|
|
"""
|
|
|
if os.path.exists(path):
|
|
|
dir, basename = os.path.split(path)
|
|
|
splitted_name = basename.split('.')
|
|
|
if len(splitted_name) > 1:
|
|
|
ext = splitted_name[-1]
|
|
|
else:
|
|
|
ext = None
|
|
|
name = '.'.join(splitted_name[:-1])
|
|
|
matcher = re.compile(r'^.*-(\d{5})$')
|
|
|
start = 0
|
|
|
m = matcher.match(name)
|
|
|
if not m:
|
|
|
# Haven't append number yet so return first
|
|
|
newname = '%s-00000' % name
|
|
|
newpath = os.path.join(dir, newname)
|
|
|
if ext:
|
|
|
newpath = '.'.join((newpath, ext))
|
|
|
return get_normalized_path(newpath)
|
|
|
else:
|
|
|
start = int(m.group(1)[-5:]) + 1
|
|
|
for x in xrange(start, 10000):
|
|
|
newname = name[:-5] + str(x).rjust(5, '0')
|
|
|
newpath = os.path.join(dir, newname)
|
|
|
if ext:
|
|
|
newpath = '.'.join((newpath, ext))
|
|
|
if not os.path.exists(newpath):
|
|
|
return newpath
|
|
|
raise VCSTestError("Couldn't compute new path for %s" % path)
|
|
|
return path
|
|
|
|