# HG changeset patch # User Marcin Kuzminski # Date 2012-06-13 21:27:33 # Node ID 402a96fcfa22f990c77e0d08c31c4bf2ca95f72b # Parent 26193dba1f0e4eb3ae1792586365bfd119260b51 Added vcs testsuite for better integration tests + added fetching of two new repos into test env for rhodecode diff --git a/rhodecode/lib/utils.py b/rhodecode/lib/utils.py --- a/rhodecode/lib/utils.py +++ b/rhodecode/lib/utils.py @@ -599,6 +599,10 @@ def create_test_env(repos_test_path, con tar.extractall(jn(TESTS_TMP_PATH, HG_REPO)) tar.close() + #LOAD VCS test stuff + from rhodecode.tests.vcs import setup_package + setup_package() + #============================================================================== # PASTER COMMANDS diff --git a/rhodecode/lib/vcs/backends/hg/inmemory.py b/rhodecode/lib/vcs/backends/hg/inmemory.py --- a/rhodecode/lib/vcs/backends/hg/inmemory.py +++ b/rhodecode/lib/vcs/backends/hg/inmemory.py @@ -32,7 +32,8 @@ class MercurialInMemoryChangeset(BaseInM from .repository import MercurialRepository if not isinstance(message, unicode) or not isinstance(author, unicode): raise RepositoryError('Given message and author needs to be ' - 'an instance') + 'an instance got %r & %r instead' + % (type(message), type(author))) if branch is None: branch = MercurialRepository.DEFAULT_BRANCH_NAME diff --git a/rhodecode/lib/vcs/nodes.py b/rhodecode/lib/vcs/nodes.py --- a/rhodecode/lib/vcs/nodes.py +++ b/rhodecode/lib/vcs/nodes.py @@ -422,7 +422,7 @@ class FileNode(Node): def __repr__(self): return '<%s %r @ %s>' % (self.__class__.__name__, self.path, - self.changeset.short_id) + getattr(self.changeset, 'short_id', '')) class RemovedFileNode(FileNode): @@ -557,7 +557,7 @@ class DirNode(Node): def __repr__(self): return '<%s %r @ %s>' % (self.__class__.__name__, self.path, - self.changeset.short_id) + getattr(self.changeset, 'short_id', '')) class RootNode(DirNode): @@ -591,7 +591,7 @@ class SubModuleNode(Node): def __repr__(self): return '<%s %r @ %s>' % (self.__class__.__name__, self.path, - self.changeset.short_id) + getattr(self.changeset, 'short_id', '')) def _extract_submodule_url(self): if self.alias == 'git': diff --git a/rhodecode/tests/__init__.py b/rhodecode/tests/__init__.py --- a/rhodecode/tests/__init__.py +++ b/rhodecode/tests/__init__.py @@ -10,6 +10,9 @@ setup-app`) and provides the base testin import os import time import logging +import datetime +import hashlib +import tempfile from os.path import join as jn from unittest import TestCase @@ -27,6 +30,7 @@ from rhodecode.model.db import User import pylons.test + os.environ['TZ'] = 'UTC' if not is_windows: time.tzset() @@ -36,9 +40,11 @@ log = logging.getLogger(__name__) __all__ = [ 'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', - 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS', - 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN', - 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL' + 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', + 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL', + 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS', + 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO', + 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS', ] # Invoke websetup with the current config file @@ -73,6 +79,46 @@ NEW_GIT_REPO = 'vcs_test_git_new' HG_FORK = 'vcs_test_hg_fork' GIT_FORK = 'vcs_test_git_fork' +## VCS +SCM_TESTS = ['hg', 'git'] +uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) + +THIS = os.path.abspath(os.path.dirname(__file__)) + +GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git' +TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO) +TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix) +TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix) + + +HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs' +TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg') +TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix) +TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix) + +TEST_DIR = tempfile.gettempdir() +TEST_REPO_PREFIX = 'vcs-test' + + +def get_new_dir(title): + """ + Returns always new directory path. + """ + from rhodecode.tests.vcs.utils import get_normalized_path + name = TEST_REPO_PREFIX + if title: + name = '-'.join((name, title)) + hex = hashlib.sha1(str(time.time())).hexdigest() + name = '-'.join((name, hex)) + path = os.path.join(TEST_DIR, name) + return get_normalized_path(path) + + +PACKAGE_DIR = os.path.abspath(os.path.join( + os.path.dirname(__file__), '..')) + +TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig') + class TestController(TestCase): @@ -90,8 +136,8 @@ class TestController(TestCase): password=TEST_USER_ADMIN_PASS): self._logged_username = username response = self.app.post(url(controller='login', action='index'), - {'username':username, - 'password':password}) + {'username': username, + 'password': password}) if 'invalid user name' in response.body: self.fail('could not login using %s %s' % (username, password)) diff --git a/rhodecode/tests/functional/test_files.py b/rhodecode/tests/functional/test_files.py --- a/rhodecode/tests/functional/test_files.py +++ b/rhodecode/tests/functional/test_files.py @@ -186,6 +186,14 @@ class TestFilesController(TestController response.mustcontain("""branch: default""") + def test_file_annotation_git(self): + self.log_user() + response = self.app.get(url(controller='files', action='index', + repo_name=GIT_REPO, + revision='master', + f_path='vcs/nodes.py', + annotate=True)) + def test_archival(self): self.log_user() diff --git a/rhodecode/tests/functional/test_search.py b/rhodecode/tests/functional/test_search.py --- a/rhodecode/tests/functional/test_search.py +++ b/rhodecode/tests/functional/test_search.py @@ -2,6 +2,7 @@ from rhodecode.tests import * import os from nose.plugins.skip import SkipTest + class TestSearchController(TestController): def test_index(self): @@ -18,20 +19,21 @@ class TestSearchController(TestControlle else: self.log_user() response = self.app.get(url(controller='search', action='index'), - {'q':HG_REPO}) + {'q': HG_REPO}) self.assertTrue('There is no index to search in. ' 'Please run whoosh indexer' in response.body) def test_normal_search(self): self.log_user() response = self.app.get(url(controller='search', action='index'), - {'q':'def repo'}) - self.assertTrue('10 results' in response.body) - self.assertTrue('Permission denied' not in response.body) + {'q': 'def repo'}) + response.mustcontain('10 results') + response.mustcontain('Permission denied') def test_repo_search(self): self.log_user() response = self.app.get(url(controller='search', action='index'), - {'q':'repository:%s def test' % HG_REPO}) - self.assertTrue('4 results' in response.body) - self.assertTrue('Permission denied' not in response.body) + {'q': 'repository:%s def test' % HG_REPO}) + + response.mustcontain('4 results') + response.mustcontain('Permission denied') diff --git a/rhodecode/tests/functional/test_summary.py b/rhodecode/tests/functional/test_summary.py --- a/rhodecode/tests/functional/test_summary.py +++ b/rhodecode/tests/functional/test_summary.py @@ -15,8 +15,8 @@ class TestSummaryController(TestControll #repo type response.mustcontain( """Mercurial """ + """title="Mercurial repository" alt="Mercurial repository" """ + """src="/images/icons/hgicon.png"/>""" ) response.mustcontain( """""") - response.mustcontain("""""") + response.mustcontain("""""" % HG_REPO) + response.mustcontain("""""" % ID) + + def test_index_git(self): + self.log_user() + ID = Repository.get_by_repo_name(GIT_REPO).repo_id + response = self.app.get(url(controller='summary', + action='index', + repo_name=GIT_REPO)) - def test_index_by_id(self): + #repo type + response.mustcontain( + """Git repository""" + ) + response.mustcontain( + """public """ + ) + + # clone url... + response.mustcontain("""""" % GIT_REPO) + response.mustcontain("""""" % ID) + + def test_index_by_id_hg(self): self.log_user() ID = Repository.get_by_repo_name(HG_REPO).repo_id response = self.app.get(url(controller='summary', @@ -59,6 +82,21 @@ class TestSummaryController(TestControll """title="public repository" alt="public """ """repository" src="/images/icons/lock_open.png"/>""") + def test_index_by_id_git(self): + self.log_user() + ID = Repository.get_by_repo_name(GIT_REPO).repo_id + response = self.app.get(url(controller='summary', + action='index', + repo_name='_%s' % ID)) + + #repo type + response.mustcontain("""Git """) + response.mustcontain("""public """) + def _enable_stats(self): r = Repository.get_by_repo_name(HG_REPO) r.enable_statistics = True diff --git a/rhodecode/tests/vcs/__init__.py b/rhodecode/tests/vcs/__init__.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/__init__.py @@ -0,0 +1,56 @@ +""" +Unit tests for vcs_ library. + +In order to run tests we need to prepare our environment first. Tests would be +run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from +``vcs.backends.BACKENDS``. + +For each SCM we run tests for, we need some repository. We would use +repositories location from system environment variables or test suite defaults +- see ``conf`` module for more detail. We simply try to check if repository at +certain location exists, if not we would try to fetch them. At ``test_vcs`` or +``test_common`` we run unit tests common for each repository type and for +example specific mercurial tests are located at ``test_hg`` module. + +Oh, and tests are run with ``unittest.collector`` wrapped by ``collector`` +function at ``tests/__init__.py``. + +.. _vcs: http://bitbucket.org/marcinkuzminski/vcs +.. _unittest: http://pypi.python.org/pypi/unittest + +""" +import os +from rhodecode.lib import vcs +from rhodecode.lib.vcs.utils.compat import unittest +from rhodecode.tests import * +from utils import VCSTestError, SCMFetcher + + +def setup_package(): + """ + Prepares whole package for tests which mainly means it would try to fetch + test repositories or use already existing ones. + """ + fetchers = { + 'hg': { + 'alias': 'hg', + 'test_repo_path': TEST_HG_REPO, + 'remote_repo': HG_REMOTE_REPO, + 'clone_cmd': 'hg clone', + }, + 'git': { + 'alias': 'git', + 'test_repo_path': TEST_GIT_REPO, + 'remote_repo': GIT_REMOTE_REPO, + 'clone_cmd': 'git clone --bare', + }, + } + try: + for scm, fetcher_info in fetchers.items(): + fetcher = SCMFetcher(**fetcher_info) + fetcher.setup() + except VCSTestError, err: + raise RuntimeError(str(err)) + +start_dir = os.path.abspath(os.path.dirname(__file__)) +unittest.defaultTestLoader.discover(start_dir) diff --git a/rhodecode/tests/vcs/aconfig b/rhodecode/tests/vcs/aconfig new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/aconfig @@ -0,0 +1,10 @@ +[user] +name = Foo Bar +email = foo.bar@example.com + +[ui] +username = Foo Bar foo.bar@example.com + +[universal] +foo = bar + diff --git a/rhodecode/tests/vcs/base.py b/rhodecode/tests/vcs/base.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/base.py @@ -0,0 +1,111 @@ +""" +Module providing backend independent mixin class. It requires that +InMemoryChangeset class is working properly at backend class. +""" +import os +from rhodecode.lib import vcs +import time +import shutil +import datetime +from rhodecode.lib.vcs.utils.compat import unittest + +from conf import SCM_TESTS, get_new_dir + +from rhodecode.lib.vcs.nodes import FileNode + + +class BackendTestMixin(object): + """ + This is a backend independent test case class which should be created + with ``type`` method. + + It is required to set following attributes at subclass: + + - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``) + - ``repo_path``: path to the repository which would be created for set of + tests + - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be created + before every single test. Defaults to ``True``. + """ + recreate_repo_per_test = True + + @classmethod + def get_backend(cls): + return vcs.get_backend(cls.backend_alias) + + @classmethod + def _get_commits(cls): + commits = [ + { + 'message': u'Initial commit', + 'author': u'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar', content='Foobar'), + FileNode('foobar2', content='Foobar II'), + FileNode('foo/bar/baz', content='baz here!'), + ], + }, + { + 'message': u'Changes...', + 'author': u'Jane Doe ', + 'date': datetime.datetime(2010, 1, 1, 21), + 'added': [ + FileNode('some/new.txt', content='news...'), + ], + 'changed': [ + FileNode('foobar', 'Foobar I'), + ], + 'removed': [], + }, + ] + return commits + + @classmethod + def setUpClass(cls): + Backend = cls.get_backend() + cls.backend_class = Backend + cls.repo_path = get_new_dir(str(time.time())) + cls.repo = Backend(cls.repo_path, create=True) + cls.imc = cls.repo.in_memory_changeset + + for commit in cls._get_commits(): + for node in commit.get('added', []): + cls.imc.add(FileNode(node.path, content=node.content)) + for node in commit.get('changed', []): + cls.imc.change(FileNode(node.path, content=node.content)) + for node in commit.get('removed', []): + cls.imc.remove(FileNode(node.path)) + + cls.tip = cls.imc.commit(message=unicode(commit['message']), + author=unicode(commit['author']), + date=commit['date']) + + @classmethod + def tearDownClass(cls): + if not getattr(cls, 'recreate_repo_per_test', False) and \ + 'VCS_REMOVE_TEST_DIRS' in os.environ: + shutil.rmtree(cls.repo_path) + + def setUp(self): + if getattr(self, 'recreate_repo_per_test', False): + self.__class__.setUpClass() + + def tearDown(self): + if getattr(self, 'recreate_repo_per_test', False) and \ + 'VCS_REMOVE_TEST_DIRS' in os.environ: + shutil.rmtree(self.repo_path) + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s base backend test' % alias).title().split()) + bases = (BackendTestMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/conf.py b/rhodecode/tests/vcs/conf.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/conf.py @@ -0,0 +1,61 @@ +""" +Unit tests configuration module for vcs. +""" +import os +import time +import hashlib +import tempfile +import datetime + +from utils import get_normalized_path +from os.path import join as jn + +__all__ = ( + 'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', + 'SCM_TESTS', +) + +SCM_TESTS = ['hg', 'git'] +uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) + +THIS = os.path.abspath(os.path.dirname(__file__)) + +GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git' + +TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp') +TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO', + jn(TEST_TMP_PATH, 'vcs-git')) +TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE', + jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)) +TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL', + jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)) + +HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs' +TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO', + jn(TEST_TMP_PATH, 'vcs-hg')) +TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE', + jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix)) +TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL', + jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix)) + +TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir()) +TEST_REPO_PREFIX = 'vcs-test' + + +def get_new_dir(title): + """ + Returns always new directory path. + """ + name = TEST_REPO_PREFIX + if title: + name = '-'.join((name, title)) + hex = hashlib.sha1(str(time.time())).hexdigest() + name = '-'.join((name, hex)) + path = os.path.join(TEST_DIR, name) + return get_normalized_path(path) + + +PACKAGE_DIR = os.path.abspath(os.path.join( + os.path.dirname(__file__), '..')) + +TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig') diff --git a/rhodecode/tests/vcs/test_archives.py b/rhodecode/tests/vcs/test_archives.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_archives.py @@ -0,0 +1,108 @@ +from __future__ import with_statement + +import os +import tarfile +import zipfile +import datetime +import tempfile +import StringIO +from base import BackendTestMixin +from conf import SCM_TESTS +from rhodecode.lib.vcs.exceptions import VCSError +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest + + +class ArchivesTestCaseMixin(BackendTestMixin): + + @classmethod + def _get_commits(cls): + start_date = datetime.datetime(2010, 1, 1, 20) + for x in xrange(5): + yield { + 'message': 'Commit %d' % x, + 'author': 'Joe Doe ', + 'date': start_date + datetime.timedelta(hours=12 * x), + 'added': [ + FileNode('%d/file_%d.txt' % (x, x), + content='Foobar %d' % x), + ], + } + + def test_archive_zip(self): + path = tempfile.mkstemp()[1] + with open(path, 'wb') as f: + self.tip.fill_archive(stream=f, kind='zip', prefix='repo') + out = zipfile.ZipFile(path) + + for x in xrange(5): + node_path = '%d/file_%d.txt' % (x, x) + decompressed = StringIO.StringIO() + decompressed.write(out.read('repo/' + node_path)) + self.assertEqual( + decompressed.getvalue(), + self.tip.get_node(node_path).content) + + def test_archive_tgz(self): + path = tempfile.mkstemp()[1] + with open(path, 'wb') as f: + self.tip.fill_archive(stream=f, kind='tgz', prefix='repo') + outdir = tempfile.mkdtemp() + + outfile = tarfile.open(path, 'r|gz') + outfile.extractall(outdir) + + for x in xrange(5): + node_path = '%d/file_%d.txt' % (x, x) + self.assertEqual( + open(os.path.join(outdir, 'repo/' + node_path)).read(), + self.tip.get_node(node_path).content) + + def test_archive_tbz2(self): + path = tempfile.mkstemp()[1] + with open(path, 'w+b') as f: + self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo') + outdir = tempfile.mkdtemp() + + outfile = tarfile.open(path, 'r|bz2') + outfile.extractall(outdir) + + for x in xrange(5): + node_path = '%d/file_%d.txt' % (x, x) + self.assertEqual( + open(os.path.join(outdir, 'repo/' + node_path)).read(), + self.tip.get_node(node_path).content) + + def test_archive_default_stream(self): + tmppath = tempfile.mkstemp()[1] + with open(tmppath, 'w') as stream: + self.tip.fill_archive(stream=stream) + mystream = StringIO.StringIO() + self.tip.fill_archive(stream=mystream) + mystream.seek(0) + with open(tmppath, 'r') as f: + self.assertEqual(f.read(), mystream.read()) + + def test_archive_wrong_kind(self): + with self.assertRaises(VCSError): + self.tip.fill_archive(kind='wrong kind') + + def test_archive_empty_prefix(self): + with self.assertRaises(VCSError): + self.tip.fill_archive(prefix='') + + def test_archive_prefix_with_leading_slash(self): + with self.assertRaises(VCSError): + self.tip.fill_archive(prefix='/any') + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s archive test' % alias).title().split()) + bases = (ArchivesTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_branches.py b/rhodecode/tests/vcs/test_branches.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_branches.py @@ -0,0 +1,118 @@ +from __future__ import with_statement + +from rhodecode.lib import vcs +import datetime +from rhodecode.lib.vcs.utils.compat import unittest + +from base import BackendTestMixin +from conf import SCM_TESTS + +from rhodecode.lib.vcs.nodes import FileNode + + +class BranchesTestCaseMixin(BackendTestMixin): + + @classmethod + def _get_commits(cls): + commits = [ + { + 'message': 'Initial commit', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar', content='Foobar'), + FileNode('foobar2', content='Foobar II'), + FileNode('foo/bar/baz', content='baz here!'), + ], + }, + { + 'message': 'Changes...', + 'author': 'Jane Doe ', + 'date': datetime.datetime(2010, 1, 1, 21), + 'added': [ + FileNode('some/new.txt', content='news...'), + ], + 'changed': [ + FileNode('foobar', 'Foobar I'), + ], + 'removed': [], + }, + ] + return commits + + def test_simple(self): + tip = self.repo.get_changeset() + self.assertEqual(tip.date, datetime.datetime(2010, 1, 1, 21)) + + def test_new_branch(self): + # This check must not be removed to ensure the 'branches' LazyProperty + # gets hit *before* the new 'foobar' branch got created: + self.assertFalse('foobar' in self.repo.branches) + self.imc.add(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\n')) + foobar_tip = self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + ) + self.assertTrue('foobar' in self.repo.branches) + self.assertEqual(foobar_tip.branch, 'foobar') + + def test_new_head(self): + tip = self.repo.get_changeset() + self.imc.add(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\n')) + foobar_tip = self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + parents=[tip], + ) + self.imc.change(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\nand more...\n')) + newtip = self.imc.commit( + message=u'At default branch', + author=u'joe', + branch=foobar_tip.branch, + parents=[foobar_tip], + ) + + newest_tip = self.imc.commit( + message=u'Merged with %s' % foobar_tip.raw_id, + author=u'joe', + branch=self.backend_class.DEFAULT_BRANCH_NAME, + parents=[newtip, foobar_tip], + ) + + self.assertEqual(newest_tip.branch, + self.backend_class.DEFAULT_BRANCH_NAME) + + def test_branch_with_slash_in_name(self): + self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n')) + self.imc.commit(u'Branch with a slash!', author=u'joe', + branch='issue/123') + self.assertTrue('issue/123' in self.repo.branches) + + def test_branch_with_slash_in_name_and_similar_without(self): + self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n')) + self.imc.commit(u'Branch with a slash!', author=u'joe', + branch='issue/123') + self.imc.add(vcs.nodes.FileNode('extrafile II', content='Some data\n')) + self.imc.commit(u'Branch without a slash...', author=u'joe', + branch='123') + self.assertIn('issue/123', self.repo.branches) + self.assertIn('123', self.repo.branches) + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s branches test' % alias).title().split()) + bases = (BranchesTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_changesets.py b/rhodecode/tests/vcs/test_changesets.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_changesets.py @@ -0,0 +1,336 @@ +from __future__ import with_statement + +from rhodecode.lib import vcs +import datetime +from base import BackendTestMixin +from conf import SCM_TESTS +from rhodecode.lib.vcs.backends.base import BaseChangeset +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError +from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError +from rhodecode.lib.vcs.exceptions import RepositoryError +from rhodecode.lib.vcs.utils.compat import unittest + + +class TestBaseChangeset(unittest.TestCase): + + def test_as_dict(self): + changeset = BaseChangeset() + changeset.id = 'ID' + changeset.raw_id = 'RAW_ID' + changeset.short_id = 'SHORT_ID' + changeset.revision = 1009 + changeset.date = datetime.datetime(2011, 1, 30, 1, 45) + changeset.message = 'Message of a commit' + changeset.author = 'Joe Doe ' + changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')] + changeset.changed = [] + changeset.removed = [] + self.assertEqual(changeset.as_dict(), { + 'id': 'ID', + 'raw_id': 'RAW_ID', + 'short_id': 'SHORT_ID', + 'revision': 1009, + 'date': datetime.datetime(2011, 1, 30, 1, 45), + 'message': 'Message of a commit', + 'author': { + 'name': 'Joe Doe', + 'email': 'joe.doe@example.com', + }, + 'added': ['foo/bar/baz', 'foobar'], + 'changed': [], + 'removed': [], + }) + +class ChangesetsWithCommitsTestCaseixin(BackendTestMixin): + recreate_repo_per_test = True + + @classmethod + def _get_commits(cls): + start_date = datetime.datetime(2010, 1, 1, 20) + for x in xrange(5): + yield { + 'message': 'Commit %d' % x, + 'author': 'Joe Doe ', + 'date': start_date + datetime.timedelta(hours=12 * x), + 'added': [ + FileNode('file_%d.txt' % x, content='Foobar %d' % x), + ], + } + + def test_new_branch(self): + self.imc.add(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\n')) + foobar_tip = self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + ) + self.assertTrue('foobar' in self.repo.branches) + self.assertEqual(foobar_tip.branch, 'foobar') + # 'foobar' should be the only branch that contains the new commit + self.assertNotEqual(*self.repo.branches.values()) + + def test_new_head_in_default_branch(self): + tip = self.repo.get_changeset() + self.imc.add(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\n')) + foobar_tip = self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + parents=[tip], + ) + self.imc.change(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\nand more...\n')) + newtip = self.imc.commit( + message=u'At default branch', + author=u'joe', + branch=foobar_tip.branch, + parents=[foobar_tip], + ) + + newest_tip = self.imc.commit( + message=u'Merged with %s' % foobar_tip.raw_id, + author=u'joe', + branch=self.backend_class.DEFAULT_BRANCH_NAME, + parents=[newtip, foobar_tip], + ) + + self.assertEqual(newest_tip.branch, + self.backend_class.DEFAULT_BRANCH_NAME) + + def test_get_changesets_respects_branch_name(self): + tip = self.repo.get_changeset() + self.imc.add(vcs.nodes.FileNode('docs/index.txt', + content='Documentation\n')) + doc_changeset = self.imc.commit( + message=u'New branch: docs', + author=u'joe', + branch='docs', + ) + self.imc.add(vcs.nodes.FileNode('newfile', content='')) + self.imc.commit( + message=u'Back in default branch', + author=u'joe', + parents=[tip], + ) + default_branch_changesets = self.repo.get_changesets( + branch_name=self.repo.DEFAULT_BRANCH_NAME) + self.assertNotIn(doc_changeset, default_branch_changesets) + + +class ChangesetsTestCaseMixin(BackendTestMixin): + recreate_repo_per_test = False + + @classmethod + def _get_commits(cls): + start_date = datetime.datetime(2010, 1, 1, 20) + for x in xrange(5): + yield { + 'message': u'Commit %d' % x, + 'author': u'Joe Doe ', + 'date': start_date + datetime.timedelta(hours=12 * x), + 'added': [ + FileNode('file_%d.txt' % x, content='Foobar %d' % x), + ], + } + + def test_simple(self): + tip = self.repo.get_changeset() + self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20)) + + def test_get_changesets_is_ordered_by_date(self): + changesets = list(self.repo.get_changesets()) + ordered_by_date = sorted(changesets, + key=lambda cs: cs.date) + self.assertItemsEqual(changesets, ordered_by_date) + + def test_get_changesets_respects_start(self): + second_id = self.repo.revisions[1] + changesets = list(self.repo.get_changesets(start=second_id)) + self.assertEqual(len(changesets), 4) + + def test_get_changesets_numerical_id_respects_start(self): + second_id = 1 + changesets = list(self.repo.get_changesets(start=second_id)) + self.assertEqual(len(changesets), 4) + + def test_get_changesets_includes_start_changeset(self): + second_id = self.repo.revisions[1] + changesets = list(self.repo.get_changesets(start=second_id)) + self.assertEqual(changesets[0].raw_id, second_id) + + def test_get_changesets_respects_end(self): + second_id = self.repo.revisions[1] + changesets = list(self.repo.get_changesets(end=second_id)) + self.assertEqual(changesets[-1].raw_id, second_id) + self.assertEqual(len(changesets), 2) + + def test_get_changesets_numerical_id_respects_end(self): + second_id = 1 + changesets = list(self.repo.get_changesets(end=second_id)) + self.assertEqual(changesets.index(changesets[-1]), second_id) + self.assertEqual(len(changesets), 2) + + def test_get_changesets_respects_both_start_and_end(self): + second_id = self.repo.revisions[1] + third_id = self.repo.revisions[2] + changesets = list(self.repo.get_changesets(start=second_id, + end=third_id)) + self.assertEqual(len(changesets), 2) + + def test_get_changesets_numerical_id_respects_both_start_and_end(self): + changesets = list(self.repo.get_changesets(start=2, end=3)) + self.assertEqual(len(changesets), 2) + + def test_get_changesets_includes_end_changeset(self): + second_id = self.repo.revisions[1] + changesets = list(self.repo.get_changesets(end=second_id)) + self.assertEqual(changesets[-1].raw_id, second_id) + + def test_get_changesets_respects_start_date(self): + start_date = datetime.datetime(2010, 2, 1) + for cs in self.repo.get_changesets(start_date=start_date): + self.assertGreaterEqual(cs.date, start_date) + + def test_get_changesets_respects_end_date(self): + end_date = datetime.datetime(2010, 2, 1) + for cs in self.repo.get_changesets(end_date=end_date): + self.assertLessEqual(cs.date, end_date) + + def test_get_changesets_respects_reverse(self): + changesets_id_list = [cs.raw_id for cs in + self.repo.get_changesets(reverse=True)] + self.assertItemsEqual(changesets_id_list, reversed(self.repo.revisions)) + + def test_get_filenodes_generator(self): + tip = self.repo.get_changeset() + filepaths = [node.path for node in tip.get_filenodes_generator()] + self.assertItemsEqual(filepaths, ['file_%d.txt' % x for x in xrange(5)]) + + def test_size(self): + tip = self.repo.get_changeset() + size = 5 * len('Foobar N') # Size of 5 files + self.assertEqual(tip.size, size) + + def test_author(self): + tip = self.repo.get_changeset() + self.assertEqual(tip.author, u'Joe Doe ') + + def test_author_name(self): + tip = self.repo.get_changeset() + self.assertEqual(tip.author_name, u'Joe Doe') + + def test_author_email(self): + tip = self.repo.get_changeset() + self.assertEqual(tip.author_email, u'joe.doe@example.com') + + def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self): + with self.assertRaises(ChangesetDoesNotExistError): + list(self.repo.get_changesets(start='foobar')) + + def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self): + with self.assertRaises(ChangesetDoesNotExistError): + list(self.repo.get_changesets(end='foobar')) + + def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self): + with self.assertRaises(BranchDoesNotExistError): + list(self.repo.get_changesets(branch_name='foobar')) + + def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self): + start = self.repo.revisions[-1] + end = self.repo.revisions[0] + with self.assertRaises(RepositoryError): + list(self.repo.get_changesets(start=start, end=end)) + + def test_get_changesets_numerical_id_reversed(self): + with self.assertRaises(RepositoryError): + [x for x in self.repo.get_changesets(start=3, end=2)] + + def test_get_changesets_numerical_id_respects_both_start_and_end_last(self): + with self.assertRaises(RepositoryError): + last = len(self.repo.revisions) + list(self.repo.get_changesets(start=last-1, end=last-2)) + + def test_get_changesets_numerical_id_last_zero_error(self): + with self.assertRaises(RepositoryError): + last = len(self.repo.revisions) + list(self.repo.get_changesets(start=last-1, end=0)) + + +class ChangesetsChangesTestCaseMixin(BackendTestMixin): + recreate_repo_per_test = False + + @classmethod + def _get_commits(cls): + return [ + { + 'message': u'Initial', + 'author': u'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foo/bar', content='foo'), + FileNode('foobar', content='foo'), + FileNode('qwe', content='foo'), + ], + }, + { + 'message': u'Massive changes', + 'author': u'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 22), + 'added': [FileNode('fallout', content='War never changes')], + 'changed': [ + FileNode('foo/bar', content='baz'), + FileNode('foobar', content='baz'), + ], + 'removed': [FileNode('qwe')], + }, + ] + + def test_initial_commit(self): + changeset = self.repo.get_changeset(0) + self.assertItemsEqual(changeset.added, [ + changeset.get_node('foo/bar'), + changeset.get_node('foobar'), + changeset.get_node('qwe'), + ]) + self.assertItemsEqual(changeset.changed, []) + self.assertItemsEqual(changeset.removed, []) + + def test_head_added(self): + changeset = self.repo.get_changeset() + self.assertItemsEqual(changeset.added, [ + changeset.get_node('fallout'), + ]) + self.assertItemsEqual(changeset.changed, [ + changeset.get_node('foo/bar'), + changeset.get_node('foobar'), + ]) + self.assertEqual(len(changeset.removed), 1) + self.assertEqual(list(changeset.removed)[0].path, 'qwe') + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + # tests with additional commits + cls_name = ''.join(('%s changesets with commits test' % alias).title().split()) + bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + # tests without additional commits + cls_name = ''.join(('%s changesets test' % alias).title().split()) + bases = (ChangesetsTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + # tests changes + cls_name = ''.join(('%s changesets changes test' % alias).title().split()) + bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_filenodes_unicode_path.py b/rhodecode/tests/vcs/test_filenodes_unicode_path.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_filenodes_unicode_path.py @@ -0,0 +1,49 @@ +# encoding: utf8 + +from __future__ import with_statement + +import datetime +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest +from test_inmemchangesets import BackendBaseTestCase +from conf import SCM_TESTS + + +class FileNodeUnicodePathTestsMixin(object): + + fname = 'ąśðąęłąć.txt' + ufname = (fname).decode('utf-8') + + def get_commits(self): + self.nodes = [ + FileNode(self.fname, content='Foobar'), + ] + + commits = [ + { + 'message': 'Initial commit', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': self.nodes, + }, + ] + return commits + + def test_filenode_path(self): + node = self.tip.get_node(self.fname) + unode = self.tip.get_node(self.ufname) + self.assertEqual(node, unode) + + +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s file node unicode path test' % alias).title() + .split()) + bases = (FileNodeUnicodePathTestsMixin, BackendBaseTestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_getitem.py b/rhodecode/tests/vcs/test_getitem.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_getitem.py @@ -0,0 +1,44 @@ +from __future__ import with_statement + +import datetime +from base import BackendTestMixin +from conf import SCM_TESTS +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest + + +class GetitemTestCaseMixin(BackendTestMixin): + + @classmethod + def _get_commits(cls): + start_date = datetime.datetime(2010, 1, 1, 20) + for x in xrange(5): + yield { + 'message': 'Commit %d' % x, + 'author': 'Joe Doe ', + 'date': start_date + datetime.timedelta(hours=12 * x), + 'added': [ + FileNode('file_%d.txt' % x, content='Foobar %d' % x), + ], + } + + def test__getitem__last_item_is_tip(self): + self.assertEqual(self.repo[-1], self.repo.get_changeset()) + + def test__getitem__returns_correct_items(self): + changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))] + self.assertEqual(changesets, list(self.repo.get_changesets())) + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s getitem test' % alias).title().split()) + bases = (GetitemTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_getslice.py b/rhodecode/tests/vcs/test_getslice.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_getslice.py @@ -0,0 +1,56 @@ +from __future__ import with_statement + +import datetime +from base import BackendTestMixin +from conf import SCM_TESTS +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest + + +class GetsliceTestCaseMixin(BackendTestMixin): + + @classmethod + def _get_commits(cls): + start_date = datetime.datetime(2010, 1, 1, 20) + for x in xrange(5): + yield { + 'message': 'Commit %d' % x, + 'author': 'Joe Doe ', + 'date': start_date + datetime.timedelta(hours=12 * x), + 'added': [ + FileNode('file_%d.txt' % x, content='Foobar %d' % x), + ], + } + + def test__getslice__last_item_is_tip(self): + self.assertEqual(list(self.repo[-1:])[0], self.repo.get_changeset()) + + def test__getslice__respects_start_index(self): + self.assertEqual(list(self.repo[2:]), + [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]]) + + def test__getslice__respects_negative_start_index(self): + self.assertEqual(list(self.repo[-2:]), + [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]]) + + def test__getslice__respects_end_index(self): + self.assertEqual(list(self.repo[:2]), + [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]]) + + def test__getslice__respects_negative_end_index(self): + self.assertEqual(list(self.repo[:-2]), + [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]]) + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s getslice test' % alias).title().split()) + bases = (GetsliceTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_git.py b/rhodecode/tests/vcs/test_git.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_git.py @@ -0,0 +1,702 @@ +from __future__ import with_statement + +import os +import mock +import datetime +from rhodecode.lib.vcs.backends.git import GitRepository, GitChangeset +from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError +from rhodecode.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState +from rhodecode.lib.vcs.utils.compat import unittest +from rhodecode.tests.vcs.base import BackendTestMixin +from conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir + + +class GitRepositoryTest(unittest.TestCase): + + def __check_for_existing_repo(self): + if os.path.exists(TEST_GIT_REPO_CLONE): + self.fail('Cannot test git clone repo as location %s already ' + 'exists. You should manually remove it first.' + % TEST_GIT_REPO_CLONE) + + def setUp(self): + self.repo = GitRepository(TEST_GIT_REPO) + + def test_wrong_repo_path(self): + wrong_repo_path = '/tmp/errorrepo' + self.assertRaises(RepositoryError, GitRepository, wrong_repo_path) + + def test_repo_clone(self): + self.__check_for_existing_repo() + repo = GitRepository(TEST_GIT_REPO) + repo_clone = GitRepository(TEST_GIT_REPO_CLONE, + src_url=TEST_GIT_REPO, create=True, update_after_clone=True) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + # Checking hashes of changesets should be enough + for changeset in repo.get_changesets(): + raw_id = changeset.raw_id + self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id) + + def test_repo_clone_without_create(self): + self.assertRaises(RepositoryError, GitRepository, + TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO) + + def test_repo_clone_with_update(self): + repo = GitRepository(TEST_GIT_REPO) + clone_path = TEST_GIT_REPO_CLONE + '_with_update' + repo_clone = GitRepository(clone_path, + create=True, src_url=TEST_GIT_REPO, update_after_clone=True) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + + #check if current workdir was updated + fpath = os.path.join(clone_path, 'MANIFEST.in') + self.assertEqual(True, os.path.isfile(fpath), + 'Repo was cloned and updated but file %s could not be found' + % fpath) + + def test_repo_clone_without_update(self): + repo = GitRepository(TEST_GIT_REPO) + clone_path = TEST_GIT_REPO_CLONE + '_without_update' + repo_clone = GitRepository(clone_path, + create=True, src_url=TEST_GIT_REPO, update_after_clone=False) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + #check if current workdir was *NOT* updated + fpath = os.path.join(clone_path, 'MANIFEST.in') + # Make sure it's not bare repo + self.assertFalse(repo_clone._repo.bare) + self.assertEqual(False, os.path.isfile(fpath), + 'Repo was cloned and updated but file %s was found' + % fpath) + + def test_repo_clone_into_bare_repo(self): + repo = GitRepository(TEST_GIT_REPO) + clone_path = TEST_GIT_REPO_CLONE + '_bare.git' + repo_clone = GitRepository(clone_path, create=True, + src_url=repo.path, bare=True) + self.assertTrue(repo_clone._repo.bare) + + def test_create_repo_is_not_bare_by_default(self): + repo = GitRepository(get_new_dir('not-bare-by-default'), create=True) + self.assertFalse(repo._repo.bare) + + def test_create_bare_repo(self): + repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True) + self.assertTrue(repo._repo.bare) + + def test_revisions(self): + # there are 112 revisions (by now) + # so we can assume they would be available from now on + subset = set([ + 'c1214f7e79e02fc37156ff215cd71275450cffc3', + '38b5fe81f109cb111f549bfe9bb6b267e10bc557', + 'fa6600f6848800641328adbf7811fd2372c02ab2', + '102607b09cdd60e2793929c4f90478be29f85a17', + '49d3fd156b6f7db46313fac355dca1a0b94a0017', + '2d1028c054665b962fa3d307adfc923ddd528038', + 'd7e0d30fbcae12c90680eb095a4f5f02505ce501', + 'ff7ca51e58c505fec0dd2491de52c622bb7a806b', + 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f', + '8430a588b43b5d6da365400117c89400326e7992', + 'd955cd312c17b02143c04fa1099a352b04368118', + 'f67b87e5c629c2ee0ba58f85197e423ff28d735b', + 'add63e382e4aabc9e1afdc4bdc24506c269b7618', + 'f298fe1189f1b69779a4423f40b48edf92a703fc', + 'bd9b619eb41994cac43d67cf4ccc8399c1125808', + '6e125e7c890379446e98980d8ed60fba87d0f6d1', + 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd', + '0b05e4ed56c802098dfc813cbe779b2f49e92500', + '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e', + '45223f8f114c64bf4d6f853e3c35a369a6305520', + 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e', + 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68', + '27d48942240f5b91dfda77accd2caac94708cc7d', + '622f0eb0bafd619d2560c26f80f09e3b0b0d78af', + 'e686b958768ee96af8029fe19c6050b1a8dd3b2b']) + self.assertTrue(subset.issubset(set(self.repo.revisions))) + + + + def test_slicing(self): + #4 1 5 10 95 + for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5), + (10, 20, 10), (5, 100, 95)]: + revs = list(self.repo[sfrom:sto]) + self.assertEqual(len(revs), size) + self.assertEqual(revs[0], self.repo.get_changeset(sfrom)) + self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1)) + + + def test_branches(self): + # TODO: Need more tests here + # Removed (those are 'remotes' branches for cloned repo) + #self.assertTrue('master' in self.repo.branches) + #self.assertTrue('gittree' in self.repo.branches) + #self.assertTrue('web-branch' in self.repo.branches) + for name, id in self.repo.branches.items(): + self.assertTrue(isinstance( + self.repo.get_changeset(id), GitChangeset)) + + def test_tags(self): + # TODO: Need more tests here + self.assertTrue('v0.1.1' in self.repo.tags) + self.assertTrue('v0.1.2' in self.repo.tags) + for name, id in self.repo.tags.items(): + self.assertTrue(isinstance( + self.repo.get_changeset(id), GitChangeset)) + + def _test_single_changeset_cache(self, revision): + chset = self.repo.get_changeset(revision) + self.assertTrue(revision in self.repo.changesets) + self.assertTrue(chset is self.repo.changesets[revision]) + + def test_initial_changeset(self): + id = self.repo.revisions[0] + init_chset = self.repo.get_changeset(id) + self.assertEqual(init_chset.message, 'initial import\n') + self.assertEqual(init_chset.author, + 'Marcin Kuzminski ') + for path in ('vcs/__init__.py', + 'vcs/backends/BaseRepository.py', + 'vcs/backends/__init__.py'): + self.assertTrue(isinstance(init_chset.get_node(path), FileNode)) + for path in ('', 'vcs', 'vcs/backends'): + self.assertTrue(isinstance(init_chset.get_node(path), DirNode)) + + self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar') + + node = init_chset.get_node('vcs/') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.DIR) + + node = init_chset.get_node('vcs') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.DIR) + + node = init_chset.get_node('vcs/__init__.py') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.FILE) + + def test_not_existing_changeset(self): + self.assertRaises(RepositoryError, self.repo.get_changeset, + 'f' * 40) + + def test_changeset10(self): + + chset10 = self.repo.get_changeset(self.repo.revisions[9]) + README = """=== +VCS +=== + +Various Version Control System management abstraction layer for Python. + +Introduction +------------ + +TODO: To be written... + +""" + node = chset10.get_node('README.rst') + self.assertEqual(node.kind, NodeKind.FILE) + self.assertEqual(node.content, README) + + +class GitChangesetTest(unittest.TestCase): + + def setUp(self): + self.repo = GitRepository(TEST_GIT_REPO) + + def test_default_changeset(self): + tip = self.repo.get_changeset() + self.assertEqual(tip, self.repo.get_changeset(None)) + self.assertEqual(tip, self.repo.get_changeset('tip')) + + def test_root_node(self): + tip = self.repo.get_changeset() + self.assertTrue(tip.root is tip.get_node('')) + + def test_lazy_fetch(self): + """ + Test if changeset's nodes expands and are cached as we walk through + the revision. This test is somewhat hard to write as order of tests + is a key here. Written by running command after command in a shell. + """ + hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc' + self.assertTrue(hex in self.repo.revisions) + chset = self.repo.get_changeset(hex) + self.assertTrue(len(chset.nodes) == 0) + root = chset.root + self.assertTrue(len(chset.nodes) == 1) + self.assertTrue(len(root.nodes) == 8) + # accessing root.nodes updates chset.nodes + self.assertTrue(len(chset.nodes) == 9) + + docs = root.get_node('docs') + # we haven't yet accessed anything new as docs dir was already cached + self.assertTrue(len(chset.nodes) == 9) + self.assertTrue(len(docs.nodes) == 8) + # accessing docs.nodes updates chset.nodes + self.assertTrue(len(chset.nodes) == 17) + + self.assertTrue(docs is chset.get_node('docs')) + self.assertTrue(docs is root.nodes[0]) + self.assertTrue(docs is root.dirs[0]) + self.assertTrue(docs is chset.get_node('docs')) + + def test_nodes_with_changeset(self): + hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc' + chset = self.repo.get_changeset(hex) + root = chset.root + docs = root.get_node('docs') + self.assertTrue(docs is chset.get_node('docs')) + api = docs.get_node('api') + self.assertTrue(api is chset.get_node('docs/api')) + index = api.get_node('index.rst') + self.assertTrue(index is chset.get_node('docs/api/index.rst')) + self.assertTrue(index is chset.get_node('docs')\ + .get_node('api')\ + .get_node('index.rst')) + + def test_branch_and_tags(self): + ''' + rev0 = self.repo.revisions[0] + chset0 = self.repo.get_changeset(rev0) + self.assertEqual(chset0.branch, 'master') + self.assertEqual(chset0.tags, []) + + rev10 = self.repo.revisions[10] + chset10 = self.repo.get_changeset(rev10) + self.assertEqual(chset10.branch, 'master') + self.assertEqual(chset10.tags, []) + + rev44 = self.repo.revisions[44] + chset44 = self.repo.get_changeset(rev44) + self.assertEqual(chset44.branch, 'web-branch') + + tip = self.repo.get_changeset('tip') + self.assertTrue('tip' in tip.tags) + ''' + # Those tests would fail - branches are now going + # to be changed at main API in order to support git backend + pass + + def _test_slices(self, limit, offset): + count = self.repo.count() + changesets = self.repo.get_changesets(limit=limit, offset=offset) + idx = 0 + for changeset in changesets: + rev = offset + idx + idx += 1 + rev_id = self.repo.revisions[rev] + if idx > limit: + self.fail("Exceeded limit already (getting revision %s, " + "there are %s total revisions, offset=%s, limit=%s)" + % (rev_id, count, offset, limit)) + self.assertEqual(changeset, self.repo.get_changeset(rev_id)) + result = list(self.repo.get_changesets(limit=limit, offset=offset)) + start = offset + end = limit and offset + limit or None + sliced = list(self.repo[start:end]) + self.failUnlessEqual(result, sliced, + msg="Comparison failed for limit=%s, offset=%s" + "(get_changeset returned: %s and sliced: %s" + % (limit, offset, result, sliced)) + + def _test_file_size(self, revision, path, size): + node = self.repo.get_changeset(revision).get_node(path) + self.assertTrue(node.is_file()) + self.assertEqual(node.size, size) + + def test_file_size(self): + to_check = ( + ('c1214f7e79e02fc37156ff215cd71275450cffc3', + 'vcs/backends/BaseRepository.py', 502), + ('d7e0d30fbcae12c90680eb095a4f5f02505ce501', + 'vcs/backends/hg.py', 854), + ('6e125e7c890379446e98980d8ed60fba87d0f6d1', + 'setup.py', 1068), + + ('d955cd312c17b02143c04fa1099a352b04368118', + 'vcs/backends/base.py', 2921), + ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e', + 'vcs/backends/base.py', 3936), + ('f50f42baeed5af6518ef4b0cb2f1423f3851a941', + 'vcs/backends/base.py', 6189), + ) + for revision, path, size in to_check: + self._test_file_size(revision, path, size) + + def test_file_history(self): + # we can only check if those revisions are present in the history + # as we cannot update this test every time file is changed + files = { + 'setup.py': [ + '54386793436c938cff89326944d4c2702340037d', + '51d254f0ecf5df2ce50c0b115741f4cf13985dab', + '998ed409c795fec2012b1c0ca054d99888b22090', + '5e0eb4c47f56564395f76333f319d26c79e2fb09', + '0115510b70c7229dbc5dc49036b32e7d91d23acd', + '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e', + '2a13f185e4525f9d4b59882791a2d397b90d5ddc', + '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e', + 'ff7ca51e58c505fec0dd2491de52c622bb7a806b', + ], + 'vcs/nodes.py': [ + '33fa3223355104431402a888fa77a4e9956feb3e', + 'fa014c12c26d10ba682fadb78f2a11c24c8118e1', + 'e686b958768ee96af8029fe19c6050b1a8dd3b2b', + 'ab5721ca0a081f26bf43d9051e615af2cc99952f', + 'c877b68d18e792a66b7f4c529ea02c8f80801542', + '4313566d2e417cb382948f8d9d7c765330356054', + '6c2303a793671e807d1cfc70134c9ca0767d98c2', + '54386793436c938cff89326944d4c2702340037d', + '54000345d2e78b03a99d561399e8e548de3f3203', + '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b', + '2d03ca750a44440fb5ea8b751176d1f36f8e8f46', + '2a08b128c206db48c2f0b8f70df060e6db0ae4f8', + '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b', + 'ac71e9503c2ca95542839af0ce7b64011b72ea7c', + '12669288fd13adba2a9b7dd5b870cc23ffab92d2', + '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382', + '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5', + '5eab1222a7cd4bfcbabc218ca6d04276d4e27378', + 'f50f42baeed5af6518ef4b0cb2f1423f3851a941', + 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25', + 'f15c21f97864b4f071cddfbf2750ec2e23859414', + 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade', + 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b', + '84dec09632a4458f79f50ddbbd155506c460b4f9', + '0115510b70c7229dbc5dc49036b32e7d91d23acd', + '2a13f185e4525f9d4b59882791a2d397b90d5ddc', + '3bf1c5868e570e39569d094f922d33ced2fa3b2b', + 'b8d04012574729d2c29886e53b1a43ef16dd00a1', + '6970b057cffe4aab0a792aa634c89f4bebf01441', + 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f', + 'ff7ca51e58c505fec0dd2491de52c622bb7a806b', + ], + 'vcs/backends/git.py': [ + '4cf116ad5a457530381135e2f4c453e68a1b0105', + '9a751d84d8e9408e736329767387f41b36935153', + 'cb681fb539c3faaedbcdf5ca71ca413425c18f01', + '428f81bb652bcba8d631bce926e8834ff49bdcc6', + '180ab15aebf26f98f714d8c68715e0f05fa6e1c7', + '2b8e07312a2e89e92b90426ab97f349f4bce2a3a', + '50e08c506174d8645a4bb517dd122ac946a0f3bf', + '54000345d2e78b03a99d561399e8e548de3f3203', + ], + } + for path, revs in files.items(): + node = self.repo.get_changeset(revs[0]).get_node(path) + node_revs = [chset.raw_id for chset in node.history] + self.assertTrue(set(revs).issubset(set(node_revs)), + "We assumed that %s is subset of revisions for which file %s " + "has been changed, and history of that node returned: %s" + % (revs, path, node_revs)) + + def test_file_annotate(self): + files = { + 'vcs/backends/__init__.py': { + 'c1214f7e79e02fc37156ff215cd71275450cffc3': { + 'lines_no': 1, + 'changesets': [ + 'c1214f7e79e02fc37156ff215cd71275450cffc3', + ], + }, + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': { + 'lines_no': 21, + 'changesets': [ + '49d3fd156b6f7db46313fac355dca1a0b94a0017', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + ], + }, + 'e29b67bd158580fc90fc5e9111240b90e6e86064': { + 'lines_no': 32, + 'changesets': [ + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '5eab1222a7cd4bfcbabc218ca6d04276d4e27378', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '54000345d2e78b03a99d561399e8e548de3f3203', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '78c3f0c23b7ee935ec276acb8b8212444c33c396', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '2a13f185e4525f9d4b59882791a2d397b90d5ddc', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '78c3f0c23b7ee935ec276acb8b8212444c33c396', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '992f38217b979d0b0987d0bae3cc26dac85d9b19', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647', + ], + }, + }, + } + + for fname, revision_dict in files.items(): + for rev, data in revision_dict.items(): + cs = self.repo.get_changeset(rev) + ann = cs.get_file_annotate(fname) + + l1 = [x[1].raw_id for x in ann] + l2 = files[fname][rev]['changesets'] + self.assertTrue(l1 == l2 , "The lists of revision for %s@rev %s" + "from annotation list should match each other, " + "got \n%s \nvs \n%s " % (fname, rev, l1, l2)) + + def test_files_state(self): + """ + Tests state of FileNodes. + """ + node = self.repo\ + .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\ + .get_node('vcs/utils/diffs.py') + self.assertTrue(node.state, NodeState.ADDED) + self.assertTrue(node.added) + self.assertFalse(node.changed) + self.assertFalse(node.not_changed) + self.assertFalse(node.removed) + + node = self.repo\ + .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e')\ + .get_node('.hgignore') + self.assertTrue(node.state, NodeState.CHANGED) + self.assertFalse(node.added) + self.assertTrue(node.changed) + self.assertFalse(node.not_changed) + self.assertFalse(node.removed) + + node = self.repo\ + .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064')\ + .get_node('setup.py') + self.assertTrue(node.state, NodeState.NOT_CHANGED) + self.assertFalse(node.added) + self.assertFalse(node.changed) + self.assertTrue(node.not_changed) + self.assertFalse(node.removed) + + # If node has REMOVED state then trying to fetch it would raise + # ChangesetError exception + chset = self.repo.get_changeset( + 'fa6600f6848800641328adbf7811fd2372c02ab2') + path = 'vcs/backends/BaseRepository.py' + self.assertRaises(NodeDoesNotExistError, chset.get_node, path) + # but it would be one of ``removed`` (changeset's attribute) + self.assertTrue(path in [rf.path for rf in chset.removed]) + + chset = self.repo.get_changeset( + '54386793436c938cff89326944d4c2702340037d') + changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py', + 'vcs/nodes.py'] + self.assertEqual(set(changed), set([f.path for f in chset.changed])) + + def test_commit_message_is_unicode(self): + for cs in self.repo: + self.assertEqual(type(cs.message), unicode) + + def test_changeset_author_is_unicode(self): + for cs in self.repo: + self.assertEqual(type(cs.author), unicode) + + def test_repo_files_content_is_unicode(self): + changeset = self.repo.get_changeset() + for node in changeset.get_node('/'): + if node.is_file(): + self.assertEqual(type(node.content), unicode) + + def test_wrong_path(self): + # There is 'setup.py' in the root dir but not there: + path = 'foo/bar/setup.py' + tip = self.repo.get_changeset() + self.assertRaises(VCSError, tip.get_node, path) + + def test_author_email(self): + self.assertEqual('marcin@python-blog.com', + self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\ + .author_email) + self.assertEqual('lukasz.balcerzak@python-center.pl', + self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\ + .author_email) + self.assertEqual('none@none', + self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\ + .author_email) + + def test_author_username(self): + self.assertEqual('Marcin Kuzminski', + self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\ + .author_name) + self.assertEqual('Lukasz Balcerzak', + self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\ + .author_name) + self.assertEqual('marcink', + self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\ + .author_name) + + +class GitSpecificTest(unittest.TestCase): + + def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self): + repo = mock.MagicMock() + changeset = GitChangeset(repo, 'foobar') + changeset._diff_name_status = 'foobar' + with self.assertRaises(VCSError): + changeset.added + + def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self): + repo = mock.MagicMock() + changeset = GitChangeset(repo, 'foobar') + changeset._diff_name_status = 'foobar' + with self.assertRaises(VCSError): + changeset.added + + def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self): + repo = mock.MagicMock() + changeset = GitChangeset(repo, 'foobar') + changeset._diff_name_status = 'foobar' + with self.assertRaises(VCSError): + changeset.added + + +class GitSpecificWithRepoTest(BackendTestMixin, unittest.TestCase): + backend_alias = 'git' + + @classmethod + def _get_commits(cls): + return [ + { + 'message': 'Initial', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar/static/js/admin/base.js', content='base'), + FileNode('foobar/static/admin', content='admin', + mode=0120000), # this is a link + FileNode('foo', content='foo'), + ], + }, + { + 'message': 'Second', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 22), + 'added': [ + FileNode('foo2', content='foo2'), + ], + }, + ] + + def test_paths_slow_traversing(self): + cs = self.repo.get_changeset() + self.assertEqual(cs.get_node('foobar').get_node('static').get_node('js') + .get_node('admin').get_node('base.js').content, 'base') + + def test_paths_fast_traversing(self): + cs = self.repo.get_changeset() + self.assertEqual(cs.get_node('foobar/static/js/admin/base.js').content, + 'base') + + def test_workdir_get_branch(self): + self.repo.run_git_command('checkout -b production') + # Regression test: one of following would fail if we don't check + # .git/HEAD file + self.repo.run_git_command('checkout production') + self.assertEqual(self.repo.workdir.get_branch(), 'production') + self.repo.run_git_command('checkout master') + self.assertEqual(self.repo.workdir.get_branch(), 'master') + + def test_get_diff_runs_git_command_with_hashes(self): + self.repo.run_git_command = mock.Mock(return_value=['', '']) + self.repo.get_diff(0, 1) + self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s' % + (3, self.repo._get_revision(0), self.repo._get_revision(1))) + + def test_get_diff_runs_git_command_with_str_hashes(self): + self.repo.run_git_command = mock.Mock(return_value=['', '']) + self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1) + self.repo.run_git_command.assert_called_once_with('show -U%s %s' % + (3, self.repo._get_revision(1))) + + def test_get_diff_runs_git_command_with_path_if_its_given(self): + self.repo.run_git_command = mock.Mock(return_value=['', '']) + self.repo.get_diff(0, 1, 'foo') + self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s -- "foo"' + % (3, self.repo._get_revision(0), self.repo._get_revision(1))) + + +class GitRegressionTest(BackendTestMixin, unittest.TestCase): + backend_alias = 'git' + + @classmethod + def _get_commits(cls): + return [ + { + 'message': 'Initial', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('bot/__init__.py', content='base'), + FileNode('bot/templates/404.html', content='base'), + FileNode('bot/templates/500.html', content='base'), + ], + }, + { + 'message': 'Second', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 22), + 'added': [ + FileNode('bot/build/migrations/1.py', content='foo2'), + FileNode('bot/build/migrations/2.py', content='foo2'), + FileNode('bot/build/static/templates/f.html', content='foo2'), + FileNode('bot/build/static/templates/f1.html', content='foo2'), + FileNode('bot/build/templates/err.html', content='foo2'), + FileNode('bot/build/templates/err2.html', content='foo2'), + ], + }, + ] + + def test_similar_paths(self): + cs = self.repo.get_changeset() + paths = lambda *n:[x.path for x in n] + self.assertEqual(paths(*cs.get_nodes('bot')), ['bot/build', 'bot/templates', 'bot/__init__.py']) + self.assertEqual(paths(*cs.get_nodes('bot/build')), ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']) + self.assertEqual(paths(*cs.get_nodes('bot/build/static')), ['bot/build/static/templates']) + # this get_nodes below causes troubles ! + self.assertEqual(paths(*cs.get_nodes('bot/build/static/templates')), ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']) + self.assertEqual(paths(*cs.get_nodes('bot/build/templates')), ['bot/build/templates/err.html', 'bot/build/templates/err2.html']) + self.assertEqual(paths(*cs.get_nodes('bot/templates/')), ['bot/templates/404.html', 'bot/templates/500.html']) + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_hg.py b/rhodecode/tests/vcs/test_hg.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_hg.py @@ -0,0 +1,561 @@ +from __future__ import with_statement + +import os +from rhodecode.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset +from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError +from rhodecode.lib.vcs.nodes import NodeKind, NodeState +from conf import PACKAGE_DIR, TEST_HG_REPO, TEST_HG_REPO_CLONE, \ + TEST_HG_REPO_PULL +from rhodecode.lib.vcs.utils.compat import unittest + + +# Use only clean mercurial's ui +import mercurial.scmutil +mercurial.scmutil.rcpath() +if mercurial.scmutil._rcpath: + mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1] + + +class MercurialRepositoryTest(unittest.TestCase): + + def __check_for_existing_repo(self): + if os.path.exists(TEST_HG_REPO_CLONE): + self.fail('Cannot test mercurial clone repo as location %s already ' + 'exists. You should manually remove it first.' + % TEST_HG_REPO_CLONE) + + def setUp(self): + self.repo = MercurialRepository(TEST_HG_REPO) + + def test_wrong_repo_path(self): + wrong_repo_path = '/tmp/errorrepo' + self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path) + + def test_unicode_path_repo(self): + self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail')) + + def test_repo_clone(self): + self.__check_for_existing_repo() + repo = MercurialRepository(TEST_HG_REPO) + repo_clone = MercurialRepository(TEST_HG_REPO_CLONE, + src_url=TEST_HG_REPO, update_after_clone=True) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + # Checking hashes of changesets should be enough + for changeset in repo.get_changesets(): + raw_id = changeset.raw_id + self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id) + + def test_repo_clone_with_update(self): + repo = MercurialRepository(TEST_HG_REPO) + repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update', + src_url=TEST_HG_REPO, update_after_clone=True) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + + #check if current workdir was updated + self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \ + + '_w_update', + 'MANIFEST.in')), True,) + + def test_repo_clone_without_update(self): + repo = MercurialRepository(TEST_HG_REPO) + repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update', + src_url=TEST_HG_REPO, update_after_clone=False) + self.assertEqual(len(repo.revisions), len(repo_clone.revisions)) + self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \ + + '_wo_update', + 'MANIFEST.in')), False,) + + def test_pull(self): + if os.path.exists(TEST_HG_REPO_PULL): + self.fail('Cannot test mercurial pull command as location %s ' + 'already exists. You should manually remove it first' + % TEST_HG_REPO_PULL) + repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True) + self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions)) + + repo_new.pull(self.repo.path) + repo_new = MercurialRepository(TEST_HG_REPO_PULL) + self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions)) + + def test_revisions(self): + # there are 21 revisions at bitbucket now + # so we can assume they would be available from now on + subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545', + '3d8f361e72ab303da48d799ff1ac40d5ac37c67e', + '6cba7170863a2411822803fa77a0a264f1310b35', + '56349e29c2af3ac913b28bde9a2c6154436e615b', + '2dda4e345facb0ccff1a191052dd1606dba6781d', + '6fff84722075f1607a30f436523403845f84cd9e', + '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7', + '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb', + 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c', + 'be90031137367893f1c406e0a8683010fd115b79', + 'db8e58be770518cbb2b1cdfa69146e47cd481481', + '84478366594b424af694a6c784cb991a16b87c21', + '17f8e105dddb9f339600389c6dc7175d395a535c', + '20a662e756499bde3095ffc9bc0643d1def2d0eb', + '2e319b85e70a707bba0beff866d9f9de032aa4f9', + '786facd2c61deb9cf91e9534735124fb8fc11842', + '94593d2128d38210a2fcd1aabff6dda0d6d9edf8', + 'aa6a0de05b7612707db567078e130a6cd114a9a7', + 'eada5a770da98ab0dd7325e29d00e0714f228d09' + ]) + self.assertTrue(subset.issubset(set(self.repo.revisions))) + + + # check if we have the proper order of revisions + org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545', + '3d8f361e72ab303da48d799ff1ac40d5ac37c67e', + '6cba7170863a2411822803fa77a0a264f1310b35', + '56349e29c2af3ac913b28bde9a2c6154436e615b', + '2dda4e345facb0ccff1a191052dd1606dba6781d', + '6fff84722075f1607a30f436523403845f84cd9e', + '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7', + '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb', + 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c', + 'be90031137367893f1c406e0a8683010fd115b79', + 'db8e58be770518cbb2b1cdfa69146e47cd481481', + '84478366594b424af694a6c784cb991a16b87c21', + '17f8e105dddb9f339600389c6dc7175d395a535c', + '20a662e756499bde3095ffc9bc0643d1def2d0eb', + '2e319b85e70a707bba0beff866d9f9de032aa4f9', + '786facd2c61deb9cf91e9534735124fb8fc11842', + '94593d2128d38210a2fcd1aabff6dda0d6d9edf8', + 'aa6a0de05b7612707db567078e130a6cd114a9a7', + 'eada5a770da98ab0dd7325e29d00e0714f228d09', + '2c1885c735575ca478bf9e17b0029dca68824458', + 'd9bcd465040bf869799b09ad732c04e0eea99fe9', + '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7', + '4fb8326d78e5120da2c7468dcf7098997be385da', + '62b4a097164940bd66030c4db51687f3ec035eed', + '536c1a19428381cfea92ac44985304f6a8049569', + '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4', + '9bb326a04ae5d98d437dece54be04f830cf1edd9', + 'f8940bcb890a98c4702319fbe36db75ea309b475', + 'ff5ab059786ebc7411e559a2cc309dfae3625a3b', + '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08', + 'ee87846a61c12153b51543bf860e1026c6d3dcba', ] + self.assertEqual(org, self.repo.revisions[:31]) + + def test_iter_slice(self): + sliced = list(self.repo[:10]) + itered = list(self.repo)[:10] + self.assertEqual(sliced, itered) + + def test_slicing(self): + #4 1 5 10 95 + for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5), + (10, 20, 10), (5, 100, 95)]: + revs = list(self.repo[sfrom:sto]) + self.assertEqual(len(revs), size) + self.assertEqual(revs[0], self.repo.get_changeset(sfrom)) + self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1)) + + def test_branches(self): + # TODO: Need more tests here + + #active branches + self.assertTrue('default' in self.repo.branches) + + #closed branches + self.assertFalse('web' in self.repo.branches) + self.assertFalse('git' in self.repo.branches) + + # closed + self.assertTrue('workdir' in self.repo._get_branches(closed=True)) + self.assertTrue('webvcs' in self.repo._get_branches(closed=True)) + + for name, id in self.repo.branches.items(): + self.assertTrue(isinstance( + self.repo.get_changeset(id), MercurialChangeset)) + + def test_tip_in_tags(self): + # tip is always a tag + self.assertIn('tip', self.repo.tags) + + def test_tip_changeset_in_tags(self): + tip = self.repo.get_changeset() + self.assertEqual(self.repo.tags['tip'], tip.raw_id) + + def test_initial_changeset(self): + + init_chset = self.repo.get_changeset(0) + self.assertEqual(init_chset.message, 'initial import') + self.assertEqual(init_chset.author, + 'Marcin Kuzminski ') + self.assertEqual(sorted(init_chset._file_paths), + sorted([ + 'vcs/__init__.py', + 'vcs/backends/BaseRepository.py', + 'vcs/backends/__init__.py', + ]) + ) + self.assertEqual(sorted(init_chset._dir_paths), + sorted(['', 'vcs', 'vcs/backends'])) + + self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar') + + node = init_chset.get_node('vcs/') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.DIR) + + node = init_chset.get_node('vcs') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.DIR) + + node = init_chset.get_node('vcs/__init__.py') + self.assertTrue(hasattr(node, 'kind')) + self.assertEqual(node.kind, NodeKind.FILE) + + def test_not_existing_changeset(self): + #rawid + self.assertRaises(RepositoryError, self.repo.get_changeset, + 'abcd' * 10) + #shortid + self.assertRaises(RepositoryError, self.repo.get_changeset, + 'erro' * 4) + #numeric + self.assertRaises(RepositoryError, self.repo.get_changeset, + self.repo.count() + 1) + + + # Small chance we ever get to this one + revision = pow(2, 30) + self.assertRaises(RepositoryError, self.repo.get_changeset, revision) + + def test_changeset10(self): + + chset10 = self.repo.get_changeset(10) + README = """=== +VCS +=== + +Various Version Control System management abstraction layer for Python. + +Introduction +------------ + +TODO: To be written... + +""" + node = chset10.get_node('README.rst') + self.assertEqual(node.kind, NodeKind.FILE) + self.assertEqual(node.content, README) + + +class MercurialChangesetTest(unittest.TestCase): + + def setUp(self): + self.repo = MercurialRepository(TEST_HG_REPO) + + def _test_equality(self, changeset): + revision = changeset.revision + self.assertEqual(changeset, self.repo.get_changeset(revision)) + + def test_equality(self): + self.setUp() + revs = [0, 10, 20] + changesets = [self.repo.get_changeset(rev) for rev in revs] + for changeset in changesets: + self._test_equality(changeset) + + def test_default_changeset(self): + tip = self.repo.get_changeset('tip') + self.assertEqual(tip, self.repo.get_changeset()) + self.assertEqual(tip, self.repo.get_changeset(revision=None)) + self.assertEqual(tip, list(self.repo[-1:])[0]) + + def test_root_node(self): + tip = self.repo.get_changeset('tip') + self.assertTrue(tip.root is tip.get_node('')) + + def test_lazy_fetch(self): + """ + Test if changeset's nodes expands and are cached as we walk through + the revision. This test is somewhat hard to write as order of tests + is a key here. Written by running command after command in a shell. + """ + self.setUp() + chset = self.repo.get_changeset(45) + self.assertTrue(len(chset.nodes) == 0) + root = chset.root + self.assertTrue(len(chset.nodes) == 1) + self.assertTrue(len(root.nodes) == 8) + # accessing root.nodes updates chset.nodes + self.assertTrue(len(chset.nodes) == 9) + + docs = root.get_node('docs') + # we haven't yet accessed anything new as docs dir was already cached + self.assertTrue(len(chset.nodes) == 9) + self.assertTrue(len(docs.nodes) == 8) + # accessing docs.nodes updates chset.nodes + self.assertTrue(len(chset.nodes) == 17) + + self.assertTrue(docs is chset.get_node('docs')) + self.assertTrue(docs is root.nodes[0]) + self.assertTrue(docs is root.dirs[0]) + self.assertTrue(docs is chset.get_node('docs')) + + def test_nodes_with_changeset(self): + self.setUp() + chset = self.repo.get_changeset(45) + root = chset.root + docs = root.get_node('docs') + self.assertTrue(docs is chset.get_node('docs')) + api = docs.get_node('api') + self.assertTrue(api is chset.get_node('docs/api')) + index = api.get_node('index.rst') + self.assertTrue(index is chset.get_node('docs/api/index.rst')) + self.assertTrue(index is chset.get_node('docs')\ + .get_node('api')\ + .get_node('index.rst')) + + def test_branch_and_tags(self): + chset0 = self.repo.get_changeset(0) + self.assertEqual(chset0.branch, 'default') + self.assertEqual(chset0.tags, []) + + chset10 = self.repo.get_changeset(10) + self.assertEqual(chset10.branch, 'default') + self.assertEqual(chset10.tags, []) + + chset44 = self.repo.get_changeset(44) + self.assertEqual(chset44.branch, 'web') + + tip = self.repo.get_changeset('tip') + self.assertTrue('tip' in tip.tags) + + def _test_file_size(self, revision, path, size): + node = self.repo.get_changeset(revision).get_node(path) + self.assertTrue(node.is_file()) + self.assertEqual(node.size, size) + + def test_file_size(self): + to_check = ( + (10, 'setup.py', 1068), + (20, 'setup.py', 1106), + (60, 'setup.py', 1074), + + (10, 'vcs/backends/base.py', 2921), + (20, 'vcs/backends/base.py', 3936), + (60, 'vcs/backends/base.py', 6189), + ) + for revision, path, size in to_check: + self._test_file_size(revision, path, size) + + def test_file_history(self): + # we can only check if those revisions are present in the history + # as we cannot update this test every time file is changed + files = { + 'setup.py': [7, 18, 45, 46, 47, 69, 77], + 'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60, + 61, 73, 76], + 'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23, + 26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47, + 48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79, + 82], + } + for path, revs in files.items(): + tip = self.repo.get_changeset(revs[-1]) + node = tip.get_node(path) + node_revs = [chset.revision for chset in node.history] + self.assertTrue(set(revs).issubset(set(node_revs)), + "We assumed that %s is subset of revisions for which file %s " + "has been changed, and history of that node returned: %s" + % (revs, path, node_revs)) + + def test_file_annotate(self): + files = { + 'vcs/backends/__init__.py': + {89: {'lines_no': 31, + 'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44, + 37, 37, 37, 37, 45, 37, 44, 37, 37, 37, + 32, 32, 32, 32, 37, 32, 37, 37, 32, + 32, 32]}, + 20: {'lines_no': 1, + 'changesets': [4]}, + 55: {'lines_no': 31, + 'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44, + 37, 37, 37, 37, 45, 37, 44, 37, 37, 37, + 32, 32, 32, 32, 37, 32, 37, 37, 32, + 32, 32]}}, + 'vcs/exceptions.py': + {89: {'lines_no': 18, + 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, + 16, 16, 17, 16, 16, 18, 18, 18]}, + 20: {'lines_no': 18, + 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, + 16, 16, 17, 16, 16, 18, 18, 18]}, + 55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16, + 16, 16, 16, 16, 16, 16, + 17, 16, 16, 18, 18, 18]}}, + 'MANIFEST.in': {89: {'lines_no': 5, + 'changesets': [7, 7, 7, 71, 71]}, + 20: {'lines_no': 3, + 'changesets': [7, 7, 7]}, + 55: {'lines_no': 3, + 'changesets': [7, 7, 7]}}} + + + for fname, revision_dict in files.items(): + for rev, data in revision_dict.items(): + cs = self.repo.get_changeset(rev) + ann = cs.get_file_annotate(fname) + + l1 = [x[1].revision for x in ann] + l2 = files[fname][rev]['changesets'] + self.assertTrue(l1 == l2 , "The lists of revision for %s@rev%s" + "from annotation list should match each other," + "got \n%s \nvs \n%s " % (fname, rev, l1, l2)) + + def test_changeset_state(self): + """ + Tests which files have been added/changed/removed at particular revision + """ + + # rev 46ad32a4f974: + # hg st --rev 46ad32a4f974 + # changed: 13 + # added: 20 + # removed: 1 + changed = set(['.hgignore' + , 'README.rst' , 'docs/conf.py' , 'docs/index.rst' , 'setup.py' + , 'tests/test_hg.py' , 'tests/test_nodes.py' , 'vcs/__init__.py' + , 'vcs/backends/__init__.py' , 'vcs/backends/base.py' + , 'vcs/backends/hg.py' , 'vcs/nodes.py' , 'vcs/utils/__init__.py']) + + added = set(['docs/api/backends/hg.rst' + , 'docs/api/backends/index.rst' , 'docs/api/index.rst' + , 'docs/api/nodes.rst' , 'docs/api/web/index.rst' + , 'docs/api/web/simplevcs.rst' , 'docs/installation.rst' + , 'docs/quickstart.rst' , 'setup.cfg' , 'vcs/utils/baseui_config.py' + , 'vcs/utils/web.py' , 'vcs/web/__init__.py' , 'vcs/web/exceptions.py' + , 'vcs/web/simplevcs/__init__.py' , 'vcs/web/simplevcs/exceptions.py' + , 'vcs/web/simplevcs/middleware.py' , 'vcs/web/simplevcs/models.py' + , 'vcs/web/simplevcs/settings.py' , 'vcs/web/simplevcs/utils.py' + , 'vcs/web/simplevcs/views.py']) + + removed = set(['docs/api.rst']) + + chset64 = self.repo.get_changeset('46ad32a4f974') + self.assertEqual(set((node.path for node in chset64.added)), added) + self.assertEqual(set((node.path for node in chset64.changed)), changed) + self.assertEqual(set((node.path for node in chset64.removed)), removed) + + # rev b090f22d27d6: + # hg st --rev b090f22d27d6 + # changed: 13 + # added: 20 + # removed: 1 + chset88 = self.repo.get_changeset('b090f22d27d6') + self.assertEqual(set((node.path for node in chset88.added)), set()) + self.assertEqual(set((node.path for node in chset88.changed)), + set(['.hgignore'])) + self.assertEqual(set((node.path for node in chset88.removed)), set()) +# + # 85: + # added: 2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py'] + # changed: 4 ['vcs/web/simplevcs/models.py', ...] + # removed: 1 ['vcs/utils/web.py'] + chset85 = self.repo.get_changeset(85) + self.assertEqual(set((node.path for node in chset85.added)), set([ + 'vcs/utils/diffs.py', + 'vcs/web/simplevcs/views/diffs.py'])) + self.assertEqual(set((node.path for node in chset85.changed)), set([ + 'vcs/web/simplevcs/models.py', + 'vcs/web/simplevcs/utils.py', + 'vcs/web/simplevcs/views/__init__.py', + 'vcs/web/simplevcs/views/repository.py', + ])) + self.assertEqual(set((node.path for node in chset85.removed)), + set(['vcs/utils/web.py'])) + + + def test_files_state(self): + """ + Tests state of FileNodes. + """ + chset = self.repo.get_changeset(85) + node = chset.get_node('vcs/utils/diffs.py') + self.assertTrue(node.state, NodeState.ADDED) + self.assertTrue(node.added) + self.assertFalse(node.changed) + self.assertFalse(node.not_changed) + self.assertFalse(node.removed) + + chset = self.repo.get_changeset(88) + node = chset.get_node('.hgignore') + self.assertTrue(node.state, NodeState.CHANGED) + self.assertFalse(node.added) + self.assertTrue(node.changed) + self.assertFalse(node.not_changed) + self.assertFalse(node.removed) + + chset = self.repo.get_changeset(85) + node = chset.get_node('setup.py') + self.assertTrue(node.state, NodeState.NOT_CHANGED) + self.assertFalse(node.added) + self.assertFalse(node.changed) + self.assertTrue(node.not_changed) + self.assertFalse(node.removed) + + # If node has REMOVED state then trying to fetch it would raise + # ChangesetError exception + chset = self.repo.get_changeset(2) + path = 'vcs/backends/BaseRepository.py' + self.assertRaises(NodeDoesNotExistError, chset.get_node, path) + # but it would be one of ``removed`` (changeset's attribute) + self.assertTrue(path in [rf.path for rf in chset.removed]) + + def test_commit_message_is_unicode(self): + for cm in self.repo: + self.assertEqual(type(cm.message), unicode) + + def test_changeset_author_is_unicode(self): + for cm in self.repo: + self.assertEqual(type(cm.author), unicode) + + def test_repo_files_content_is_unicode(self): + test_changeset = self.repo.get_changeset(100) + for node in test_changeset.get_node('/'): + if node.is_file(): + self.assertEqual(type(node.content), unicode) + + def test_wrong_path(self): + # There is 'setup.py' in the root dir but not there: + path = 'foo/bar/setup.py' + self.assertRaises(VCSError, self.repo.get_changeset().get_node, path) + + + def test_archival_file(self): + #TODO: + pass + + def test_archival_as_generator(self): + #TODO: + pass + + def test_archival_wrong_kind(self): + tip = self.repo.get_changeset() + self.assertRaises(VCSError, tip.fill_archive, kind='error') + + def test_archival_empty_prefix(self): + #TODO: + pass + + + def test_author_email(self): + self.assertEqual('marcin@python-blog.com', + self.repo.get_changeset('b986218ba1c9').author_email) + self.assertEqual('lukasz.balcerzak@python-center.pl', + self.repo.get_changeset('3803844fdbd3').author_email) + self.assertEqual('', + self.repo.get_changeset('84478366594b').author_email) + + def test_author_username(self): + self.assertEqual('Marcin Kuzminski', + self.repo.get_changeset('b986218ba1c9').author_name) + self.assertEqual('Lukasz Balcerzak', + self.repo.get_changeset('3803844fdbd3').author_name) + self.assertEqual('marcink', + self.repo.get_changeset('84478366594b').author_name) diff --git a/rhodecode/tests/vcs/test_inmemchangesets.py b/rhodecode/tests/vcs/test_inmemchangesets.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_inmemchangesets.py @@ -0,0 +1,340 @@ +""" +Tests so called "in memory changesets" commit API of vcs. +""" +from __future__ import with_statement + +from rhodecode.lib import vcs +import time +import datetime +from conf import SCM_TESTS, get_new_dir +from rhodecode.lib.vcs.exceptions import EmptyRepositoryError +from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError +from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError +from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError +from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError +from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError +from rhodecode.lib.vcs.exceptions import NodeNotChangedError +from rhodecode.lib.vcs.nodes import DirNode +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest + + +class InMemoryChangesetTestMixin(object): + """ + This is a backend independent test case class which should be created + with ``type`` method. + + It is required to set following attributes at subclass: + + - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``) + - ``repo_path``: path to the repository which would be created for set of + tests + """ + + def get_backend(self): + return vcs.get_backend(self.backend_alias) + + def setUp(self): + Backend = self.get_backend() + self.repo_path = get_new_dir(str(time.time())) + self.repo = Backend(self.repo_path, create=True) + self.imc = self.repo.in_memory_changeset + self.nodes = [ + FileNode('foobar', content='Foo & bar'), + FileNode('foobar2', content='Foo & bar, doubled!'), + FileNode('foo bar with spaces', content=''), + FileNode('foo/bar/baz', content='Inside'), + ] + + def test_add(self): + rev_count = len(self.repo.revisions) + to_add = [FileNode(node.path, content=node.content) + for node in self.nodes] + for node in to_add: + self.imc.add(node) + message = u'Added: %s' % ', '.join((node.path for node in self.nodes)) + author = unicode(self.__class__) + changeset = self.imc.commit(message=message, author=author) + + newtip = self.repo.get_changeset() + self.assertEqual(changeset, newtip) + self.assertEqual(rev_count + 1, len(self.repo.revisions)) + self.assertEqual(newtip.message, message) + self.assertEqual(newtip.author, author) + self.assertTrue(not any((self.imc.added, self.imc.changed, + self.imc.removed))) + for node in to_add: + self.assertEqual(newtip.get_node(node.path).content, node.content) + + def test_add_in_bulk(self): + rev_count = len(self.repo.revisions) + to_add = [FileNode(node.path, content=node.content) + for node in self.nodes] + self.imc.add(*to_add) + message = u'Added: %s' % ', '.join((node.path for node in self.nodes)) + author = unicode(self.__class__) + changeset = self.imc.commit(message=message, author=author) + + newtip = self.repo.get_changeset() + self.assertEqual(changeset, newtip) + self.assertEqual(rev_count + 1, len(self.repo.revisions)) + self.assertEqual(newtip.message, message) + self.assertEqual(newtip.author, author) + self.assertTrue(not any((self.imc.added, self.imc.changed, + self.imc.removed))) + for node in to_add: + self.assertEqual(newtip.get_node(node.path).content, node.content) + + def test_add_actually_adds_all_nodes_at_second_commit_too(self): + self.imc.add(FileNode('foo/bar/image.png', content='\0')) + self.imc.add(FileNode('foo/README.txt', content='readme!')) + changeset = self.imc.commit(u'Initial', u'joe.doe@example.com') + self.assertTrue(isinstance(changeset.get_node('foo'), DirNode)) + self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode)) + self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0') + self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!') + + # commit some more files again + to_add = [ + FileNode('foo/bar/foobaz/bar', content='foo'), + FileNode('foo/bar/another/bar', content='foo'), + FileNode('foo/baz.txt', content='foo'), + FileNode('foobar/foobaz/file', content='foo'), + FileNode('foobar/barbaz', content='foo'), + ] + self.imc.add(*to_add) + changeset = self.imc.commit(u'Another', u'joe.doe@example.com') + self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo') + self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo') + self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo') + self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo') + self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo') + + def test_add_raise_already_added(self): + node = FileNode('foobar', content='baz') + self.imc.add(node) + self.assertRaises(NodeAlreadyAddedError, self.imc.add, node) + + def test_check_integrity_raise_already_exist(self): + node = FileNode('foobar', content='baz') + self.imc.add(node) + self.imc.commit(message=u'Added foobar', author=unicode(self)) + self.imc.add(node) + self.assertRaises(NodeAlreadyExistsError, self.imc.commit, + message='new message', + author=str(self)) + + def test_change(self): + self.imc.add(FileNode('foo/bar/baz', content='foo')) + self.imc.add(FileNode('foo/fbar', content='foobar')) + tip = self.imc.commit(u'Initial', u'joe.doe@example.com') + + # Change node's content + node = FileNode('foo/bar/baz', content='My **changed** content') + self.imc.change(node) + self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com') + + newtip = self.repo.get_changeset() + self.assertNotEqual(tip, newtip) + self.assertNotEqual(tip.id, newtip.id) + self.assertEqual(newtip.get_node('foo/bar/baz').content, + 'My **changed** content') + + def test_change_raise_empty_repository(self): + node = FileNode('foobar') + self.assertRaises(EmptyRepositoryError, self.imc.change, node) + + def test_check_integrity_change_raise_node_does_not_exist(self): + node = FileNode('foobar', content='baz') + self.imc.add(node) + self.imc.commit(message=u'Added foobar', author=unicode(self)) + node = FileNode('not-foobar', content='') + self.imc.change(node) + self.assertRaises(NodeDoesNotExistError, self.imc.commit, + message='Changed not existing node', + author=str(self)) + + def test_change_raise_node_already_changed(self): + node = FileNode('foobar', content='baz') + self.imc.add(node) + self.imc.commit(message=u'Added foobar', author=unicode(self)) + node = FileNode('foobar', content='more baz') + self.imc.change(node) + self.assertRaises(NodeAlreadyChangedError, self.imc.change, node) + + def test_check_integrity_change_raise_node_not_changed(self): + self.test_add() # Performs first commit + + node = FileNode(self.nodes[0].path, content=self.nodes[0].content) + self.imc.change(node) + self.assertRaises(NodeNotChangedError, self.imc.commit, + message=u'Trying to mark node as changed without touching it', + author=unicode(self)) + + def test_change_raise_node_already_removed(self): + node = FileNode('foobar', content='baz') + self.imc.add(node) + self.imc.commit(message=u'Added foobar', author=unicode(self)) + self.imc.remove(FileNode('foobar')) + self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node) + + def test_remove(self): + self.test_add() # Performs first commit + + tip = self.repo.get_changeset() + node = self.nodes[0] + self.assertEqual(node.content, tip.get_node(node.path).content) + self.imc.remove(node) + self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self)) + + newtip = self.repo.get_changeset() + self.assertNotEqual(tip, newtip) + self.assertNotEqual(tip.id, newtip.id) + self.assertRaises(NodeDoesNotExistError, newtip.get_node, node.path) + + def test_remove_last_file_from_directory(self): + node = FileNode('omg/qwe/foo/bar', content='foobar') + self.imc.add(node) + self.imc.commit(u'added', u'joe doe') + + self.imc.remove(node) + tip = self.imc.commit(u'removed', u'joe doe') + self.assertRaises(NodeDoesNotExistError, tip.get_node, 'omg/qwe/foo/bar') + + def test_remove_raise_node_does_not_exist(self): + self.imc.remove(self.nodes[0]) + self.assertRaises(NodeDoesNotExistError, self.imc.commit, + message='Trying to remove node at empty repository', + author=str(self)) + + def test_check_integrity_remove_raise_node_does_not_exist(self): + self.test_add() # Performs first commit + + node = FileNode('no-such-file') + self.imc.remove(node) + self.assertRaises(NodeDoesNotExistError, self.imc.commit, + message=u'Trying to remove not existing node', + author=unicode(self)) + + def test_remove_raise_node_already_removed(self): + self.test_add() # Performs first commit + + node = FileNode(self.nodes[0].path) + self.imc.remove(node) + self.assertRaises(NodeAlreadyRemovedError, self.imc.remove, node) + + def test_remove_raise_node_already_changed(self): + self.test_add() # Performs first commit + + node = FileNode(self.nodes[0].path, content='Bending time') + self.imc.change(node) + self.assertRaises(NodeAlreadyChangedError, self.imc.remove, node) + + def test_reset(self): + self.imc.add(FileNode('foo', content='bar')) + #self.imc.change(FileNode('baz', content='new')) + #self.imc.remove(FileNode('qwe')) + self.imc.reset() + self.assertTrue(not any((self.imc.added, self.imc.changed, + self.imc.removed))) + + def test_multiple_commits(self): + N = 3 # number of commits to perform + last = None + for x in xrange(N): + fname = 'file%s' % str(x).rjust(5, '0') + content = 'foobar\n' * x + node = FileNode(fname, content=content) + self.imc.add(node) + commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs') + self.assertTrue(last != commit) + last = commit + + # Check commit number for same repo + self.assertEqual(len(self.repo.revisions), N) + + # Check commit number for recreated repo + backend = self.get_backend() + repo = backend(self.repo_path) + self.assertEqual(len(repo.revisions), N) + + def test_date_attr(self): + node = FileNode('foobar.txt', content='Foobared!') + self.imc.add(node) + date = datetime.datetime(1985, 1, 30, 1, 45) + commit = self.imc.commit(u"Committed at time when I was born ;-)", + author=u'lb', date=date) + + self.assertEqual(commit.date, date) + + +class BackendBaseTestCase(unittest.TestCase): + """ + Base test class for tests which requires repository. + """ + backend_alias = 'hg' + commits = [ + { + 'message': 'Initial commit', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar', content='Foobar'), + FileNode('foobar2', content='Foobar II'), + FileNode('foo/bar/baz', content='baz here!'), + ], + }, + ] + + def get_backend(self): + return vcs.get_backend(self.backend_alias) + + def get_commits(self): + """ + Returns list of commits which builds repository for each tests. + """ + if hasattr(self, 'commits'): + return self.commits + + def get_new_repo_path(self): + """ + Returns newly created repository's directory. + """ + backend = self.get_backend() + key = '%s-%s' % (backend.alias, str(time.time())) + repo_path = get_new_dir(key) + return repo_path + + def setUp(self): + Backend = self.get_backend() + self.backend_class = Backend + self.repo_path = self.get_new_repo_path() + self.repo = Backend(self.repo_path, create=True) + self.imc = self.repo.in_memory_changeset + + for commit in self.get_commits(): + for node in commit.get('added', []): + self.imc.add(FileNode(node.path, content=node.content)) + for node in commit.get('changed', []): + self.imc.change(FileNode(node.path, content=node.content)) + for node in commit.get('removed', []): + self.imc.remove(FileNode(node.path)) + self.imc.commit(message=unicode(commit['message']), + author=unicode(commit['author']), + date=commit['date']) + + self.tip = self.repo.get_changeset() + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s in memory changeset test' % alias).title().split()) + bases = (InMemoryChangesetTestMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_nodes.py b/rhodecode/tests/vcs/test_nodes.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_nodes.py @@ -0,0 +1,183 @@ +from __future__ import with_statement + +import stat +from rhodecode.lib.vcs.nodes import DirNode +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.nodes import Node +from rhodecode.lib.vcs.nodes import NodeError +from rhodecode.lib.vcs.nodes import NodeKind +from rhodecode.lib.vcs.utils.compat import unittest + + +class NodeBasicTest(unittest.TestCase): + + def test_init(self): + """ + Cannot innitialize Node objects with path with slash at the beginning. + """ + wrong_paths = ( + '/foo', + '/foo/bar' + ) + for path in wrong_paths: + self.assertRaises(NodeError, Node, path, NodeKind.FILE) + + wrong_paths = ( + '/foo/', + '/foo/bar/' + ) + for path in wrong_paths: + self.assertRaises(NodeError, Node, path, NodeKind.DIR) + + def test_name(self): + node = Node('', NodeKind.DIR) + self.assertEqual(node.name, '') + + node = Node('path', NodeKind.FILE) + self.assertEqual(node.name, 'path') + + node = Node('path/', NodeKind.DIR) + self.assertEqual(node.name, 'path') + + node = Node('some/path', NodeKind.FILE) + self.assertEqual(node.name, 'path') + + node = Node('some/path/', NodeKind.DIR) + self.assertEqual(node.name, 'path') + + def test_root_node(self): + self.assertRaises(NodeError, Node, '', NodeKind.FILE) + + def test_kind_setter(self): + node = Node('', NodeKind.DIR) + self.assertRaises(NodeError, setattr, node, 'kind', NodeKind.FILE) + + def _test_parent_path(self, node_path, expected_parent_path): + """ + Tests if node's parent path are properly computed. + """ + node = Node(node_path, NodeKind.DIR) + parent_path = node.get_parent_path() + self.assertTrue(parent_path.endswith('/') or \ + node.is_root() and parent_path == '') + self.assertEqual(parent_path, expected_parent_path, + "Node's path is %r and parent path is %r but should be %r" + % (node.path, parent_path, expected_parent_path)) + + def test_parent_path(self): + test_paths = ( + # (node_path, expected_parent_path) + ('', ''), + ('some/path/', 'some/'), + ('some/longer/path/', 'some/longer/'), + ) + for node_path, expected_parent_path in test_paths: + self._test_parent_path(node_path, expected_parent_path) + + ''' + def _test_trailing_slash(self, path): + if not path.endswith('/'): + self.fail("Trailing slash tests needs paths to end with slash") + for kind in NodeKind.FILE, NodeKind.DIR: + self.assertRaises(NodeError, Node, path=path, kind=kind) + + def test_trailing_slash(self): + for path in ('/', 'foo/', 'foo/bar/', 'foo/bar/biz/'): + self._test_trailing_slash(path) + ''' + + def test_is_file(self): + node = Node('any', NodeKind.FILE) + self.assertTrue(node.is_file()) + + node = FileNode('any') + self.assertTrue(node.is_file()) + self.assertRaises(AttributeError, getattr, node, 'nodes') + + def test_is_dir(self): + node = Node('any_dir', NodeKind.DIR) + self.assertTrue(node.is_dir()) + + node = DirNode('any_dir') + + self.assertTrue(node.is_dir()) + self.assertRaises(NodeError, getattr, node, 'content') + + def test_dir_node_iter(self): + nodes = [ + DirNode('docs'), + DirNode('tests'), + FileNode('bar'), + FileNode('foo'), + FileNode('readme.txt'), + FileNode('setup.py'), + ] + dirnode = DirNode('', nodes=nodes) + for node in dirnode: + node == dirnode.get_node(node.path) + + def test_node_state(self): + """ + Without link to changeset nodes should raise NodeError. + """ + node = FileNode('anything') + self.assertRaises(NodeError, getattr, node, 'state') + node = DirNode('anything') + self.assertRaises(NodeError, getattr, node, 'state') + + def test_file_node_stat(self): + node = FileNode('foobar', 'empty... almost') + mode = node.mode # default should be 0100644 + self.assertTrue(mode & stat.S_IRUSR) + self.assertTrue(mode & stat.S_IWUSR) + self.assertTrue(mode & stat.S_IRGRP) + self.assertTrue(mode & stat.S_IROTH) + self.assertFalse(mode & stat.S_IWGRP) + self.assertFalse(mode & stat.S_IWOTH) + self.assertFalse(mode & stat.S_IXUSR) + self.assertFalse(mode & stat.S_IXGRP) + self.assertFalse(mode & stat.S_IXOTH) + + def test_file_node_is_executable(self): + node = FileNode('foobar', 'empty... almost', mode=0100755) + self.assertTrue(node.is_executable()) + + node = FileNode('foobar', 'empty... almost', mode=0100500) + self.assertTrue(node.is_executable()) + + node = FileNode('foobar', 'empty... almost', mode=0100644) + self.assertFalse(node.is_executable()) + + def test_mimetype(self): + py_node = FileNode('test.py') + tar_node = FileNode('test.tar.gz') + + ext = 'CustomExtension' + + my_node2 = FileNode('myfile2') + my_node2._mimetype = [ext] + + my_node3 = FileNode('myfile3') + my_node3._mimetype = [ext,ext] + + self.assertEqual(py_node.mimetype,'text/x-python') + self.assertEqual(py_node.get_mimetype(),('text/x-python',None)) + + self.assertEqual(tar_node.mimetype,'application/x-tar') + self.assertEqual(tar_node.get_mimetype(),('application/x-tar','gzip')) + + self.assertRaises(NodeError,my_node2.get_mimetype) + + self.assertEqual(my_node3.mimetype,ext) + self.assertEqual(my_node3.get_mimetype(),[ext,ext]) + +class NodeContentTest(unittest.TestCase): + + def test_if_binary(self): + data = """\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x10\x00\x00\x00\x10\x08\x06\x00\x00\x00\x1f??a\x00\x00\x00\x04gAMA\x00\x00\xaf?7\x05\x8a?\x00\x00\x00\x19tEXtSoftware\x00Adobe ImageReadyq?e<\x00\x00\x025IDAT8?\xa5\x93?K\x94Q\x14\x87\x9f\xf7?Q\x1bs4?\x03\x9a\xa8?B\x02\x8b$\x10[U;i\x13?6h?&h[?"\x14j?\xa2M\x7fB\x14F\x9aQ?&\x842?\x0b\x89"\x82??!?\x9c!\x9c2l??{N\x8bW\x9dY\xb4\t/\x1c?=\x9b?}????\xa9*;9!?\x83\x91?[?\\v*?D\x04\'`EpNp\xa2X\'U?pVq"Sw.\x1e?\x08\x01D?jw????\xbc??7{|\x9b?\x89$\x01??W@\x15\x9c\x05q`Lt/\x97?\x94\xa1d?\x18~?\x18?\x18W[%\xb0?\x83??\x14\x88\x8dB?\xa6H\tL\tl\x19>/\x01`\xac\xabx?\x9cl\nx\xb0\x98\x07\x95\x88D$"q[\x19?d\x00(o\n\xa0??\x7f\xb9\xa4?\x1bF\x1f\x8e\xac\xa8?j??eUU}?.?\x9f\x8cE??x\x94??\r\xbdtoJU5"0N\x10U?\x00??V\t\x02\x9f\x81?U?\x00\x9eM\xae2?r\x9b7\x83\x82\x8aP3????.?&"?\xb7ZP \x0cJ?\x80\x15T\x95\x9a\x00??S\x8c\r?\xa1\x03\x07?\x96\x9b\xa7\xab=E??\xa4\xb3?\x19q??B\x91=\x8d??k?J\x0bV"??\xf7x?\xa1\x00?\\.\x87\x87???\x02F@D\x99],??\x10#?X\xb7=\xb9\x10?Z\x1by???cI??\x1ag?\x92\xbc?T?t[\x92\x81?<_\x17~\x92\x88?H%?\x10Q\x02\x9f\n\x81qQ\x0bm?\x1bX?\xb1AK\xa6\x9e\xb9?u\xb2?1\xbe|/\x92M@\xa2!F?\xa9>"\r\x92\x8e?>\x9a9Qv\x127?a\xac?Y?8?:??]X???9\x80\xb7?u?\x0b#BZ\x8d=\x1d?p\x00\x00\x00\x00IEND\xaeB`\x82""" + filenode = FileNode('calendar.png', content=data) + self.assertTrue(filenode.is_binary) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_repository.py b/rhodecode/tests/vcs/test_repository.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_repository.py @@ -0,0 +1,215 @@ +from __future__ import with_statement +import datetime +from base import BackendTestMixin +from conf import SCM_TESTS +from conf import TEST_USER_CONFIG_FILE +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest +from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError + + +class RepositoryBaseTest(BackendTestMixin): + recreate_repo_per_test = False + + @classmethod + def _get_commits(cls): + return super(RepositoryBaseTest, cls)._get_commits()[:1] + + def test_get_config_value(self): + self.assertEqual(self.repo.get_config_value('universal', 'foo', + TEST_USER_CONFIG_FILE), 'bar') + + def test_get_config_value_defaults_to_None(self): + self.assertEqual(self.repo.get_config_value('universal', 'nonexist', + TEST_USER_CONFIG_FILE), None) + + def test_get_user_name(self): + self.assertEqual(self.repo.get_user_name(TEST_USER_CONFIG_FILE), + 'Foo Bar') + + def test_get_user_email(self): + self.assertEqual(self.repo.get_user_email(TEST_USER_CONFIG_FILE), + 'foo.bar@example.com') + + + +class RepositoryGetDiffTest(BackendTestMixin): + + @classmethod + def _get_commits(cls): + commits = [ + { + 'message': 'Initial commit', + 'author': 'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar', content='foobar'), + FileNode('foobar2', content='foobar2'), + ], + }, + { + 'message': 'Changed foobar, added foobar3', + 'author': 'Jane Doe ', + 'date': datetime.datetime(2010, 1, 1, 21), + 'added': [ + FileNode('foobar3', content='foobar3'), + ], + 'changed': [ + FileNode('foobar', 'FOOBAR'), + ], + }, + { + 'message': 'Removed foobar, changed foobar3', + 'author': 'Jane Doe ', + 'date': datetime.datetime(2010, 1, 1, 22), + 'changed': [ + FileNode('foobar3', content='FOOBAR\nFOOBAR\nFOOBAR\n'), + ], + 'removed': [FileNode('foobar')], + }, + ] + return commits + + def test_raise_for_wrong(self): + with self.assertRaises(ChangesetDoesNotExistError): + self.repo.get_diff('a' * 40, 'b' * 40) + +class GitRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase): + backend_alias = 'git' + + def test_initial_commit_diff(self): + initial_rev = self.repo.revisions[0] + self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar +new file mode 100644 +index 0000000..f6ea049 +--- /dev/null ++++ b/foobar +@@ -0,0 +1 @@ ++foobar +\ No newline at end of file +diff --git a/foobar2 b/foobar2 +new file mode 100644 +index 0000000..e8c9d6b +--- /dev/null ++++ b/foobar2 +@@ -0,0 +1 @@ ++foobar2 +\ No newline at end of file +''') + + def test_second_changeset_diff(self): + revs = self.repo.revisions + self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar +index f6ea049..389865b 100644 +--- a/foobar ++++ b/foobar +@@ -1 +1 @@ +-foobar +\ No newline at end of file ++FOOBAR +\ No newline at end of file +diff --git a/foobar3 b/foobar3 +new file mode 100644 +index 0000000..c11c37d +--- /dev/null ++++ b/foobar3 +@@ -0,0 +1 @@ ++foobar3 +\ No newline at end of file +''') + + def test_third_changeset_diff(self): + revs = self.repo.revisions + self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar +deleted file mode 100644 +index 389865b..0000000 +--- a/foobar ++++ /dev/null +@@ -1 +0,0 @@ +-FOOBAR +\ No newline at end of file +diff --git a/foobar3 b/foobar3 +index c11c37d..f932447 100644 +--- a/foobar3 ++++ b/foobar3 +@@ -1 +1,3 @@ +-foobar3 +\ No newline at end of file ++FOOBAR ++FOOBAR ++FOOBAR +''') + + +class HgRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase): + backend_alias = 'hg' + + def test_initial_commit_diff(self): + initial_rev = self.repo.revisions[0] + self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar +new file mode 100755 +--- /dev/null ++++ b/foobar +@@ -0,0 +1,1 @@ ++foobar +\ No newline at end of file +diff --git a/foobar2 b/foobar2 +new file mode 100755 +--- /dev/null ++++ b/foobar2 +@@ -0,0 +1,1 @@ ++foobar2 +\ No newline at end of file +''') + + def test_second_changeset_diff(self): + revs = self.repo.revisions + self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar +--- a/foobar ++++ b/foobar +@@ -1,1 +1,1 @@ +-foobar +\ No newline at end of file ++FOOBAR +\ No newline at end of file +diff --git a/foobar3 b/foobar3 +new file mode 100755 +--- /dev/null ++++ b/foobar3 +@@ -0,0 +1,1 @@ ++foobar3 +\ No newline at end of file +''') + + def test_third_changeset_diff(self): + revs = self.repo.revisions + self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar +deleted file mode 100755 +--- a/foobar ++++ /dev/null +@@ -1,1 +0,0 @@ +-FOOBAR +\ No newline at end of file +diff --git a/foobar3 b/foobar3 +--- a/foobar3 ++++ b/foobar3 +@@ -1,1 +1,3 @@ +-foobar3 +\ No newline at end of file ++FOOBAR ++FOOBAR ++FOOBAR +''') + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = alias.capitalize() + RepositoryBaseTest.__name__ + bases = (RepositoryBaseTest, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_tags.py b/rhodecode/tests/vcs/test_tags.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_tags.py @@ -0,0 +1,61 @@ +from __future__ import with_statement + +from base import BackendTestMixin +from conf import SCM_TESTS +from rhodecode.lib.vcs.exceptions import TagAlreadyExistError +from rhodecode.lib.vcs.exceptions import TagDoesNotExistError +from rhodecode.lib.vcs.utils.compat import unittest + + +class TagsTestCaseMixin(BackendTestMixin): + + def test_new_tag(self): + tip = self.repo.get_changeset() + tagsize = len(self.repo.tags) + tag = self.repo.tag('last-commit', 'joe', tip.raw_id) + + self.assertEqual(len(self.repo.tags), tagsize + 1) + for top, dirs, files in tip.walk(): + self.assertEqual(top, tag.get_node(top.path)) + + def test_tag_already_exist(self): + tip = self.repo.get_changeset() + self.repo.tag('last-commit', 'joe', tip.raw_id) + + self.assertRaises(TagAlreadyExistError, + self.repo.tag, 'last-commit', 'joe', tip.raw_id) + + chset = self.repo.get_changeset(0) + self.assertRaises(TagAlreadyExistError, + self.repo.tag, 'last-commit', 'jane', chset.raw_id) + + def test_remove_tag(self): + tip = self.repo.get_changeset() + self.repo.tag('last-commit', 'joe', tip.raw_id) + tagsize = len(self.repo.tags) + + self.repo.remove_tag('last-commit', user='evil joe') + self.assertEqual(len(self.repo.tags), tagsize - 1) + + def test_remove_tag_which_does_not_exist(self): + self.assertRaises(TagDoesNotExistError, + self.repo.remove_tag, 'last-commit', user='evil joe') + + def test_name_with_slash(self): + self.repo.tag('19/10/11', 'joe') + self.assertTrue('19/10/11' in self.repo.tags) + self.repo.tag('11', 'joe') + self.assertTrue('11' in self.repo.tags) + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s tags test' % alias).title().split()) + bases = (TagsTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_utils.py b/rhodecode/tests/vcs/test_utils.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_utils.py @@ -0,0 +1,279 @@ +from __future__ import with_statement + +import os +import mock +import time +import shutil +import tempfile +import datetime +from rhodecode.lib.vcs.utils.compat import unittest +from rhodecode.lib.vcs.utils.paths import get_dirs_for_path +from rhodecode.lib.vcs.utils.helpers import get_dict_for_attrs +from rhodecode.lib.vcs.utils.helpers import get_scm +from rhodecode.lib.vcs.utils.helpers import get_scms_for_path +from rhodecode.lib.vcs.utils.helpers import get_total_seconds +from rhodecode.lib.vcs.utils.helpers import parse_changesets +from rhodecode.lib.vcs.utils.helpers import parse_datetime +from rhodecode.lib.vcs.utils import author_email, author_name +from rhodecode.lib.vcs.utils.paths import get_user_home +from rhodecode.lib.vcs.exceptions import VCSError + +from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH + + +class PathsTest(unittest.TestCase): + + def _test_get_dirs_for_path(self, path, expected): + """ + Tests if get_dirs_for_path returns same as expected. + """ + expected = sorted(expected) + result = sorted(get_dirs_for_path(path)) + self.assertEqual(result, expected, + msg="%s != %s which was expected result for path %s" + % (result, expected, path)) + + def test_get_dirs_for_path(self): + path = 'foo/bar/baz/file' + paths_and_results = ( + ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']), + ('foo/bar/', ['foo', 'foo/bar']), + ('foo/bar', ['foo']), + ) + for path, expected in paths_and_results: + self._test_get_dirs_for_path(path, expected) + + + def test_get_scm(self): + self.assertEqual(('hg', TEST_HG_REPO), get_scm(TEST_HG_REPO)) + self.assertEqual(('git', TEST_GIT_REPO), get_scm(TEST_GIT_REPO)) + + def test_get_two_scms_for_path(self): + multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo-2') + if os.path.isdir(multialias_repo_path): + shutil.rmtree(multialias_repo_path) + + os.mkdir(multialias_repo_path) + + self.assertRaises(VCSError, get_scm, multialias_repo_path) + + def test_get_scm_error_path(self): + self.assertRaises(VCSError, get_scm, 'err') + + def test_get_scms_for_path(self): + dirpath = tempfile.gettempdir() + new = os.path.join(dirpath, 'vcs-scms-for-path-%s' % time.time()) + os.mkdir(new) + self.assertEqual(get_scms_for_path(new), []) + + os.mkdir(os.path.join(new, '.tux')) + self.assertEqual(get_scms_for_path(new), []) + + os.mkdir(os.path.join(new, '.git')) + self.assertEqual(set(get_scms_for_path(new)), set(['git'])) + + os.mkdir(os.path.join(new, '.hg')) + self.assertEqual(set(get_scms_for_path(new)), set(['git', 'hg'])) + + +class TestParseChangesets(unittest.TestCase): + + def test_main_is_returned_correctly(self): + self.assertEqual(parse_changesets('123456'), { + 'start': None, + 'main': '123456', + 'end': None, + }) + + def test_start_is_returned_correctly(self): + self.assertEqual(parse_changesets('aaabbb..'), { + 'start': 'aaabbb', + 'main': None, + 'end': None, + }) + + def test_end_is_returned_correctly(self): + self.assertEqual(parse_changesets('..cccddd'), { + 'start': None, + 'main': None, + 'end': 'cccddd', + }) + + def test_that_two_or_three_dots_are_allowed(self): + text1 = 'a..b' + text2 = 'a...b' + self.assertEqual(parse_changesets(text1), parse_changesets(text2)) + + def test_that_input_is_stripped_first(self): + text1 = 'a..bb' + text2 = ' a..bb\t\n\t ' + self.assertEqual(parse_changesets(text1), parse_changesets(text2)) + + def test_that_exception_is_raised(self): + text = '123456.789012' # single dot is not recognized + with self.assertRaises(ValueError): + parse_changesets(text) + + def test_non_alphanumeric_raises_exception(self): + with self.assertRaises(ValueError): + parse_changesets('aaa@bbb') + + +class TestParseDatetime(unittest.TestCase): + + def test_datetime_text(self): + self.assertEqual(parse_datetime('2010-04-07 21:29:41'), + datetime.datetime(2010, 4, 7, 21, 29, 41)) + + def test_no_seconds(self): + self.assertEqual(parse_datetime('2010-04-07 21:29'), + datetime.datetime(2010, 4, 7, 21, 29)) + + def test_date_only(self): + self.assertEqual(parse_datetime('2010-04-07'), + datetime.datetime(2010, 4, 7)) + + def test_another_format(self): + self.assertEqual(parse_datetime('04/07/10 21:29:41'), + datetime.datetime(2010, 4, 7, 21, 29, 41)) + + def test_now(self): + self.assertTrue(parse_datetime('now') - datetime.datetime.now() < + datetime.timedelta(seconds=1)) + + def test_today(self): + today = datetime.date.today() + self.assertEqual(parse_datetime('today'), + datetime.datetime(*today.timetuple()[:3])) + + def test_yesterday(self): + yesterday = datetime.date.today() - datetime.timedelta(days=1) + self.assertEqual(parse_datetime('yesterday'), + datetime.datetime(*yesterday.timetuple()[:3])) + + def test_tomorrow(self): + tomorrow = datetime.date.today() + datetime.timedelta(days=1) + args = tomorrow.timetuple()[:3] + (23, 59, 59) + self.assertEqual(parse_datetime('tomorrow'), datetime.datetime(*args)) + + def test_days(self): + timestamp = datetime.datetime.today() - datetime.timedelta(days=3) + args = timestamp.timetuple()[:3] + (0, 0, 0, 0) + expected = datetime.datetime(*args) + self.assertEqual(parse_datetime('3d'), expected) + self.assertEqual(parse_datetime('3 d'), expected) + self.assertEqual(parse_datetime('3 day'), expected) + self.assertEqual(parse_datetime('3 days'), expected) + + def test_weeks(self): + timestamp = datetime.datetime.today() - datetime.timedelta(days=3 * 7) + args = timestamp.timetuple()[:3] + (0, 0, 0, 0) + expected = datetime.datetime(*args) + self.assertEqual(parse_datetime('3w'), expected) + self.assertEqual(parse_datetime('3 w'), expected) + self.assertEqual(parse_datetime('3 week'), expected) + self.assertEqual(parse_datetime('3 weeks'), expected) + + def test_mixed(self): + timestamp = datetime.datetime.today() - datetime.timedelta(days=2 * 7 + 3) + args = timestamp.timetuple()[:3] + (0, 0, 0, 0) + expected = datetime.datetime(*args) + self.assertEqual(parse_datetime('2w3d'), expected) + self.assertEqual(parse_datetime('2w 3d'), expected) + self.assertEqual(parse_datetime('2w 3 days'), expected) + self.assertEqual(parse_datetime('2 weeks 3 days'), expected) + + +class TestAuthorExtractors(unittest.TestCase): + TEST_AUTHORS = [('Marcin Kuzminski ', + ('Marcin Kuzminski', 'marcin@python-works.com')), + ('Marcin Kuzminski Spaces < marcin@python-works.com >', + ('Marcin Kuzminski Spaces', 'marcin@python-works.com')), + ('Marcin Kuzminski ', + ('Marcin Kuzminski', 'marcin.kuzminski@python-works.com')), + ('mrf RFC_SPEC ', + ('mrf RFC_SPEC', 'marcin+kuzminski@python-works.com')), + ('username ', + ('username', 'user@email.com')), + ('username ', + ('', 'justemail@mail.com')), + ('justname', + ('justname', '')), + ('Mr Double Name withemail@email.com ', + ('Mr Double Name', 'withemail@email.com')), + ] + + def test_author_email(self): + + for test_str, result in self.TEST_AUTHORS: + self.assertEqual(result[1], author_email(test_str)) + + + def test_author_name(self): + + for test_str, result in self.TEST_AUTHORS: + self.assertEqual(result[0], author_name(test_str)) + + +class TestGetDictForAttrs(unittest.TestCase): + + def test_returned_dict_has_expected_attrs(self): + obj = mock.Mock() + obj.NOT_INCLUDED = 'this key/value should not be included' + obj.CONST = True + obj.foo = 'aaa' + obj.attrs = {'foo': 'bar'} + obj.date = datetime.datetime(2010, 12, 31) + obj.count = 1001 + + self.assertEqual(get_dict_for_attrs(obj, ['CONST', 'foo', 'attrs', + 'date', 'count']), { + 'CONST': True, + 'foo': 'aaa', + 'attrs': {'foo': 'bar'}, + 'date': datetime.datetime(2010, 12, 31), + 'count': 1001, + }) + + +class TestGetTotalSeconds(unittest.TestCase): + + def assertTotalSecondsEqual(self, timedelta, expected_seconds): + result = get_total_seconds(timedelta) + self.assertEqual(result, expected_seconds, + "We computed %s seconds for %s but expected %s" + % (result, timedelta, expected_seconds)) + + def test_get_total_seconds_returns_proper_value(self): + self.assertTotalSecondsEqual(datetime.timedelta(seconds=1001), 1001) + + def test_get_total_seconds_returns_proper_value_for_partial_seconds(self): + self.assertTotalSecondsEqual(datetime.timedelta(seconds=50.65), 50.65) + + +class TestGetUserHome(unittest.TestCase): + + @mock.patch.object(os, 'environ', {}) + def test_defaults_to_none(self): + self.assertEqual(get_user_home(), None) + + @mock.patch.object(os, 'environ', {'HOME': '/home/foobar'}) + def test_unix_like(self): + self.assertEqual(get_user_home(), '/home/foobar') + + @mock.patch.object(os, 'environ', {'USERPROFILE': '/Users/foobar'}) + def test_windows_like(self): + self.assertEqual(get_user_home(), '/Users/foobar') + + @mock.patch.object(os, 'environ', {'HOME': '/home/foobar', + 'USERPROFILE': '/Users/foobar'}) + def test_prefers_home_over_userprofile(self): + self.assertEqual(get_user_home(), '/home/foobar') + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_utils_filesize.py b/rhodecode/tests/vcs/test_utils_filesize.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_utils_filesize.py @@ -0,0 +1,26 @@ +from __future__ import with_statement + +from rhodecode.lib.vcs.utils.filesize import filesizeformat +from rhodecode.lib.vcs.utils.compat import unittest + + +class TestFilesizeformat(unittest.TestCase): + + def test_bytes(self): + self.assertEqual(filesizeformat(10), '10 B') + + def test_kilobytes(self): + self.assertEqual(filesizeformat(1024 * 2), '2 KB') + + def test_megabytes(self): + self.assertEqual(filesizeformat(1024 * 1024 * 2.3), '2.3 MB') + + def test_gigabytes(self): + self.assertEqual(filesizeformat(1024 * 1024 * 1024 * 12.92), '12.92 GB') + + def test_that_function_respects_sep_paramtere(self): + self.assertEqual(filesizeformat(1, ''), '1B') + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/test_vcs.py b/rhodecode/tests/vcs/test_vcs.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_vcs.py @@ -0,0 +1,84 @@ +from __future__ import with_statement + +from rhodecode.lib.vcs import VCSError, get_repo, get_backend +from rhodecode.lib.vcs.backends.hg import MercurialRepository +from rhodecode.lib.vcs.utils.compat import unittest +from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH +import os +import shutil + + +class VCSTest(unittest.TestCase): + """ + Tests for main module's methods. + """ + + def test_get_backend(self): + hg = get_backend('hg') + self.assertEqual(hg, MercurialRepository) + + def test_alias_detect_hg(self): + alias = 'hg' + path = TEST_HG_REPO + backend = get_backend(alias) + repo = backend(path) + self.assertEqual('hg',repo.alias) + + def test_alias_detect_git(self): + alias = 'git' + path = TEST_GIT_REPO + backend = get_backend(alias) + repo = backend(path) + self.assertEqual('git',repo.alias) + + def test_wrong_alias(self): + alias = 'wrong_alias' + self.assertRaises(VCSError, get_backend, alias) + + def test_get_repo(self): + alias = 'hg' + path = TEST_HG_REPO + backend = get_backend(alias) + repo = backend(path) + + self.assertEqual(repo.__class__, get_repo(path, alias).__class__) + self.assertEqual(repo.path, get_repo(path, alias).path) + + def test_get_repo_autoalias_hg(self): + alias = 'hg' + path = TEST_HG_REPO + backend = get_backend(alias) + repo = backend(path) + + self.assertEqual(repo.__class__, get_repo(path).__class__) + self.assertEqual(repo.path, get_repo(path).path) + + def test_get_repo_autoalias_git(self): + alias = 'git' + path = TEST_GIT_REPO + backend = get_backend(alias) + repo = backend(path) + + self.assertEqual(repo.__class__, get_repo(path).__class__) + self.assertEqual(repo.path, get_repo(path).path) + + + def test_get_repo_err(self): + blank_repo_path = os.path.join(TEST_TMP_PATH, 'blank-error-repo') + if os.path.isdir(blank_repo_path): + shutil.rmtree(blank_repo_path) + + os.mkdir(blank_repo_path) + self.assertRaises(VCSError, get_repo, blank_repo_path) + self.assertRaises(VCSError, get_repo, blank_repo_path + 'non_existing') + + def test_get_repo_multialias(self): + multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo') + if os.path.isdir(multialias_repo_path): + shutil.rmtree(multialias_repo_path) + + os.mkdir(multialias_repo_path) + + os.mkdir(os.path.join(multialias_repo_path, '.git')) + os.mkdir(os.path.join(multialias_repo_path, '.hg')) + self.assertRaises(VCSError, get_repo, multialias_repo_path) diff --git a/rhodecode/tests/vcs/test_workdirs.py b/rhodecode/tests/vcs/test_workdirs.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/test_workdirs.py @@ -0,0 +1,90 @@ +from __future__ import with_statement + +import datetime +from rhodecode.lib.vcs.nodes import FileNode +from rhodecode.lib.vcs.utils.compat import unittest +from base import BackendTestMixin +from conf import SCM_TESTS + + +class WorkdirTestCaseMixin(BackendTestMixin): + + @classmethod + def _get_commits(cls): + commits = [ + { + 'message': u'Initial commit', + 'author': u'Joe Doe ', + 'date': datetime.datetime(2010, 1, 1, 20), + 'added': [ + FileNode('foobar', content='Foobar'), + FileNode('foobar2', content='Foobar II'), + FileNode('foo/bar/baz', content='baz here!'), + ], + }, + { + 'message': u'Changes...', + 'author': u'Jane Doe ', + 'date': datetime.datetime(2010, 1, 1, 21), + 'added': [ + FileNode('some/new.txt', content='news...'), + ], + 'changed': [ + FileNode('foobar', 'Foobar I'), + ], + 'removed': [], + }, + ] + return commits + + def test_get_branch_for_default_branch(self): + self.assertEqual(self.repo.workdir.get_branch(), + self.repo.DEFAULT_BRANCH_NAME) + + def test_get_branch_after_adding_one(self): + self.imc.add(FileNode('docs/index.txt', + content='Documentation\n')) + self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + ) + + def test_get_changeset(self): + self.imc.add(FileNode('docs/index.txt', + content='Documentation\n')) + head = self.imc.commit( + message=u'New branch: foobar', + author=u'joe', + branch='foobar', + ) + self.assertEqual(self.repo.workdir.get_changeset(), head) + + def test_checkout_branch(self): + from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError + # first, 'foobranch' does not exist. + self.assertRaises(BranchDoesNotExistError, self.repo.workdir.checkout_branch, + branch='foobranch') + # create new branch 'foobranch'. + self.imc.add(FileNode('file1', content='blah')) + self.imc.commit(message=u'asd', author=u'john', branch='foobranch') + # go back to the default branch + self.repo.workdir.checkout_branch() + self.assertEqual(self.repo.workdir.get_branch(), self.backend_class.DEFAULT_BRANCH_NAME) + # checkout 'foobranch' + self.repo.workdir.checkout_branch('foobranch') + self.assertEqual(self.repo.workdir.get_branch(), 'foobranch') + + +# For each backend create test case class +for alias in SCM_TESTS: + attrs = { + 'backend_alias': alias, + } + cls_name = ''.join(('%s branch test' % alias).title().split()) + bases = (WorkdirTestCaseMixin, unittest.TestCase) + globals()[cls_name] = type(cls_name, bases, attrs) + + +if __name__ == '__main__': + unittest.main() diff --git a/rhodecode/tests/vcs/utils.py b/rhodecode/tests/vcs/utils.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/utils.py @@ -0,0 +1,97 @@ +""" +Utilities for tests only. These are not or should not be used normally - +functions here are crafted as we don't want to use ``vcs`` to verify tests. +""" +import os +import re +import sys + +from subprocess import Popen + + +class VCSTestError(Exception): + pass + + +def run_command(cmd, args): + """ + Runs command on the system with given ``args``. + """ + command = ' '.join((cmd, args)) + p = Popen(command, shell=True) + status = os.waitpid(p.pid, 0)[1] + return status + + +def eprint(msg): + """ + Prints given ``msg`` into sys.stderr as nose test runner hides all output + from sys.stdout by default and if we want to pipe stream somewhere we don't + need those verbose messages anyway. + Appends line break. + """ + sys.stderr.write(msg) + sys.stderr.write('\n') + + +class SCMFetcher(object): + + def __init__(self, alias, test_repo_path, remote_repo, clone_cmd): + """ + :param clone_cmd: command which would clone remote repository; pass + only first bits - remote path and destination would be appended + using ``remote_repo`` and ``test_repo_path`` + """ + self.alias = alias + self.test_repo_path = test_repo_path + self.remote_repo = remote_repo + self.clone_cmd = clone_cmd + + def setup(self): + if not os.path.isdir(self.test_repo_path): + self.fetch_repo() + + def fetch_repo(self): + """ + Tries to fetch repository from remote path. + """ + remote = self.remote_repo + eprint("Fetching repository %s into %s" % (remote, self.test_repo_path)) + run_command(self.clone_cmd, '%s %s' % (remote, self.test_repo_path)) + + +def get_normalized_path(path): + """ + If given path exists, new path would be generated and returned. Otherwise + same whats given is returned. Assumes that there would be no more than + 10000 same named files. + """ + if os.path.exists(path): + dir, basename = os.path.split(path) + splitted_name = basename.split('.') + if len(splitted_name) > 1: + ext = splitted_name[-1] + else: + ext = None + name = '.'.join(splitted_name[:-1]) + matcher = re.compile(r'^.*-(\d{5})$') + start = 0 + m = matcher.match(name) + if not m: + # Haven't append number yet so return first + newname = '%s-00000' % name + newpath = os.path.join(dir, newname) + if ext: + newpath = '.'.join((newpath, ext)) + return get_normalized_path(newpath) + else: + start = int(m.group(1)[-5:]) + 1 + for x in xrange(start, 10000): + newname = name[:-5] + str(x).rjust(5, '0') + newpath = os.path.join(dir, newname) + if ext: + newpath = '.'.join((newpath, ext)) + if not os.path.exists(newpath): + return newpath + raise VCSTestError("Couldn't compute new path for %s" % path) + return path