##// END OF EJS Templates
Added vcs testsuite for better integration tests + added fetching...
marcink -
r2451:402a96fc beta
parent child Browse files
Show More
@@ -0,0 +1,56 b''
1 """
2 Unit tests for vcs_ library.
3
4 In order to run tests we need to prepare our environment first. Tests would be
5 run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from
6 ``vcs.backends.BACKENDS``.
7
8 For each SCM we run tests for, we need some repository. We would use
9 repositories location from system environment variables or test suite defaults
10 - see ``conf`` module for more detail. We simply try to check if repository at
11 certain location exists, if not we would try to fetch them. At ``test_vcs`` or
12 ``test_common`` we run unit tests common for each repository type and for
13 example specific mercurial tests are located at ``test_hg`` module.
14
15 Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
16 function at ``tests/__init__.py``.
17
18 .. _vcs: http://bitbucket.org/marcinkuzminski/vcs
19 .. _unittest: http://pypi.python.org/pypi/unittest
20
21 """
22 import os
23 from rhodecode.lib import vcs
24 from rhodecode.lib.vcs.utils.compat import unittest
25 from rhodecode.tests import *
26 from utils import VCSTestError, SCMFetcher
27
28
29 def setup_package():
30 """
31 Prepares whole package for tests which mainly means it would try to fetch
32 test repositories or use already existing ones.
33 """
34 fetchers = {
35 'hg': {
36 'alias': 'hg',
37 'test_repo_path': TEST_HG_REPO,
38 'remote_repo': HG_REMOTE_REPO,
39 'clone_cmd': 'hg clone',
40 },
41 'git': {
42 'alias': 'git',
43 'test_repo_path': TEST_GIT_REPO,
44 'remote_repo': GIT_REMOTE_REPO,
45 'clone_cmd': 'git clone --bare',
46 },
47 }
48 try:
49 for scm, fetcher_info in fetchers.items():
50 fetcher = SCMFetcher(**fetcher_info)
51 fetcher.setup()
52 except VCSTestError, err:
53 raise RuntimeError(str(err))
54
55 start_dir = os.path.abspath(os.path.dirname(__file__))
56 unittest.defaultTestLoader.discover(start_dir)
@@ -0,0 +1,10 b''
1 [user]
2 name = Foo Bar
3 email = foo.bar@example.com
4
5 [ui]
6 username = Foo Bar foo.bar@example.com
7
8 [universal]
9 foo = bar
10
@@ -0,0 +1,111 b''
1 """
2 Module providing backend independent mixin class. It requires that
3 InMemoryChangeset class is working properly at backend class.
4 """
5 import os
6 from rhodecode.lib import vcs
7 import time
8 import shutil
9 import datetime
10 from rhodecode.lib.vcs.utils.compat import unittest
11
12 from conf import SCM_TESTS, get_new_dir
13
14 from rhodecode.lib.vcs.nodes import FileNode
15
16
17 class BackendTestMixin(object):
18 """
19 This is a backend independent test case class which should be created
20 with ``type`` method.
21
22 It is required to set following attributes at subclass:
23
24 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
25 - ``repo_path``: path to the repository which would be created for set of
26 tests
27 - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be created
28 before every single test. Defaults to ``True``.
29 """
30 recreate_repo_per_test = True
31
32 @classmethod
33 def get_backend(cls):
34 return vcs.get_backend(cls.backend_alias)
35
36 @classmethod
37 def _get_commits(cls):
38 commits = [
39 {
40 'message': u'Initial commit',
41 'author': u'Joe Doe <joe.doe@example.com>',
42 'date': datetime.datetime(2010, 1, 1, 20),
43 'added': [
44 FileNode('foobar', content='Foobar'),
45 FileNode('foobar2', content='Foobar II'),
46 FileNode('foo/bar/baz', content='baz here!'),
47 ],
48 },
49 {
50 'message': u'Changes...',
51 'author': u'Jane Doe <jane.doe@example.com>',
52 'date': datetime.datetime(2010, 1, 1, 21),
53 'added': [
54 FileNode('some/new.txt', content='news...'),
55 ],
56 'changed': [
57 FileNode('foobar', 'Foobar I'),
58 ],
59 'removed': [],
60 },
61 ]
62 return commits
63
64 @classmethod
65 def setUpClass(cls):
66 Backend = cls.get_backend()
67 cls.backend_class = Backend
68 cls.repo_path = get_new_dir(str(time.time()))
69 cls.repo = Backend(cls.repo_path, create=True)
70 cls.imc = cls.repo.in_memory_changeset
71
72 for commit in cls._get_commits():
73 for node in commit.get('added', []):
74 cls.imc.add(FileNode(node.path, content=node.content))
75 for node in commit.get('changed', []):
76 cls.imc.change(FileNode(node.path, content=node.content))
77 for node in commit.get('removed', []):
78 cls.imc.remove(FileNode(node.path))
79
80 cls.tip = cls.imc.commit(message=unicode(commit['message']),
81 author=unicode(commit['author']),
82 date=commit['date'])
83
84 @classmethod
85 def tearDownClass(cls):
86 if not getattr(cls, 'recreate_repo_per_test', False) and \
87 'VCS_REMOVE_TEST_DIRS' in os.environ:
88 shutil.rmtree(cls.repo_path)
89
90 def setUp(self):
91 if getattr(self, 'recreate_repo_per_test', False):
92 self.__class__.setUpClass()
93
94 def tearDown(self):
95 if getattr(self, 'recreate_repo_per_test', False) and \
96 'VCS_REMOVE_TEST_DIRS' in os.environ:
97 shutil.rmtree(self.repo_path)
98
99
100 # For each backend create test case class
101 for alias in SCM_TESTS:
102 attrs = {
103 'backend_alias': alias,
104 }
105 cls_name = ''.join(('%s base backend test' % alias).title().split())
106 bases = (BackendTestMixin, unittest.TestCase)
107 globals()[cls_name] = type(cls_name, bases, attrs)
108
109
110 if __name__ == '__main__':
111 unittest.main()
@@ -0,0 +1,61 b''
1 """
2 Unit tests configuration module for vcs.
3 """
4 import os
5 import time
6 import hashlib
7 import tempfile
8 import datetime
9
10 from utils import get_normalized_path
11 from os.path import join as jn
12
13 __all__ = (
14 'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
15 'SCM_TESTS',
16 )
17
18 SCM_TESTS = ['hg', 'git']
19 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
20
21 THIS = os.path.abspath(os.path.dirname(__file__))
22
23 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
24
25 TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
26 TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
27 jn(TEST_TMP_PATH, 'vcs-git'))
28 TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
29 jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
30 TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
31 jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
32
33 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
34 TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
35 jn(TEST_TMP_PATH, 'vcs-hg'))
36 TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
37 jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
38 TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
39 jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
40
41 TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
42 TEST_REPO_PREFIX = 'vcs-test'
43
44
45 def get_new_dir(title):
46 """
47 Returns always new directory path.
48 """
49 name = TEST_REPO_PREFIX
50 if title:
51 name = '-'.join((name, title))
52 hex = hashlib.sha1(str(time.time())).hexdigest()
53 name = '-'.join((name, hex))
54 path = os.path.join(TEST_DIR, name)
55 return get_normalized_path(path)
56
57
58 PACKAGE_DIR = os.path.abspath(os.path.join(
59 os.path.dirname(__file__), '..'))
60
61 TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
@@ -0,0 +1,108 b''
1 from __future__ import with_statement
2
3 import os
4 import tarfile
5 import zipfile
6 import datetime
7 import tempfile
8 import StringIO
9 from base import BackendTestMixin
10 from conf import SCM_TESTS
11 from rhodecode.lib.vcs.exceptions import VCSError
12 from rhodecode.lib.vcs.nodes import FileNode
13 from rhodecode.lib.vcs.utils.compat import unittest
14
15
16 class ArchivesTestCaseMixin(BackendTestMixin):
17
18 @classmethod
19 def _get_commits(cls):
20 start_date = datetime.datetime(2010, 1, 1, 20)
21 for x in xrange(5):
22 yield {
23 'message': 'Commit %d' % x,
24 'author': 'Joe Doe <joe.doe@example.com>',
25 'date': start_date + datetime.timedelta(hours=12 * x),
26 'added': [
27 FileNode('%d/file_%d.txt' % (x, x),
28 content='Foobar %d' % x),
29 ],
30 }
31
32 def test_archive_zip(self):
33 path = tempfile.mkstemp()[1]
34 with open(path, 'wb') as f:
35 self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
36 out = zipfile.ZipFile(path)
37
38 for x in xrange(5):
39 node_path = '%d/file_%d.txt' % (x, x)
40 decompressed = StringIO.StringIO()
41 decompressed.write(out.read('repo/' + node_path))
42 self.assertEqual(
43 decompressed.getvalue(),
44 self.tip.get_node(node_path).content)
45
46 def test_archive_tgz(self):
47 path = tempfile.mkstemp()[1]
48 with open(path, 'wb') as f:
49 self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
50 outdir = tempfile.mkdtemp()
51
52 outfile = tarfile.open(path, 'r|gz')
53 outfile.extractall(outdir)
54
55 for x in xrange(5):
56 node_path = '%d/file_%d.txt' % (x, x)
57 self.assertEqual(
58 open(os.path.join(outdir, 'repo/' + node_path)).read(),
59 self.tip.get_node(node_path).content)
60
61 def test_archive_tbz2(self):
62 path = tempfile.mkstemp()[1]
63 with open(path, 'w+b') as f:
64 self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
65 outdir = tempfile.mkdtemp()
66
67 outfile = tarfile.open(path, 'r|bz2')
68 outfile.extractall(outdir)
69
70 for x in xrange(5):
71 node_path = '%d/file_%d.txt' % (x, x)
72 self.assertEqual(
73 open(os.path.join(outdir, 'repo/' + node_path)).read(),
74 self.tip.get_node(node_path).content)
75
76 def test_archive_default_stream(self):
77 tmppath = tempfile.mkstemp()[1]
78 with open(tmppath, 'w') as stream:
79 self.tip.fill_archive(stream=stream)
80 mystream = StringIO.StringIO()
81 self.tip.fill_archive(stream=mystream)
82 mystream.seek(0)
83 with open(tmppath, 'r') as f:
84 self.assertEqual(f.read(), mystream.read())
85
86 def test_archive_wrong_kind(self):
87 with self.assertRaises(VCSError):
88 self.tip.fill_archive(kind='wrong kind')
89
90 def test_archive_empty_prefix(self):
91 with self.assertRaises(VCSError):
92 self.tip.fill_archive(prefix='')
93
94 def test_archive_prefix_with_leading_slash(self):
95 with self.assertRaises(VCSError):
96 self.tip.fill_archive(prefix='/any')
97
98 # For each backend create test case class
99 for alias in SCM_TESTS:
100 attrs = {
101 'backend_alias': alias,
102 }
103 cls_name = ''.join(('%s archive test' % alias).title().split())
104 bases = (ArchivesTestCaseMixin, unittest.TestCase)
105 globals()[cls_name] = type(cls_name, bases, attrs)
106
107 if __name__ == '__main__':
108 unittest.main()
@@ -0,0 +1,118 b''
1 from __future__ import with_statement
2
3 from rhodecode.lib import vcs
4 import datetime
5 from rhodecode.lib.vcs.utils.compat import unittest
6
7 from base import BackendTestMixin
8 from conf import SCM_TESTS
9
10 from rhodecode.lib.vcs.nodes import FileNode
11
12
13 class BranchesTestCaseMixin(BackendTestMixin):
14
15 @classmethod
16 def _get_commits(cls):
17 commits = [
18 {
19 'message': 'Initial commit',
20 'author': 'Joe Doe <joe.doe@example.com>',
21 'date': datetime.datetime(2010, 1, 1, 20),
22 'added': [
23 FileNode('foobar', content='Foobar'),
24 FileNode('foobar2', content='Foobar II'),
25 FileNode('foo/bar/baz', content='baz here!'),
26 ],
27 },
28 {
29 'message': 'Changes...',
30 'author': 'Jane Doe <jane.doe@example.com>',
31 'date': datetime.datetime(2010, 1, 1, 21),
32 'added': [
33 FileNode('some/new.txt', content='news...'),
34 ],
35 'changed': [
36 FileNode('foobar', 'Foobar I'),
37 ],
38 'removed': [],
39 },
40 ]
41 return commits
42
43 def test_simple(self):
44 tip = self.repo.get_changeset()
45 self.assertEqual(tip.date, datetime.datetime(2010, 1, 1, 21))
46
47 def test_new_branch(self):
48 # This check must not be removed to ensure the 'branches' LazyProperty
49 # gets hit *before* the new 'foobar' branch got created:
50 self.assertFalse('foobar' in self.repo.branches)
51 self.imc.add(vcs.nodes.FileNode('docs/index.txt',
52 content='Documentation\n'))
53 foobar_tip = self.imc.commit(
54 message=u'New branch: foobar',
55 author=u'joe',
56 branch='foobar',
57 )
58 self.assertTrue('foobar' in self.repo.branches)
59 self.assertEqual(foobar_tip.branch, 'foobar')
60
61 def test_new_head(self):
62 tip = self.repo.get_changeset()
63 self.imc.add(vcs.nodes.FileNode('docs/index.txt',
64 content='Documentation\n'))
65 foobar_tip = self.imc.commit(
66 message=u'New branch: foobar',
67 author=u'joe',
68 branch='foobar',
69 parents=[tip],
70 )
71 self.imc.change(vcs.nodes.FileNode('docs/index.txt',
72 content='Documentation\nand more...\n'))
73 newtip = self.imc.commit(
74 message=u'At default branch',
75 author=u'joe',
76 branch=foobar_tip.branch,
77 parents=[foobar_tip],
78 )
79
80 newest_tip = self.imc.commit(
81 message=u'Merged with %s' % foobar_tip.raw_id,
82 author=u'joe',
83 branch=self.backend_class.DEFAULT_BRANCH_NAME,
84 parents=[newtip, foobar_tip],
85 )
86
87 self.assertEqual(newest_tip.branch,
88 self.backend_class.DEFAULT_BRANCH_NAME)
89
90 def test_branch_with_slash_in_name(self):
91 self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
92 self.imc.commit(u'Branch with a slash!', author=u'joe',
93 branch='issue/123')
94 self.assertTrue('issue/123' in self.repo.branches)
95
96 def test_branch_with_slash_in_name_and_similar_without(self):
97 self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
98 self.imc.commit(u'Branch with a slash!', author=u'joe',
99 branch='issue/123')
100 self.imc.add(vcs.nodes.FileNode('extrafile II', content='Some data\n'))
101 self.imc.commit(u'Branch without a slash...', author=u'joe',
102 branch='123')
103 self.assertIn('issue/123', self.repo.branches)
104 self.assertIn('123', self.repo.branches)
105
106
107 # For each backend create test case class
108 for alias in SCM_TESTS:
109 attrs = {
110 'backend_alias': alias,
111 }
112 cls_name = ''.join(('%s branches test' % alias).title().split())
113 bases = (BranchesTestCaseMixin, unittest.TestCase)
114 globals()[cls_name] = type(cls_name, bases, attrs)
115
116
117 if __name__ == '__main__':
118 unittest.main()
@@ -0,0 +1,336 b''
1 from __future__ import with_statement
2
3 from rhodecode.lib import vcs
4 import datetime
5 from base import BackendTestMixin
6 from conf import SCM_TESTS
7 from rhodecode.lib.vcs.backends.base import BaseChangeset
8 from rhodecode.lib.vcs.nodes import FileNode
9 from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
10 from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
11 from rhodecode.lib.vcs.exceptions import RepositoryError
12 from rhodecode.lib.vcs.utils.compat import unittest
13
14
15 class TestBaseChangeset(unittest.TestCase):
16
17 def test_as_dict(self):
18 changeset = BaseChangeset()
19 changeset.id = 'ID'
20 changeset.raw_id = 'RAW_ID'
21 changeset.short_id = 'SHORT_ID'
22 changeset.revision = 1009
23 changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
24 changeset.message = 'Message of a commit'
25 changeset.author = 'Joe Doe <joe.doe@example.com>'
26 changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
27 changeset.changed = []
28 changeset.removed = []
29 self.assertEqual(changeset.as_dict(), {
30 'id': 'ID',
31 'raw_id': 'RAW_ID',
32 'short_id': 'SHORT_ID',
33 'revision': 1009,
34 'date': datetime.datetime(2011, 1, 30, 1, 45),
35 'message': 'Message of a commit',
36 'author': {
37 'name': 'Joe Doe',
38 'email': 'joe.doe@example.com',
39 },
40 'added': ['foo/bar/baz', 'foobar'],
41 'changed': [],
42 'removed': [],
43 })
44
45 class ChangesetsWithCommitsTestCaseixin(BackendTestMixin):
46 recreate_repo_per_test = True
47
48 @classmethod
49 def _get_commits(cls):
50 start_date = datetime.datetime(2010, 1, 1, 20)
51 for x in xrange(5):
52 yield {
53 'message': 'Commit %d' % x,
54 'author': 'Joe Doe <joe.doe@example.com>',
55 'date': start_date + datetime.timedelta(hours=12 * x),
56 'added': [
57 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
58 ],
59 }
60
61 def test_new_branch(self):
62 self.imc.add(vcs.nodes.FileNode('docs/index.txt',
63 content='Documentation\n'))
64 foobar_tip = self.imc.commit(
65 message=u'New branch: foobar',
66 author=u'joe',
67 branch='foobar',
68 )
69 self.assertTrue('foobar' in self.repo.branches)
70 self.assertEqual(foobar_tip.branch, 'foobar')
71 # 'foobar' should be the only branch that contains the new commit
72 self.assertNotEqual(*self.repo.branches.values())
73
74 def test_new_head_in_default_branch(self):
75 tip = self.repo.get_changeset()
76 self.imc.add(vcs.nodes.FileNode('docs/index.txt',
77 content='Documentation\n'))
78 foobar_tip = self.imc.commit(
79 message=u'New branch: foobar',
80 author=u'joe',
81 branch='foobar',
82 parents=[tip],
83 )
84 self.imc.change(vcs.nodes.FileNode('docs/index.txt',
85 content='Documentation\nand more...\n'))
86 newtip = self.imc.commit(
87 message=u'At default branch',
88 author=u'joe',
89 branch=foobar_tip.branch,
90 parents=[foobar_tip],
91 )
92
93 newest_tip = self.imc.commit(
94 message=u'Merged with %s' % foobar_tip.raw_id,
95 author=u'joe',
96 branch=self.backend_class.DEFAULT_BRANCH_NAME,
97 parents=[newtip, foobar_tip],
98 )
99
100 self.assertEqual(newest_tip.branch,
101 self.backend_class.DEFAULT_BRANCH_NAME)
102
103 def test_get_changesets_respects_branch_name(self):
104 tip = self.repo.get_changeset()
105 self.imc.add(vcs.nodes.FileNode('docs/index.txt',
106 content='Documentation\n'))
107 doc_changeset = self.imc.commit(
108 message=u'New branch: docs',
109 author=u'joe',
110 branch='docs',
111 )
112 self.imc.add(vcs.nodes.FileNode('newfile', content=''))
113 self.imc.commit(
114 message=u'Back in default branch',
115 author=u'joe',
116 parents=[tip],
117 )
118 default_branch_changesets = self.repo.get_changesets(
119 branch_name=self.repo.DEFAULT_BRANCH_NAME)
120 self.assertNotIn(doc_changeset, default_branch_changesets)
121
122
123 class ChangesetsTestCaseMixin(BackendTestMixin):
124 recreate_repo_per_test = False
125
126 @classmethod
127 def _get_commits(cls):
128 start_date = datetime.datetime(2010, 1, 1, 20)
129 for x in xrange(5):
130 yield {
131 'message': u'Commit %d' % x,
132 'author': u'Joe Doe <joe.doe@example.com>',
133 'date': start_date + datetime.timedelta(hours=12 * x),
134 'added': [
135 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
136 ],
137 }
138
139 def test_simple(self):
140 tip = self.repo.get_changeset()
141 self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20))
142
143 def test_get_changesets_is_ordered_by_date(self):
144 changesets = list(self.repo.get_changesets())
145 ordered_by_date = sorted(changesets,
146 key=lambda cs: cs.date)
147 self.assertItemsEqual(changesets, ordered_by_date)
148
149 def test_get_changesets_respects_start(self):
150 second_id = self.repo.revisions[1]
151 changesets = list(self.repo.get_changesets(start=second_id))
152 self.assertEqual(len(changesets), 4)
153
154 def test_get_changesets_numerical_id_respects_start(self):
155 second_id = 1
156 changesets = list(self.repo.get_changesets(start=second_id))
157 self.assertEqual(len(changesets), 4)
158
159 def test_get_changesets_includes_start_changeset(self):
160 second_id = self.repo.revisions[1]
161 changesets = list(self.repo.get_changesets(start=second_id))
162 self.assertEqual(changesets[0].raw_id, second_id)
163
164 def test_get_changesets_respects_end(self):
165 second_id = self.repo.revisions[1]
166 changesets = list(self.repo.get_changesets(end=second_id))
167 self.assertEqual(changesets[-1].raw_id, second_id)
168 self.assertEqual(len(changesets), 2)
169
170 def test_get_changesets_numerical_id_respects_end(self):
171 second_id = 1
172 changesets = list(self.repo.get_changesets(end=second_id))
173 self.assertEqual(changesets.index(changesets[-1]), second_id)
174 self.assertEqual(len(changesets), 2)
175
176 def test_get_changesets_respects_both_start_and_end(self):
177 second_id = self.repo.revisions[1]
178 third_id = self.repo.revisions[2]
179 changesets = list(self.repo.get_changesets(start=second_id,
180 end=third_id))
181 self.assertEqual(len(changesets), 2)
182
183 def test_get_changesets_numerical_id_respects_both_start_and_end(self):
184 changesets = list(self.repo.get_changesets(start=2, end=3))
185 self.assertEqual(len(changesets), 2)
186
187 def test_get_changesets_includes_end_changeset(self):
188 second_id = self.repo.revisions[1]
189 changesets = list(self.repo.get_changesets(end=second_id))
190 self.assertEqual(changesets[-1].raw_id, second_id)
191
192 def test_get_changesets_respects_start_date(self):
193 start_date = datetime.datetime(2010, 2, 1)
194 for cs in self.repo.get_changesets(start_date=start_date):
195 self.assertGreaterEqual(cs.date, start_date)
196
197 def test_get_changesets_respects_end_date(self):
198 end_date = datetime.datetime(2010, 2, 1)
199 for cs in self.repo.get_changesets(end_date=end_date):
200 self.assertLessEqual(cs.date, end_date)
201
202 def test_get_changesets_respects_reverse(self):
203 changesets_id_list = [cs.raw_id for cs in
204 self.repo.get_changesets(reverse=True)]
205 self.assertItemsEqual(changesets_id_list, reversed(self.repo.revisions))
206
207 def test_get_filenodes_generator(self):
208 tip = self.repo.get_changeset()
209 filepaths = [node.path for node in tip.get_filenodes_generator()]
210 self.assertItemsEqual(filepaths, ['file_%d.txt' % x for x in xrange(5)])
211
212 def test_size(self):
213 tip = self.repo.get_changeset()
214 size = 5 * len('Foobar N') # Size of 5 files
215 self.assertEqual(tip.size, size)
216
217 def test_author(self):
218 tip = self.repo.get_changeset()
219 self.assertEqual(tip.author, u'Joe Doe <joe.doe@example.com>')
220
221 def test_author_name(self):
222 tip = self.repo.get_changeset()
223 self.assertEqual(tip.author_name, u'Joe Doe')
224
225 def test_author_email(self):
226 tip = self.repo.get_changeset()
227 self.assertEqual(tip.author_email, u'joe.doe@example.com')
228
229 def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
230 with self.assertRaises(ChangesetDoesNotExistError):
231 list(self.repo.get_changesets(start='foobar'))
232
233 def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
234 with self.assertRaises(ChangesetDoesNotExistError):
235 list(self.repo.get_changesets(end='foobar'))
236
237 def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
238 with self.assertRaises(BranchDoesNotExistError):
239 list(self.repo.get_changesets(branch_name='foobar'))
240
241 def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
242 start = self.repo.revisions[-1]
243 end = self.repo.revisions[0]
244 with self.assertRaises(RepositoryError):
245 list(self.repo.get_changesets(start=start, end=end))
246
247 def test_get_changesets_numerical_id_reversed(self):
248 with self.assertRaises(RepositoryError):
249 [x for x in self.repo.get_changesets(start=3, end=2)]
250
251 def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
252 with self.assertRaises(RepositoryError):
253 last = len(self.repo.revisions)
254 list(self.repo.get_changesets(start=last-1, end=last-2))
255
256 def test_get_changesets_numerical_id_last_zero_error(self):
257 with self.assertRaises(RepositoryError):
258 last = len(self.repo.revisions)
259 list(self.repo.get_changesets(start=last-1, end=0))
260
261
262 class ChangesetsChangesTestCaseMixin(BackendTestMixin):
263 recreate_repo_per_test = False
264
265 @classmethod
266 def _get_commits(cls):
267 return [
268 {
269 'message': u'Initial',
270 'author': u'Joe Doe <joe.doe@example.com>',
271 'date': datetime.datetime(2010, 1, 1, 20),
272 'added': [
273 FileNode('foo/bar', content='foo'),
274 FileNode('foobar', content='foo'),
275 FileNode('qwe', content='foo'),
276 ],
277 },
278 {
279 'message': u'Massive changes',
280 'author': u'Joe Doe <joe.doe@example.com>',
281 'date': datetime.datetime(2010, 1, 1, 22),
282 'added': [FileNode('fallout', content='War never changes')],
283 'changed': [
284 FileNode('foo/bar', content='baz'),
285 FileNode('foobar', content='baz'),
286 ],
287 'removed': [FileNode('qwe')],
288 },
289 ]
290
291 def test_initial_commit(self):
292 changeset = self.repo.get_changeset(0)
293 self.assertItemsEqual(changeset.added, [
294 changeset.get_node('foo/bar'),
295 changeset.get_node('foobar'),
296 changeset.get_node('qwe'),
297 ])
298 self.assertItemsEqual(changeset.changed, [])
299 self.assertItemsEqual(changeset.removed, [])
300
301 def test_head_added(self):
302 changeset = self.repo.get_changeset()
303 self.assertItemsEqual(changeset.added, [
304 changeset.get_node('fallout'),
305 ])
306 self.assertItemsEqual(changeset.changed, [
307 changeset.get_node('foo/bar'),
308 changeset.get_node('foobar'),
309 ])
310 self.assertEqual(len(changeset.removed), 1)
311 self.assertEqual(list(changeset.removed)[0].path, 'qwe')
312
313
314 # For each backend create test case class
315 for alias in SCM_TESTS:
316 attrs = {
317 'backend_alias': alias,
318 }
319 # tests with additional commits
320 cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
321 bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
322 globals()[cls_name] = type(cls_name, bases, attrs)
323
324 # tests without additional commits
325 cls_name = ''.join(('%s changesets test' % alias).title().split())
326 bases = (ChangesetsTestCaseMixin, unittest.TestCase)
327 globals()[cls_name] = type(cls_name, bases, attrs)
328
329 # tests changes
330 cls_name = ''.join(('%s changesets changes test' % alias).title().split())
331 bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
332 globals()[cls_name] = type(cls_name, bases, attrs)
333
334
335 if __name__ == '__main__':
336 unittest.main()
@@ -0,0 +1,49 b''
1 # encoding: utf8
2
3 from __future__ import with_statement
4
5 import datetime
6 from rhodecode.lib.vcs.nodes import FileNode
7 from rhodecode.lib.vcs.utils.compat import unittest
8 from test_inmemchangesets import BackendBaseTestCase
9 from conf import SCM_TESTS
10
11
12 class FileNodeUnicodePathTestsMixin(object):
13
14 fname = 'Δ…Ε›Γ°Δ…Δ™Ε‚Δ…Δ‡.txt'
15 ufname = (fname).decode('utf-8')
16
17 def get_commits(self):
18 self.nodes = [
19 FileNode(self.fname, content='Foobar'),
20 ]
21
22 commits = [
23 {
24 'message': 'Initial commit',
25 'author': 'Joe Doe <joe.doe@example.com>',
26 'date': datetime.datetime(2010, 1, 1, 20),
27 'added': self.nodes,
28 },
29 ]
30 return commits
31
32 def test_filenode_path(self):
33 node = self.tip.get_node(self.fname)
34 unode = self.tip.get_node(self.ufname)
35 self.assertEqual(node, unode)
36
37
38 for alias in SCM_TESTS:
39 attrs = {
40 'backend_alias': alias,
41 }
42 cls_name = ''.join(('%s file node unicode path test' % alias).title()
43 .split())
44 bases = (FileNodeUnicodePathTestsMixin, BackendBaseTestCase)
45 globals()[cls_name] = type(cls_name, bases, attrs)
46
47
48 if __name__ == '__main__':
49 unittest.main()
@@ -0,0 +1,44 b''
1 from __future__ import with_statement
2
3 import datetime
4 from base import BackendTestMixin
5 from conf import SCM_TESTS
6 from rhodecode.lib.vcs.nodes import FileNode
7 from rhodecode.lib.vcs.utils.compat import unittest
8
9
10 class GetitemTestCaseMixin(BackendTestMixin):
11
12 @classmethod
13 def _get_commits(cls):
14 start_date = datetime.datetime(2010, 1, 1, 20)
15 for x in xrange(5):
16 yield {
17 'message': 'Commit %d' % x,
18 'author': 'Joe Doe <joe.doe@example.com>',
19 'date': start_date + datetime.timedelta(hours=12 * x),
20 'added': [
21 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
22 ],
23 }
24
25 def test__getitem__last_item_is_tip(self):
26 self.assertEqual(self.repo[-1], self.repo.get_changeset())
27
28 def test__getitem__returns_correct_items(self):
29 changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
30 self.assertEqual(changesets, list(self.repo.get_changesets()))
31
32
33 # For each backend create test case class
34 for alias in SCM_TESTS:
35 attrs = {
36 'backend_alias': alias,
37 }
38 cls_name = ''.join(('%s getitem test' % alias).title().split())
39 bases = (GetitemTestCaseMixin, unittest.TestCase)
40 globals()[cls_name] = type(cls_name, bases, attrs)
41
42
43 if __name__ == '__main__':
44 unittest.main()
@@ -0,0 +1,56 b''
1 from __future__ import with_statement
2
3 import datetime
4 from base import BackendTestMixin
5 from conf import SCM_TESTS
6 from rhodecode.lib.vcs.nodes import FileNode
7 from rhodecode.lib.vcs.utils.compat import unittest
8
9
10 class GetsliceTestCaseMixin(BackendTestMixin):
11
12 @classmethod
13 def _get_commits(cls):
14 start_date = datetime.datetime(2010, 1, 1, 20)
15 for x in xrange(5):
16 yield {
17 'message': 'Commit %d' % x,
18 'author': 'Joe Doe <joe.doe@example.com>',
19 'date': start_date + datetime.timedelta(hours=12 * x),
20 'added': [
21 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
22 ],
23 }
24
25 def test__getslice__last_item_is_tip(self):
26 self.assertEqual(list(self.repo[-1:])[0], self.repo.get_changeset())
27
28 def test__getslice__respects_start_index(self):
29 self.assertEqual(list(self.repo[2:]),
30 [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]])
31
32 def test__getslice__respects_negative_start_index(self):
33 self.assertEqual(list(self.repo[-2:]),
34 [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]])
35
36 def test__getslice__respects_end_index(self):
37 self.assertEqual(list(self.repo[:2]),
38 [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]])
39
40 def test__getslice__respects_negative_end_index(self):
41 self.assertEqual(list(self.repo[:-2]),
42 [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]])
43
44
45 # For each backend create test case class
46 for alias in SCM_TESTS:
47 attrs = {
48 'backend_alias': alias,
49 }
50 cls_name = ''.join(('%s getslice test' % alias).title().split())
51 bases = (GetsliceTestCaseMixin, unittest.TestCase)
52 globals()[cls_name] = type(cls_name, bases, attrs)
53
54
55 if __name__ == '__main__':
56 unittest.main()
This diff has been collapsed as it changes many lines, (702 lines changed) Show them Hide them
@@ -0,0 +1,702 b''
1 from __future__ import with_statement
2
3 import os
4 import mock
5 import datetime
6 from rhodecode.lib.vcs.backends.git import GitRepository, GitChangeset
7 from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
8 from rhodecode.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
9 from rhodecode.lib.vcs.utils.compat import unittest
10 from rhodecode.tests.vcs.base import BackendTestMixin
11 from conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
12
13
14 class GitRepositoryTest(unittest.TestCase):
15
16 def __check_for_existing_repo(self):
17 if os.path.exists(TEST_GIT_REPO_CLONE):
18 self.fail('Cannot test git clone repo as location %s already '
19 'exists. You should manually remove it first.'
20 % TEST_GIT_REPO_CLONE)
21
22 def setUp(self):
23 self.repo = GitRepository(TEST_GIT_REPO)
24
25 def test_wrong_repo_path(self):
26 wrong_repo_path = '/tmp/errorrepo'
27 self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
28
29 def test_repo_clone(self):
30 self.__check_for_existing_repo()
31 repo = GitRepository(TEST_GIT_REPO)
32 repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
33 src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
34 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
35 # Checking hashes of changesets should be enough
36 for changeset in repo.get_changesets():
37 raw_id = changeset.raw_id
38 self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
39
40 def test_repo_clone_without_create(self):
41 self.assertRaises(RepositoryError, GitRepository,
42 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
43
44 def test_repo_clone_with_update(self):
45 repo = GitRepository(TEST_GIT_REPO)
46 clone_path = TEST_GIT_REPO_CLONE + '_with_update'
47 repo_clone = GitRepository(clone_path,
48 create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
49 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
50
51 #check if current workdir was updated
52 fpath = os.path.join(clone_path, 'MANIFEST.in')
53 self.assertEqual(True, os.path.isfile(fpath),
54 'Repo was cloned and updated but file %s could not be found'
55 % fpath)
56
57 def test_repo_clone_without_update(self):
58 repo = GitRepository(TEST_GIT_REPO)
59 clone_path = TEST_GIT_REPO_CLONE + '_without_update'
60 repo_clone = GitRepository(clone_path,
61 create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
62 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
63 #check if current workdir was *NOT* updated
64 fpath = os.path.join(clone_path, 'MANIFEST.in')
65 # Make sure it's not bare repo
66 self.assertFalse(repo_clone._repo.bare)
67 self.assertEqual(False, os.path.isfile(fpath),
68 'Repo was cloned and updated but file %s was found'
69 % fpath)
70
71 def test_repo_clone_into_bare_repo(self):
72 repo = GitRepository(TEST_GIT_REPO)
73 clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
74 repo_clone = GitRepository(clone_path, create=True,
75 src_url=repo.path, bare=True)
76 self.assertTrue(repo_clone._repo.bare)
77
78 def test_create_repo_is_not_bare_by_default(self):
79 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
80 self.assertFalse(repo._repo.bare)
81
82 def test_create_bare_repo(self):
83 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
84 self.assertTrue(repo._repo.bare)
85
86 def test_revisions(self):
87 # there are 112 revisions (by now)
88 # so we can assume they would be available from now on
89 subset = set([
90 'c1214f7e79e02fc37156ff215cd71275450cffc3',
91 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
92 'fa6600f6848800641328adbf7811fd2372c02ab2',
93 '102607b09cdd60e2793929c4f90478be29f85a17',
94 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
95 '2d1028c054665b962fa3d307adfc923ddd528038',
96 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
97 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
98 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
99 '8430a588b43b5d6da365400117c89400326e7992',
100 'd955cd312c17b02143c04fa1099a352b04368118',
101 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
102 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
103 'f298fe1189f1b69779a4423f40b48edf92a703fc',
104 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
105 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
106 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
107 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
108 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
109 '45223f8f114c64bf4d6f853e3c35a369a6305520',
110 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
111 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
112 '27d48942240f5b91dfda77accd2caac94708cc7d',
113 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
114 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
115 self.assertTrue(subset.issubset(set(self.repo.revisions)))
116
117
118
119 def test_slicing(self):
120 #4 1 5 10 95
121 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
122 (10, 20, 10), (5, 100, 95)]:
123 revs = list(self.repo[sfrom:sto])
124 self.assertEqual(len(revs), size)
125 self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
126 self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
127
128
129 def test_branches(self):
130 # TODO: Need more tests here
131 # Removed (those are 'remotes' branches for cloned repo)
132 #self.assertTrue('master' in self.repo.branches)
133 #self.assertTrue('gittree' in self.repo.branches)
134 #self.assertTrue('web-branch' in self.repo.branches)
135 for name, id in self.repo.branches.items():
136 self.assertTrue(isinstance(
137 self.repo.get_changeset(id), GitChangeset))
138
139 def test_tags(self):
140 # TODO: Need more tests here
141 self.assertTrue('v0.1.1' in self.repo.tags)
142 self.assertTrue('v0.1.2' in self.repo.tags)
143 for name, id in self.repo.tags.items():
144 self.assertTrue(isinstance(
145 self.repo.get_changeset(id), GitChangeset))
146
147 def _test_single_changeset_cache(self, revision):
148 chset = self.repo.get_changeset(revision)
149 self.assertTrue(revision in self.repo.changesets)
150 self.assertTrue(chset is self.repo.changesets[revision])
151
152 def test_initial_changeset(self):
153 id = self.repo.revisions[0]
154 init_chset = self.repo.get_changeset(id)
155 self.assertEqual(init_chset.message, 'initial import\n')
156 self.assertEqual(init_chset.author,
157 'Marcin Kuzminski <marcin@python-blog.com>')
158 for path in ('vcs/__init__.py',
159 'vcs/backends/BaseRepository.py',
160 'vcs/backends/__init__.py'):
161 self.assertTrue(isinstance(init_chset.get_node(path), FileNode))
162 for path in ('', 'vcs', 'vcs/backends'):
163 self.assertTrue(isinstance(init_chset.get_node(path), DirNode))
164
165 self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
166
167 node = init_chset.get_node('vcs/')
168 self.assertTrue(hasattr(node, 'kind'))
169 self.assertEqual(node.kind, NodeKind.DIR)
170
171 node = init_chset.get_node('vcs')
172 self.assertTrue(hasattr(node, 'kind'))
173 self.assertEqual(node.kind, NodeKind.DIR)
174
175 node = init_chset.get_node('vcs/__init__.py')
176 self.assertTrue(hasattr(node, 'kind'))
177 self.assertEqual(node.kind, NodeKind.FILE)
178
179 def test_not_existing_changeset(self):
180 self.assertRaises(RepositoryError, self.repo.get_changeset,
181 'f' * 40)
182
183 def test_changeset10(self):
184
185 chset10 = self.repo.get_changeset(self.repo.revisions[9])
186 README = """===
187 VCS
188 ===
189
190 Various Version Control System management abstraction layer for Python.
191
192 Introduction
193 ------------
194
195 TODO: To be written...
196
197 """
198 node = chset10.get_node('README.rst')
199 self.assertEqual(node.kind, NodeKind.FILE)
200 self.assertEqual(node.content, README)
201
202
203 class GitChangesetTest(unittest.TestCase):
204
205 def setUp(self):
206 self.repo = GitRepository(TEST_GIT_REPO)
207
208 def test_default_changeset(self):
209 tip = self.repo.get_changeset()
210 self.assertEqual(tip, self.repo.get_changeset(None))
211 self.assertEqual(tip, self.repo.get_changeset('tip'))
212
213 def test_root_node(self):
214 tip = self.repo.get_changeset()
215 self.assertTrue(tip.root is tip.get_node(''))
216
217 def test_lazy_fetch(self):
218 """
219 Test if changeset's nodes expands and are cached as we walk through
220 the revision. This test is somewhat hard to write as order of tests
221 is a key here. Written by running command after command in a shell.
222 """
223 hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
224 self.assertTrue(hex in self.repo.revisions)
225 chset = self.repo.get_changeset(hex)
226 self.assertTrue(len(chset.nodes) == 0)
227 root = chset.root
228 self.assertTrue(len(chset.nodes) == 1)
229 self.assertTrue(len(root.nodes) == 8)
230 # accessing root.nodes updates chset.nodes
231 self.assertTrue(len(chset.nodes) == 9)
232
233 docs = root.get_node('docs')
234 # we haven't yet accessed anything new as docs dir was already cached
235 self.assertTrue(len(chset.nodes) == 9)
236 self.assertTrue(len(docs.nodes) == 8)
237 # accessing docs.nodes updates chset.nodes
238 self.assertTrue(len(chset.nodes) == 17)
239
240 self.assertTrue(docs is chset.get_node('docs'))
241 self.assertTrue(docs is root.nodes[0])
242 self.assertTrue(docs is root.dirs[0])
243 self.assertTrue(docs is chset.get_node('docs'))
244
245 def test_nodes_with_changeset(self):
246 hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
247 chset = self.repo.get_changeset(hex)
248 root = chset.root
249 docs = root.get_node('docs')
250 self.assertTrue(docs is chset.get_node('docs'))
251 api = docs.get_node('api')
252 self.assertTrue(api is chset.get_node('docs/api'))
253 index = api.get_node('index.rst')
254 self.assertTrue(index is chset.get_node('docs/api/index.rst'))
255 self.assertTrue(index is chset.get_node('docs')\
256 .get_node('api')\
257 .get_node('index.rst'))
258
259 def test_branch_and_tags(self):
260 '''
261 rev0 = self.repo.revisions[0]
262 chset0 = self.repo.get_changeset(rev0)
263 self.assertEqual(chset0.branch, 'master')
264 self.assertEqual(chset0.tags, [])
265
266 rev10 = self.repo.revisions[10]
267 chset10 = self.repo.get_changeset(rev10)
268 self.assertEqual(chset10.branch, 'master')
269 self.assertEqual(chset10.tags, [])
270
271 rev44 = self.repo.revisions[44]
272 chset44 = self.repo.get_changeset(rev44)
273 self.assertEqual(chset44.branch, 'web-branch')
274
275 tip = self.repo.get_changeset('tip')
276 self.assertTrue('tip' in tip.tags)
277 '''
278 # Those tests would fail - branches are now going
279 # to be changed at main API in order to support git backend
280 pass
281
282 def _test_slices(self, limit, offset):
283 count = self.repo.count()
284 changesets = self.repo.get_changesets(limit=limit, offset=offset)
285 idx = 0
286 for changeset in changesets:
287 rev = offset + idx
288 idx += 1
289 rev_id = self.repo.revisions[rev]
290 if idx > limit:
291 self.fail("Exceeded limit already (getting revision %s, "
292 "there are %s total revisions, offset=%s, limit=%s)"
293 % (rev_id, count, offset, limit))
294 self.assertEqual(changeset, self.repo.get_changeset(rev_id))
295 result = list(self.repo.get_changesets(limit=limit, offset=offset))
296 start = offset
297 end = limit and offset + limit or None
298 sliced = list(self.repo[start:end])
299 self.failUnlessEqual(result, sliced,
300 msg="Comparison failed for limit=%s, offset=%s"
301 "(get_changeset returned: %s and sliced: %s"
302 % (limit, offset, result, sliced))
303
304 def _test_file_size(self, revision, path, size):
305 node = self.repo.get_changeset(revision).get_node(path)
306 self.assertTrue(node.is_file())
307 self.assertEqual(node.size, size)
308
309 def test_file_size(self):
310 to_check = (
311 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
312 'vcs/backends/BaseRepository.py', 502),
313 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
314 'vcs/backends/hg.py', 854),
315 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
316 'setup.py', 1068),
317
318 ('d955cd312c17b02143c04fa1099a352b04368118',
319 'vcs/backends/base.py', 2921),
320 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
321 'vcs/backends/base.py', 3936),
322 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
323 'vcs/backends/base.py', 6189),
324 )
325 for revision, path, size in to_check:
326 self._test_file_size(revision, path, size)
327
328 def test_file_history(self):
329 # we can only check if those revisions are present in the history
330 # as we cannot update this test every time file is changed
331 files = {
332 'setup.py': [
333 '54386793436c938cff89326944d4c2702340037d',
334 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
335 '998ed409c795fec2012b1c0ca054d99888b22090',
336 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
337 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
338 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
339 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
340 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
341 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
342 ],
343 'vcs/nodes.py': [
344 '33fa3223355104431402a888fa77a4e9956feb3e',
345 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
346 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
347 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
348 'c877b68d18e792a66b7f4c529ea02c8f80801542',
349 '4313566d2e417cb382948f8d9d7c765330356054',
350 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
351 '54386793436c938cff89326944d4c2702340037d',
352 '54000345d2e78b03a99d561399e8e548de3f3203',
353 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
354 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
355 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
356 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
357 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
358 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
359 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
360 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
361 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
362 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
363 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
364 'f15c21f97864b4f071cddfbf2750ec2e23859414',
365 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
366 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
367 '84dec09632a4458f79f50ddbbd155506c460b4f9',
368 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
369 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
370 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
371 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
372 '6970b057cffe4aab0a792aa634c89f4bebf01441',
373 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
374 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
375 ],
376 'vcs/backends/git.py': [
377 '4cf116ad5a457530381135e2f4c453e68a1b0105',
378 '9a751d84d8e9408e736329767387f41b36935153',
379 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
380 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
381 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
382 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
383 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
384 '54000345d2e78b03a99d561399e8e548de3f3203',
385 ],
386 }
387 for path, revs in files.items():
388 node = self.repo.get_changeset(revs[0]).get_node(path)
389 node_revs = [chset.raw_id for chset in node.history]
390 self.assertTrue(set(revs).issubset(set(node_revs)),
391 "We assumed that %s is subset of revisions for which file %s "
392 "has been changed, and history of that node returned: %s"
393 % (revs, path, node_revs))
394
395 def test_file_annotate(self):
396 files = {
397 'vcs/backends/__init__.py': {
398 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
399 'lines_no': 1,
400 'changesets': [
401 'c1214f7e79e02fc37156ff215cd71275450cffc3',
402 ],
403 },
404 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
405 'lines_no': 21,
406 'changesets': [
407 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
408 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
409 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
410 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
411 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
412 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
413 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
414 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
415 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
416 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
417 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
418 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
419 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
420 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
421 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
422 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
423 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
424 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
425 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
426 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
427 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
428 ],
429 },
430 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
431 'lines_no': 32,
432 'changesets': [
433 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
434 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
435 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
436 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
437 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
438 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
439 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
440 '54000345d2e78b03a99d561399e8e548de3f3203',
441 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
442 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
443 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
444 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
445 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
446 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
447 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
448 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
449 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
450 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
451 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
452 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
453 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
454 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
455 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
456 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
457 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
458 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
459 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
460 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
461 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
462 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
463 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
464 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
465 ],
466 },
467 },
468 }
469
470 for fname, revision_dict in files.items():
471 for rev, data in revision_dict.items():
472 cs = self.repo.get_changeset(rev)
473 ann = cs.get_file_annotate(fname)
474
475 l1 = [x[1].raw_id for x in ann]
476 l2 = files[fname][rev]['changesets']
477 self.assertTrue(l1 == l2 , "The lists of revision for %s@rev %s"
478 "from annotation list should match each other, "
479 "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
480
481 def test_files_state(self):
482 """
483 Tests state of FileNodes.
484 """
485 node = self.repo\
486 .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
487 .get_node('vcs/utils/diffs.py')
488 self.assertTrue(node.state, NodeState.ADDED)
489 self.assertTrue(node.added)
490 self.assertFalse(node.changed)
491 self.assertFalse(node.not_changed)
492 self.assertFalse(node.removed)
493
494 node = self.repo\
495 .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e')\
496 .get_node('.hgignore')
497 self.assertTrue(node.state, NodeState.CHANGED)
498 self.assertFalse(node.added)
499 self.assertTrue(node.changed)
500 self.assertFalse(node.not_changed)
501 self.assertFalse(node.removed)
502
503 node = self.repo\
504 .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064')\
505 .get_node('setup.py')
506 self.assertTrue(node.state, NodeState.NOT_CHANGED)
507 self.assertFalse(node.added)
508 self.assertFalse(node.changed)
509 self.assertTrue(node.not_changed)
510 self.assertFalse(node.removed)
511
512 # If node has REMOVED state then trying to fetch it would raise
513 # ChangesetError exception
514 chset = self.repo.get_changeset(
515 'fa6600f6848800641328adbf7811fd2372c02ab2')
516 path = 'vcs/backends/BaseRepository.py'
517 self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
518 # but it would be one of ``removed`` (changeset's attribute)
519 self.assertTrue(path in [rf.path for rf in chset.removed])
520
521 chset = self.repo.get_changeset(
522 '54386793436c938cff89326944d4c2702340037d')
523 changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
524 'vcs/nodes.py']
525 self.assertEqual(set(changed), set([f.path for f in chset.changed]))
526
527 def test_commit_message_is_unicode(self):
528 for cs in self.repo:
529 self.assertEqual(type(cs.message), unicode)
530
531 def test_changeset_author_is_unicode(self):
532 for cs in self.repo:
533 self.assertEqual(type(cs.author), unicode)
534
535 def test_repo_files_content_is_unicode(self):
536 changeset = self.repo.get_changeset()
537 for node in changeset.get_node('/'):
538 if node.is_file():
539 self.assertEqual(type(node.content), unicode)
540
541 def test_wrong_path(self):
542 # There is 'setup.py' in the root dir but not there:
543 path = 'foo/bar/setup.py'
544 tip = self.repo.get_changeset()
545 self.assertRaises(VCSError, tip.get_node, path)
546
547 def test_author_email(self):
548 self.assertEqual('marcin@python-blog.com',
549 self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
550 .author_email)
551 self.assertEqual('lukasz.balcerzak@python-center.pl',
552 self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
553 .author_email)
554 self.assertEqual('none@none',
555 self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
556 .author_email)
557
558 def test_author_username(self):
559 self.assertEqual('Marcin Kuzminski',
560 self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
561 .author_name)
562 self.assertEqual('Lukasz Balcerzak',
563 self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
564 .author_name)
565 self.assertEqual('marcink',
566 self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
567 .author_name)
568
569
570 class GitSpecificTest(unittest.TestCase):
571
572 def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
573 repo = mock.MagicMock()
574 changeset = GitChangeset(repo, 'foobar')
575 changeset._diff_name_status = 'foobar'
576 with self.assertRaises(VCSError):
577 changeset.added
578
579 def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
580 repo = mock.MagicMock()
581 changeset = GitChangeset(repo, 'foobar')
582 changeset._diff_name_status = 'foobar'
583 with self.assertRaises(VCSError):
584 changeset.added
585
586 def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
587 repo = mock.MagicMock()
588 changeset = GitChangeset(repo, 'foobar')
589 changeset._diff_name_status = 'foobar'
590 with self.assertRaises(VCSError):
591 changeset.added
592
593
594 class GitSpecificWithRepoTest(BackendTestMixin, unittest.TestCase):
595 backend_alias = 'git'
596
597 @classmethod
598 def _get_commits(cls):
599 return [
600 {
601 'message': 'Initial',
602 'author': 'Joe Doe <joe.doe@example.com>',
603 'date': datetime.datetime(2010, 1, 1, 20),
604 'added': [
605 FileNode('foobar/static/js/admin/base.js', content='base'),
606 FileNode('foobar/static/admin', content='admin',
607 mode=0120000), # this is a link
608 FileNode('foo', content='foo'),
609 ],
610 },
611 {
612 'message': 'Second',
613 'author': 'Joe Doe <joe.doe@example.com>',
614 'date': datetime.datetime(2010, 1, 1, 22),
615 'added': [
616 FileNode('foo2', content='foo2'),
617 ],
618 },
619 ]
620
621 def test_paths_slow_traversing(self):
622 cs = self.repo.get_changeset()
623 self.assertEqual(cs.get_node('foobar').get_node('static').get_node('js')
624 .get_node('admin').get_node('base.js').content, 'base')
625
626 def test_paths_fast_traversing(self):
627 cs = self.repo.get_changeset()
628 self.assertEqual(cs.get_node('foobar/static/js/admin/base.js').content,
629 'base')
630
631 def test_workdir_get_branch(self):
632 self.repo.run_git_command('checkout -b production')
633 # Regression test: one of following would fail if we don't check
634 # .git/HEAD file
635 self.repo.run_git_command('checkout production')
636 self.assertEqual(self.repo.workdir.get_branch(), 'production')
637 self.repo.run_git_command('checkout master')
638 self.assertEqual(self.repo.workdir.get_branch(), 'master')
639
640 def test_get_diff_runs_git_command_with_hashes(self):
641 self.repo.run_git_command = mock.Mock(return_value=['', ''])
642 self.repo.get_diff(0, 1)
643 self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s' %
644 (3, self.repo._get_revision(0), self.repo._get_revision(1)))
645
646 def test_get_diff_runs_git_command_with_str_hashes(self):
647 self.repo.run_git_command = mock.Mock(return_value=['', ''])
648 self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
649 self.repo.run_git_command.assert_called_once_with('show -U%s %s' %
650 (3, self.repo._get_revision(1)))
651
652 def test_get_diff_runs_git_command_with_path_if_its_given(self):
653 self.repo.run_git_command = mock.Mock(return_value=['', ''])
654 self.repo.get_diff(0, 1, 'foo')
655 self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s -- "foo"'
656 % (3, self.repo._get_revision(0), self.repo._get_revision(1)))
657
658
659 class GitRegressionTest(BackendTestMixin, unittest.TestCase):
660 backend_alias = 'git'
661
662 @classmethod
663 def _get_commits(cls):
664 return [
665 {
666 'message': 'Initial',
667 'author': 'Joe Doe <joe.doe@example.com>',
668 'date': datetime.datetime(2010, 1, 1, 20),
669 'added': [
670 FileNode('bot/__init__.py', content='base'),
671 FileNode('bot/templates/404.html', content='base'),
672 FileNode('bot/templates/500.html', content='base'),
673 ],
674 },
675 {
676 'message': 'Second',
677 'author': 'Joe Doe <joe.doe@example.com>',
678 'date': datetime.datetime(2010, 1, 1, 22),
679 'added': [
680 FileNode('bot/build/migrations/1.py', content='foo2'),
681 FileNode('bot/build/migrations/2.py', content='foo2'),
682 FileNode('bot/build/static/templates/f.html', content='foo2'),
683 FileNode('bot/build/static/templates/f1.html', content='foo2'),
684 FileNode('bot/build/templates/err.html', content='foo2'),
685 FileNode('bot/build/templates/err2.html', content='foo2'),
686 ],
687 },
688 ]
689
690 def test_similar_paths(self):
691 cs = self.repo.get_changeset()
692 paths = lambda *n:[x.path for x in n]
693 self.assertEqual(paths(*cs.get_nodes('bot')), ['bot/build', 'bot/templates', 'bot/__init__.py'])
694 self.assertEqual(paths(*cs.get_nodes('bot/build')), ['bot/build/migrations', 'bot/build/static', 'bot/build/templates'])
695 self.assertEqual(paths(*cs.get_nodes('bot/build/static')), ['bot/build/static/templates'])
696 # this get_nodes below causes troubles !
697 self.assertEqual(paths(*cs.get_nodes('bot/build/static/templates')), ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html'])
698 self.assertEqual(paths(*cs.get_nodes('bot/build/templates')), ['bot/build/templates/err.html', 'bot/build/templates/err2.html'])
699 self.assertEqual(paths(*cs.get_nodes('bot/templates/')), ['bot/templates/404.html', 'bot/templates/500.html'])
700
701 if __name__ == '__main__':
702 unittest.main()
This diff has been collapsed as it changes many lines, (561 lines changed) Show them Hide them
@@ -0,0 +1,561 b''
1 from __future__ import with_statement
2
3 import os
4 from rhodecode.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
5 from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
6 from rhodecode.lib.vcs.nodes import NodeKind, NodeState
7 from conf import PACKAGE_DIR, TEST_HG_REPO, TEST_HG_REPO_CLONE, \
8 TEST_HG_REPO_PULL
9 from rhodecode.lib.vcs.utils.compat import unittest
10
11
12 # Use only clean mercurial's ui
13 import mercurial.scmutil
14 mercurial.scmutil.rcpath()
15 if mercurial.scmutil._rcpath:
16 mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1]
17
18
19 class MercurialRepositoryTest(unittest.TestCase):
20
21 def __check_for_existing_repo(self):
22 if os.path.exists(TEST_HG_REPO_CLONE):
23 self.fail('Cannot test mercurial clone repo as location %s already '
24 'exists. You should manually remove it first.'
25 % TEST_HG_REPO_CLONE)
26
27 def setUp(self):
28 self.repo = MercurialRepository(TEST_HG_REPO)
29
30 def test_wrong_repo_path(self):
31 wrong_repo_path = '/tmp/errorrepo'
32 self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
33
34 def test_unicode_path_repo(self):
35 self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
36
37 def test_repo_clone(self):
38 self.__check_for_existing_repo()
39 repo = MercurialRepository(TEST_HG_REPO)
40 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
41 src_url=TEST_HG_REPO, update_after_clone=True)
42 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
43 # Checking hashes of changesets should be enough
44 for changeset in repo.get_changesets():
45 raw_id = changeset.raw_id
46 self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
47
48 def test_repo_clone_with_update(self):
49 repo = MercurialRepository(TEST_HG_REPO)
50 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
51 src_url=TEST_HG_REPO, update_after_clone=True)
52 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
53
54 #check if current workdir was updated
55 self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
56 + '_w_update',
57 'MANIFEST.in')), True,)
58
59 def test_repo_clone_without_update(self):
60 repo = MercurialRepository(TEST_HG_REPO)
61 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
62 src_url=TEST_HG_REPO, update_after_clone=False)
63 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
64 self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
65 + '_wo_update',
66 'MANIFEST.in')), False,)
67
68 def test_pull(self):
69 if os.path.exists(TEST_HG_REPO_PULL):
70 self.fail('Cannot test mercurial pull command as location %s '
71 'already exists. You should manually remove it first'
72 % TEST_HG_REPO_PULL)
73 repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
74 self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
75
76 repo_new.pull(self.repo.path)
77 repo_new = MercurialRepository(TEST_HG_REPO_PULL)
78 self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
79
80 def test_revisions(self):
81 # there are 21 revisions at bitbucket now
82 # so we can assume they would be available from now on
83 subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
84 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
85 '6cba7170863a2411822803fa77a0a264f1310b35',
86 '56349e29c2af3ac913b28bde9a2c6154436e615b',
87 '2dda4e345facb0ccff1a191052dd1606dba6781d',
88 '6fff84722075f1607a30f436523403845f84cd9e',
89 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
90 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
91 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
92 'be90031137367893f1c406e0a8683010fd115b79',
93 'db8e58be770518cbb2b1cdfa69146e47cd481481',
94 '84478366594b424af694a6c784cb991a16b87c21',
95 '17f8e105dddb9f339600389c6dc7175d395a535c',
96 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
97 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
98 '786facd2c61deb9cf91e9534735124fb8fc11842',
99 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
100 'aa6a0de05b7612707db567078e130a6cd114a9a7',
101 'eada5a770da98ab0dd7325e29d00e0714f228d09'
102 ])
103 self.assertTrue(subset.issubset(set(self.repo.revisions)))
104
105
106 # check if we have the proper order of revisions
107 org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
108 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
109 '6cba7170863a2411822803fa77a0a264f1310b35',
110 '56349e29c2af3ac913b28bde9a2c6154436e615b',
111 '2dda4e345facb0ccff1a191052dd1606dba6781d',
112 '6fff84722075f1607a30f436523403845f84cd9e',
113 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
114 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
115 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
116 'be90031137367893f1c406e0a8683010fd115b79',
117 'db8e58be770518cbb2b1cdfa69146e47cd481481',
118 '84478366594b424af694a6c784cb991a16b87c21',
119 '17f8e105dddb9f339600389c6dc7175d395a535c',
120 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
121 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
122 '786facd2c61deb9cf91e9534735124fb8fc11842',
123 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
124 'aa6a0de05b7612707db567078e130a6cd114a9a7',
125 'eada5a770da98ab0dd7325e29d00e0714f228d09',
126 '2c1885c735575ca478bf9e17b0029dca68824458',
127 'd9bcd465040bf869799b09ad732c04e0eea99fe9',
128 '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
129 '4fb8326d78e5120da2c7468dcf7098997be385da',
130 '62b4a097164940bd66030c4db51687f3ec035eed',
131 '536c1a19428381cfea92ac44985304f6a8049569',
132 '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
133 '9bb326a04ae5d98d437dece54be04f830cf1edd9',
134 'f8940bcb890a98c4702319fbe36db75ea309b475',
135 'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
136 '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
137 'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
138 self.assertEqual(org, self.repo.revisions[:31])
139
140 def test_iter_slice(self):
141 sliced = list(self.repo[:10])
142 itered = list(self.repo)[:10]
143 self.assertEqual(sliced, itered)
144
145 def test_slicing(self):
146 #4 1 5 10 95
147 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
148 (10, 20, 10), (5, 100, 95)]:
149 revs = list(self.repo[sfrom:sto])
150 self.assertEqual(len(revs), size)
151 self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
152 self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
153
154 def test_branches(self):
155 # TODO: Need more tests here
156
157 #active branches
158 self.assertTrue('default' in self.repo.branches)
159
160 #closed branches
161 self.assertFalse('web' in self.repo.branches)
162 self.assertFalse('git' in self.repo.branches)
163
164 # closed
165 self.assertTrue('workdir' in self.repo._get_branches(closed=True))
166 self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
167
168 for name, id in self.repo.branches.items():
169 self.assertTrue(isinstance(
170 self.repo.get_changeset(id), MercurialChangeset))
171
172 def test_tip_in_tags(self):
173 # tip is always a tag
174 self.assertIn('tip', self.repo.tags)
175
176 def test_tip_changeset_in_tags(self):
177 tip = self.repo.get_changeset()
178 self.assertEqual(self.repo.tags['tip'], tip.raw_id)
179
180 def test_initial_changeset(self):
181
182 init_chset = self.repo.get_changeset(0)
183 self.assertEqual(init_chset.message, 'initial import')
184 self.assertEqual(init_chset.author,
185 'Marcin Kuzminski <marcin@python-blog.com>')
186 self.assertEqual(sorted(init_chset._file_paths),
187 sorted([
188 'vcs/__init__.py',
189 'vcs/backends/BaseRepository.py',
190 'vcs/backends/__init__.py',
191 ])
192 )
193 self.assertEqual(sorted(init_chset._dir_paths),
194 sorted(['', 'vcs', 'vcs/backends']))
195
196 self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
197
198 node = init_chset.get_node('vcs/')
199 self.assertTrue(hasattr(node, 'kind'))
200 self.assertEqual(node.kind, NodeKind.DIR)
201
202 node = init_chset.get_node('vcs')
203 self.assertTrue(hasattr(node, 'kind'))
204 self.assertEqual(node.kind, NodeKind.DIR)
205
206 node = init_chset.get_node('vcs/__init__.py')
207 self.assertTrue(hasattr(node, 'kind'))
208 self.assertEqual(node.kind, NodeKind.FILE)
209
210 def test_not_existing_changeset(self):
211 #rawid
212 self.assertRaises(RepositoryError, self.repo.get_changeset,
213 'abcd' * 10)
214 #shortid
215 self.assertRaises(RepositoryError, self.repo.get_changeset,
216 'erro' * 4)
217 #numeric
218 self.assertRaises(RepositoryError, self.repo.get_changeset,
219 self.repo.count() + 1)
220
221
222 # Small chance we ever get to this one
223 revision = pow(2, 30)
224 self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
225
226 def test_changeset10(self):
227
228 chset10 = self.repo.get_changeset(10)
229 README = """===
230 VCS
231 ===
232
233 Various Version Control System management abstraction layer for Python.
234
235 Introduction
236 ------------
237
238 TODO: To be written...
239
240 """
241 node = chset10.get_node('README.rst')
242 self.assertEqual(node.kind, NodeKind.FILE)
243 self.assertEqual(node.content, README)
244
245
246 class MercurialChangesetTest(unittest.TestCase):
247
248 def setUp(self):
249 self.repo = MercurialRepository(TEST_HG_REPO)
250
251 def _test_equality(self, changeset):
252 revision = changeset.revision
253 self.assertEqual(changeset, self.repo.get_changeset(revision))
254
255 def test_equality(self):
256 self.setUp()
257 revs = [0, 10, 20]
258 changesets = [self.repo.get_changeset(rev) for rev in revs]
259 for changeset in changesets:
260 self._test_equality(changeset)
261
262 def test_default_changeset(self):
263 tip = self.repo.get_changeset('tip')
264 self.assertEqual(tip, self.repo.get_changeset())
265 self.assertEqual(tip, self.repo.get_changeset(revision=None))
266 self.assertEqual(tip, list(self.repo[-1:])[0])
267
268 def test_root_node(self):
269 tip = self.repo.get_changeset('tip')
270 self.assertTrue(tip.root is tip.get_node(''))
271
272 def test_lazy_fetch(self):
273 """
274 Test if changeset's nodes expands and are cached as we walk through
275 the revision. This test is somewhat hard to write as order of tests
276 is a key here. Written by running command after command in a shell.
277 """
278 self.setUp()
279 chset = self.repo.get_changeset(45)
280 self.assertTrue(len(chset.nodes) == 0)
281 root = chset.root
282 self.assertTrue(len(chset.nodes) == 1)
283 self.assertTrue(len(root.nodes) == 8)
284 # accessing root.nodes updates chset.nodes
285 self.assertTrue(len(chset.nodes) == 9)
286
287 docs = root.get_node('docs')
288 # we haven't yet accessed anything new as docs dir was already cached
289 self.assertTrue(len(chset.nodes) == 9)
290 self.assertTrue(len(docs.nodes) == 8)
291 # accessing docs.nodes updates chset.nodes
292 self.assertTrue(len(chset.nodes) == 17)
293
294 self.assertTrue(docs is chset.get_node('docs'))
295 self.assertTrue(docs is root.nodes[0])
296 self.assertTrue(docs is root.dirs[0])
297 self.assertTrue(docs is chset.get_node('docs'))
298
299 def test_nodes_with_changeset(self):
300 self.setUp()
301 chset = self.repo.get_changeset(45)
302 root = chset.root
303 docs = root.get_node('docs')
304 self.assertTrue(docs is chset.get_node('docs'))
305 api = docs.get_node('api')
306 self.assertTrue(api is chset.get_node('docs/api'))
307 index = api.get_node('index.rst')
308 self.assertTrue(index is chset.get_node('docs/api/index.rst'))
309 self.assertTrue(index is chset.get_node('docs')\
310 .get_node('api')\
311 .get_node('index.rst'))
312
313 def test_branch_and_tags(self):
314 chset0 = self.repo.get_changeset(0)
315 self.assertEqual(chset0.branch, 'default')
316 self.assertEqual(chset0.tags, [])
317
318 chset10 = self.repo.get_changeset(10)
319 self.assertEqual(chset10.branch, 'default')
320 self.assertEqual(chset10.tags, [])
321
322 chset44 = self.repo.get_changeset(44)
323 self.assertEqual(chset44.branch, 'web')
324
325 tip = self.repo.get_changeset('tip')
326 self.assertTrue('tip' in tip.tags)
327
328 def _test_file_size(self, revision, path, size):
329 node = self.repo.get_changeset(revision).get_node(path)
330 self.assertTrue(node.is_file())
331 self.assertEqual(node.size, size)
332
333 def test_file_size(self):
334 to_check = (
335 (10, 'setup.py', 1068),
336 (20, 'setup.py', 1106),
337 (60, 'setup.py', 1074),
338
339 (10, 'vcs/backends/base.py', 2921),
340 (20, 'vcs/backends/base.py', 3936),
341 (60, 'vcs/backends/base.py', 6189),
342 )
343 for revision, path, size in to_check:
344 self._test_file_size(revision, path, size)
345
346 def test_file_history(self):
347 # we can only check if those revisions are present in the history
348 # as we cannot update this test every time file is changed
349 files = {
350 'setup.py': [7, 18, 45, 46, 47, 69, 77],
351 'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
352 61, 73, 76],
353 'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
354 26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
355 48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
356 82],
357 }
358 for path, revs in files.items():
359 tip = self.repo.get_changeset(revs[-1])
360 node = tip.get_node(path)
361 node_revs = [chset.revision for chset in node.history]
362 self.assertTrue(set(revs).issubset(set(node_revs)),
363 "We assumed that %s is subset of revisions for which file %s "
364 "has been changed, and history of that node returned: %s"
365 % (revs, path, node_revs))
366
367 def test_file_annotate(self):
368 files = {
369 'vcs/backends/__init__.py':
370 {89: {'lines_no': 31,
371 'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
372 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
373 32, 32, 32, 32, 37, 32, 37, 37, 32,
374 32, 32]},
375 20: {'lines_no': 1,
376 'changesets': [4]},
377 55: {'lines_no': 31,
378 'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
379 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
380 32, 32, 32, 32, 37, 32, 37, 37, 32,
381 32, 32]}},
382 'vcs/exceptions.py':
383 {89: {'lines_no': 18,
384 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
385 16, 16, 17, 16, 16, 18, 18, 18]},
386 20: {'lines_no': 18,
387 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
388 16, 16, 17, 16, 16, 18, 18, 18]},
389 55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
390 16, 16, 16, 16, 16, 16,
391 17, 16, 16, 18, 18, 18]}},
392 'MANIFEST.in': {89: {'lines_no': 5,
393 'changesets': [7, 7, 7, 71, 71]},
394 20: {'lines_no': 3,
395 'changesets': [7, 7, 7]},
396 55: {'lines_no': 3,
397 'changesets': [7, 7, 7]}}}
398
399
400 for fname, revision_dict in files.items():
401 for rev, data in revision_dict.items():
402 cs = self.repo.get_changeset(rev)
403 ann = cs.get_file_annotate(fname)
404
405 l1 = [x[1].revision for x in ann]
406 l2 = files[fname][rev]['changesets']
407 self.assertTrue(l1 == l2 , "The lists of revision for %s@rev%s"
408 "from annotation list should match each other,"
409 "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
410
411 def test_changeset_state(self):
412 """
413 Tests which files have been added/changed/removed at particular revision
414 """
415
416 # rev 46ad32a4f974:
417 # hg st --rev 46ad32a4f974
418 # changed: 13
419 # added: 20
420 # removed: 1
421 changed = set(['.hgignore'
422 , 'README.rst' , 'docs/conf.py' , 'docs/index.rst' , 'setup.py'
423 , 'tests/test_hg.py' , 'tests/test_nodes.py' , 'vcs/__init__.py'
424 , 'vcs/backends/__init__.py' , 'vcs/backends/base.py'
425 , 'vcs/backends/hg.py' , 'vcs/nodes.py' , 'vcs/utils/__init__.py'])
426
427 added = set(['docs/api/backends/hg.rst'
428 , 'docs/api/backends/index.rst' , 'docs/api/index.rst'
429 , 'docs/api/nodes.rst' , 'docs/api/web/index.rst'
430 , 'docs/api/web/simplevcs.rst' , 'docs/installation.rst'
431 , 'docs/quickstart.rst' , 'setup.cfg' , 'vcs/utils/baseui_config.py'
432 , 'vcs/utils/web.py' , 'vcs/web/__init__.py' , 'vcs/web/exceptions.py'
433 , 'vcs/web/simplevcs/__init__.py' , 'vcs/web/simplevcs/exceptions.py'
434 , 'vcs/web/simplevcs/middleware.py' , 'vcs/web/simplevcs/models.py'
435 , 'vcs/web/simplevcs/settings.py' , 'vcs/web/simplevcs/utils.py'
436 , 'vcs/web/simplevcs/views.py'])
437
438 removed = set(['docs/api.rst'])
439
440 chset64 = self.repo.get_changeset('46ad32a4f974')
441 self.assertEqual(set((node.path for node in chset64.added)), added)
442 self.assertEqual(set((node.path for node in chset64.changed)), changed)
443 self.assertEqual(set((node.path for node in chset64.removed)), removed)
444
445 # rev b090f22d27d6:
446 # hg st --rev b090f22d27d6
447 # changed: 13
448 # added: 20
449 # removed: 1
450 chset88 = self.repo.get_changeset('b090f22d27d6')
451 self.assertEqual(set((node.path for node in chset88.added)), set())
452 self.assertEqual(set((node.path for node in chset88.changed)),
453 set(['.hgignore']))
454 self.assertEqual(set((node.path for node in chset88.removed)), set())
455 #
456 # 85:
457 # added: 2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
458 # changed: 4 ['vcs/web/simplevcs/models.py', ...]
459 # removed: 1 ['vcs/utils/web.py']
460 chset85 = self.repo.get_changeset(85)
461 self.assertEqual(set((node.path for node in chset85.added)), set([
462 'vcs/utils/diffs.py',
463 'vcs/web/simplevcs/views/diffs.py']))
464 self.assertEqual(set((node.path for node in chset85.changed)), set([
465 'vcs/web/simplevcs/models.py',
466 'vcs/web/simplevcs/utils.py',
467 'vcs/web/simplevcs/views/__init__.py',
468 'vcs/web/simplevcs/views/repository.py',
469 ]))
470 self.assertEqual(set((node.path for node in chset85.removed)),
471 set(['vcs/utils/web.py']))
472
473
474 def test_files_state(self):
475 """
476 Tests state of FileNodes.
477 """
478 chset = self.repo.get_changeset(85)
479 node = chset.get_node('vcs/utils/diffs.py')
480 self.assertTrue(node.state, NodeState.ADDED)
481 self.assertTrue(node.added)
482 self.assertFalse(node.changed)
483 self.assertFalse(node.not_changed)
484 self.assertFalse(node.removed)
485
486 chset = self.repo.get_changeset(88)
487 node = chset.get_node('.hgignore')
488 self.assertTrue(node.state, NodeState.CHANGED)
489 self.assertFalse(node.added)
490 self.assertTrue(node.changed)
491 self.assertFalse(node.not_changed)
492 self.assertFalse(node.removed)
493
494 chset = self.repo.get_changeset(85)
495 node = chset.get_node('setup.py')
496 self.assertTrue(node.state, NodeState.NOT_CHANGED)
497 self.assertFalse(node.added)
498 self.assertFalse(node.changed)
499 self.assertTrue(node.not_changed)
500 self.assertFalse(node.removed)
501
502 # If node has REMOVED state then trying to fetch it would raise
503 # ChangesetError exception
504 chset = self.repo.get_changeset(2)
505 path = 'vcs/backends/BaseRepository.py'
506 self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
507 # but it would be one of ``removed`` (changeset's attribute)
508 self.assertTrue(path in [rf.path for rf in chset.removed])
509
510 def test_commit_message_is_unicode(self):
511 for cm in self.repo:
512 self.assertEqual(type(cm.message), unicode)
513
514 def test_changeset_author_is_unicode(self):
515 for cm in self.repo:
516 self.assertEqual(type(cm.author), unicode)
517
518 def test_repo_files_content_is_unicode(self):
519 test_changeset = self.repo.get_changeset(100)
520 for node in test_changeset.get_node('/'):
521 if node.is_file():
522 self.assertEqual(type(node.content), unicode)
523
524 def test_wrong_path(self):
525 # There is 'setup.py' in the root dir but not there:
526 path = 'foo/bar/setup.py'
527 self.assertRaises(VCSError, self.repo.get_changeset().get_node, path)
528
529
530 def test_archival_file(self):
531 #TODO:
532 pass
533
534 def test_archival_as_generator(self):
535 #TODO:
536 pass
537
538 def test_archival_wrong_kind(self):
539 tip = self.repo.get_changeset()
540 self.assertRaises(VCSError, tip.fill_archive, kind='error')
541
542 def test_archival_empty_prefix(self):
543 #TODO:
544 pass
545
546
547 def test_author_email(self):
548 self.assertEqual('marcin@python-blog.com',
549 self.repo.get_changeset('b986218ba1c9').author_email)
550 self.assertEqual('lukasz.balcerzak@python-center.pl',
551 self.repo.get_changeset('3803844fdbd3').author_email)
552 self.assertEqual('',
553 self.repo.get_changeset('84478366594b').author_email)
554
555 def test_author_username(self):
556 self.assertEqual('Marcin Kuzminski',
557 self.repo.get_changeset('b986218ba1c9').author_name)
558 self.assertEqual('Lukasz Balcerzak',
559 self.repo.get_changeset('3803844fdbd3').author_name)
560 self.assertEqual('marcink',
561 self.repo.get_changeset('84478366594b').author_name)
@@ -0,0 +1,340 b''
1 """
2 Tests so called "in memory changesets" commit API of vcs.
3 """
4 from __future__ import with_statement
5
6 from rhodecode.lib import vcs
7 import time
8 import datetime
9 from conf import SCM_TESTS, get_new_dir
10 from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
11 from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError
12 from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError
13 from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError
14 from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError
15 from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
16 from rhodecode.lib.vcs.exceptions import NodeNotChangedError
17 from rhodecode.lib.vcs.nodes import DirNode
18 from rhodecode.lib.vcs.nodes import FileNode
19 from rhodecode.lib.vcs.utils.compat import unittest
20
21
22 class InMemoryChangesetTestMixin(object):
23 """
24 This is a backend independent test case class which should be created
25 with ``type`` method.
26
27 It is required to set following attributes at subclass:
28
29 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
30 - ``repo_path``: path to the repository which would be created for set of
31 tests
32 """
33
34 def get_backend(self):
35 return vcs.get_backend(self.backend_alias)
36
37 def setUp(self):
38 Backend = self.get_backend()
39 self.repo_path = get_new_dir(str(time.time()))
40 self.repo = Backend(self.repo_path, create=True)
41 self.imc = self.repo.in_memory_changeset
42 self.nodes = [
43 FileNode('foobar', content='Foo & bar'),
44 FileNode('foobar2', content='Foo & bar, doubled!'),
45 FileNode('foo bar with spaces', content=''),
46 FileNode('foo/bar/baz', content='Inside'),
47 ]
48
49 def test_add(self):
50 rev_count = len(self.repo.revisions)
51 to_add = [FileNode(node.path, content=node.content)
52 for node in self.nodes]
53 for node in to_add:
54 self.imc.add(node)
55 message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
56 author = unicode(self.__class__)
57 changeset = self.imc.commit(message=message, author=author)
58
59 newtip = self.repo.get_changeset()
60 self.assertEqual(changeset, newtip)
61 self.assertEqual(rev_count + 1, len(self.repo.revisions))
62 self.assertEqual(newtip.message, message)
63 self.assertEqual(newtip.author, author)
64 self.assertTrue(not any((self.imc.added, self.imc.changed,
65 self.imc.removed)))
66 for node in to_add:
67 self.assertEqual(newtip.get_node(node.path).content, node.content)
68
69 def test_add_in_bulk(self):
70 rev_count = len(self.repo.revisions)
71 to_add = [FileNode(node.path, content=node.content)
72 for node in self.nodes]
73 self.imc.add(*to_add)
74 message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
75 author = unicode(self.__class__)
76 changeset = self.imc.commit(message=message, author=author)
77
78 newtip = self.repo.get_changeset()
79 self.assertEqual(changeset, newtip)
80 self.assertEqual(rev_count + 1, len(self.repo.revisions))
81 self.assertEqual(newtip.message, message)
82 self.assertEqual(newtip.author, author)
83 self.assertTrue(not any((self.imc.added, self.imc.changed,
84 self.imc.removed)))
85 for node in to_add:
86 self.assertEqual(newtip.get_node(node.path).content, node.content)
87
88 def test_add_actually_adds_all_nodes_at_second_commit_too(self):
89 self.imc.add(FileNode('foo/bar/image.png', content='\0'))
90 self.imc.add(FileNode('foo/README.txt', content='readme!'))
91 changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
92 self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
93 self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
94 self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
95 self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
96
97 # commit some more files again
98 to_add = [
99 FileNode('foo/bar/foobaz/bar', content='foo'),
100 FileNode('foo/bar/another/bar', content='foo'),
101 FileNode('foo/baz.txt', content='foo'),
102 FileNode('foobar/foobaz/file', content='foo'),
103 FileNode('foobar/barbaz', content='foo'),
104 ]
105 self.imc.add(*to_add)
106 changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
107 self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
108 self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
109 self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
110 self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
111 self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
112
113 def test_add_raise_already_added(self):
114 node = FileNode('foobar', content='baz')
115 self.imc.add(node)
116 self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
117
118 def test_check_integrity_raise_already_exist(self):
119 node = FileNode('foobar', content='baz')
120 self.imc.add(node)
121 self.imc.commit(message=u'Added foobar', author=unicode(self))
122 self.imc.add(node)
123 self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
124 message='new message',
125 author=str(self))
126
127 def test_change(self):
128 self.imc.add(FileNode('foo/bar/baz', content='foo'))
129 self.imc.add(FileNode('foo/fbar', content='foobar'))
130 tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
131
132 # Change node's content
133 node = FileNode('foo/bar/baz', content='My **changed** content')
134 self.imc.change(node)
135 self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
136
137 newtip = self.repo.get_changeset()
138 self.assertNotEqual(tip, newtip)
139 self.assertNotEqual(tip.id, newtip.id)
140 self.assertEqual(newtip.get_node('foo/bar/baz').content,
141 'My **changed** content')
142
143 def test_change_raise_empty_repository(self):
144 node = FileNode('foobar')
145 self.assertRaises(EmptyRepositoryError, self.imc.change, node)
146
147 def test_check_integrity_change_raise_node_does_not_exist(self):
148 node = FileNode('foobar', content='baz')
149 self.imc.add(node)
150 self.imc.commit(message=u'Added foobar', author=unicode(self))
151 node = FileNode('not-foobar', content='')
152 self.imc.change(node)
153 self.assertRaises(NodeDoesNotExistError, self.imc.commit,
154 message='Changed not existing node',
155 author=str(self))
156
157 def test_change_raise_node_already_changed(self):
158 node = FileNode('foobar', content='baz')
159 self.imc.add(node)
160 self.imc.commit(message=u'Added foobar', author=unicode(self))
161 node = FileNode('foobar', content='more baz')
162 self.imc.change(node)
163 self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
164
165 def test_check_integrity_change_raise_node_not_changed(self):
166 self.test_add() # Performs first commit
167
168 node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
169 self.imc.change(node)
170 self.assertRaises(NodeNotChangedError, self.imc.commit,
171 message=u'Trying to mark node as changed without touching it',
172 author=unicode(self))
173
174 def test_change_raise_node_already_removed(self):
175 node = FileNode('foobar', content='baz')
176 self.imc.add(node)
177 self.imc.commit(message=u'Added foobar', author=unicode(self))
178 self.imc.remove(FileNode('foobar'))
179 self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node)
180
181 def test_remove(self):
182 self.test_add() # Performs first commit
183
184 tip = self.repo.get_changeset()
185 node = self.nodes[0]
186 self.assertEqual(node.content, tip.get_node(node.path).content)
187 self.imc.remove(node)
188 self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
189
190 newtip = self.repo.get_changeset()
191 self.assertNotEqual(tip, newtip)
192 self.assertNotEqual(tip.id, newtip.id)
193 self.assertRaises(NodeDoesNotExistError, newtip.get_node, node.path)
194
195 def test_remove_last_file_from_directory(self):
196 node = FileNode('omg/qwe/foo/bar', content='foobar')
197 self.imc.add(node)
198 self.imc.commit(u'added', u'joe doe')
199
200 self.imc.remove(node)
201 tip = self.imc.commit(u'removed', u'joe doe')
202 self.assertRaises(NodeDoesNotExistError, tip.get_node, 'omg/qwe/foo/bar')
203
204 def test_remove_raise_node_does_not_exist(self):
205 self.imc.remove(self.nodes[0])
206 self.assertRaises(NodeDoesNotExistError, self.imc.commit,
207 message='Trying to remove node at empty repository',
208 author=str(self))
209
210 def test_check_integrity_remove_raise_node_does_not_exist(self):
211 self.test_add() # Performs first commit
212
213 node = FileNode('no-such-file')
214 self.imc.remove(node)
215 self.assertRaises(NodeDoesNotExistError, self.imc.commit,
216 message=u'Trying to remove not existing node',
217 author=unicode(self))
218
219 def test_remove_raise_node_already_removed(self):
220 self.test_add() # Performs first commit
221
222 node = FileNode(self.nodes[0].path)
223 self.imc.remove(node)
224 self.assertRaises(NodeAlreadyRemovedError, self.imc.remove, node)
225
226 def test_remove_raise_node_already_changed(self):
227 self.test_add() # Performs first commit
228
229 node = FileNode(self.nodes[0].path, content='Bending time')
230 self.imc.change(node)
231 self.assertRaises(NodeAlreadyChangedError, self.imc.remove, node)
232
233 def test_reset(self):
234 self.imc.add(FileNode('foo', content='bar'))
235 #self.imc.change(FileNode('baz', content='new'))
236 #self.imc.remove(FileNode('qwe'))
237 self.imc.reset()
238 self.assertTrue(not any((self.imc.added, self.imc.changed,
239 self.imc.removed)))
240
241 def test_multiple_commits(self):
242 N = 3 # number of commits to perform
243 last = None
244 for x in xrange(N):
245 fname = 'file%s' % str(x).rjust(5, '0')
246 content = 'foobar\n' * x
247 node = FileNode(fname, content=content)
248 self.imc.add(node)
249 commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
250 self.assertTrue(last != commit)
251 last = commit
252
253 # Check commit number for same repo
254 self.assertEqual(len(self.repo.revisions), N)
255
256 # Check commit number for recreated repo
257 backend = self.get_backend()
258 repo = backend(self.repo_path)
259 self.assertEqual(len(repo.revisions), N)
260
261 def test_date_attr(self):
262 node = FileNode('foobar.txt', content='Foobared!')
263 self.imc.add(node)
264 date = datetime.datetime(1985, 1, 30, 1, 45)
265 commit = self.imc.commit(u"Committed at time when I was born ;-)",
266 author=u'lb', date=date)
267
268 self.assertEqual(commit.date, date)
269
270
271 class BackendBaseTestCase(unittest.TestCase):
272 """
273 Base test class for tests which requires repository.
274 """
275 backend_alias = 'hg'
276 commits = [
277 {
278 'message': 'Initial commit',
279 'author': 'Joe Doe <joe.doe@example.com>',
280 'date': datetime.datetime(2010, 1, 1, 20),
281 'added': [
282 FileNode('foobar', content='Foobar'),
283 FileNode('foobar2', content='Foobar II'),
284 FileNode('foo/bar/baz', content='baz here!'),
285 ],
286 },
287 ]
288
289 def get_backend(self):
290 return vcs.get_backend(self.backend_alias)
291
292 def get_commits(self):
293 """
294 Returns list of commits which builds repository for each tests.
295 """
296 if hasattr(self, 'commits'):
297 return self.commits
298
299 def get_new_repo_path(self):
300 """
301 Returns newly created repository's directory.
302 """
303 backend = self.get_backend()
304 key = '%s-%s' % (backend.alias, str(time.time()))
305 repo_path = get_new_dir(key)
306 return repo_path
307
308 def setUp(self):
309 Backend = self.get_backend()
310 self.backend_class = Backend
311 self.repo_path = self.get_new_repo_path()
312 self.repo = Backend(self.repo_path, create=True)
313 self.imc = self.repo.in_memory_changeset
314
315 for commit in self.get_commits():
316 for node in commit.get('added', []):
317 self.imc.add(FileNode(node.path, content=node.content))
318 for node in commit.get('changed', []):
319 self.imc.change(FileNode(node.path, content=node.content))
320 for node in commit.get('removed', []):
321 self.imc.remove(FileNode(node.path))
322 self.imc.commit(message=unicode(commit['message']),
323 author=unicode(commit['author']),
324 date=commit['date'])
325
326 self.tip = self.repo.get_changeset()
327
328
329 # For each backend create test case class
330 for alias in SCM_TESTS:
331 attrs = {
332 'backend_alias': alias,
333 }
334 cls_name = ''.join(('%s in memory changeset test' % alias).title().split())
335 bases = (InMemoryChangesetTestMixin, unittest.TestCase)
336 globals()[cls_name] = type(cls_name, bases, attrs)
337
338
339 if __name__ == '__main__':
340 unittest.main()
@@ -0,0 +1,183 b''
1 from __future__ import with_statement
2
3 import stat
4 from rhodecode.lib.vcs.nodes import DirNode
5 from rhodecode.lib.vcs.nodes import FileNode
6 from rhodecode.lib.vcs.nodes import Node
7 from rhodecode.lib.vcs.nodes import NodeError
8 from rhodecode.lib.vcs.nodes import NodeKind
9 from rhodecode.lib.vcs.utils.compat import unittest
10
11
12 class NodeBasicTest(unittest.TestCase):
13
14 def test_init(self):
15 """
16 Cannot innitialize Node objects with path with slash at the beginning.
17 """
18 wrong_paths = (
19 '/foo',
20 '/foo/bar'
21 )
22 for path in wrong_paths:
23 self.assertRaises(NodeError, Node, path, NodeKind.FILE)
24
25 wrong_paths = (
26 '/foo/',
27 '/foo/bar/'
28 )
29 for path in wrong_paths:
30 self.assertRaises(NodeError, Node, path, NodeKind.DIR)
31
32 def test_name(self):
33 node = Node('', NodeKind.DIR)
34 self.assertEqual(node.name, '')
35
36 node = Node('path', NodeKind.FILE)
37 self.assertEqual(node.name, 'path')
38
39 node = Node('path/', NodeKind.DIR)
40 self.assertEqual(node.name, 'path')
41
42 node = Node('some/path', NodeKind.FILE)
43 self.assertEqual(node.name, 'path')
44
45 node = Node('some/path/', NodeKind.DIR)
46 self.assertEqual(node.name, 'path')
47
48 def test_root_node(self):
49 self.assertRaises(NodeError, Node, '', NodeKind.FILE)
50
51 def test_kind_setter(self):
52 node = Node('', NodeKind.DIR)
53 self.assertRaises(NodeError, setattr, node, 'kind', NodeKind.FILE)
54
55 def _test_parent_path(self, node_path, expected_parent_path):
56 """
57 Tests if node's parent path are properly computed.
58 """
59 node = Node(node_path, NodeKind.DIR)
60 parent_path = node.get_parent_path()
61 self.assertTrue(parent_path.endswith('/') or \
62 node.is_root() and parent_path == '')
63 self.assertEqual(parent_path, expected_parent_path,
64 "Node's path is %r and parent path is %r but should be %r"
65 % (node.path, parent_path, expected_parent_path))
66
67 def test_parent_path(self):
68 test_paths = (
69 # (node_path, expected_parent_path)
70 ('', ''),
71 ('some/path/', 'some/'),
72 ('some/longer/path/', 'some/longer/'),
73 )
74 for node_path, expected_parent_path in test_paths:
75 self._test_parent_path(node_path, expected_parent_path)
76
77 '''
78 def _test_trailing_slash(self, path):
79 if not path.endswith('/'):
80 self.fail("Trailing slash tests needs paths to end with slash")
81 for kind in NodeKind.FILE, NodeKind.DIR:
82 self.assertRaises(NodeError, Node, path=path, kind=kind)
83
84 def test_trailing_slash(self):
85 for path in ('/', 'foo/', 'foo/bar/', 'foo/bar/biz/'):
86 self._test_trailing_slash(path)
87 '''
88
89 def test_is_file(self):
90 node = Node('any', NodeKind.FILE)
91 self.assertTrue(node.is_file())
92
93 node = FileNode('any')
94 self.assertTrue(node.is_file())
95 self.assertRaises(AttributeError, getattr, node, 'nodes')
96
97 def test_is_dir(self):
98 node = Node('any_dir', NodeKind.DIR)
99 self.assertTrue(node.is_dir())
100
101 node = DirNode('any_dir')
102
103 self.assertTrue(node.is_dir())
104 self.assertRaises(NodeError, getattr, node, 'content')
105
106 def test_dir_node_iter(self):
107 nodes = [
108 DirNode('docs'),
109 DirNode('tests'),
110 FileNode('bar'),
111 FileNode('foo'),
112 FileNode('readme.txt'),
113 FileNode('setup.py'),
114 ]
115 dirnode = DirNode('', nodes=nodes)
116 for node in dirnode:
117 node == dirnode.get_node(node.path)
118
119 def test_node_state(self):
120 """
121 Without link to changeset nodes should raise NodeError.
122 """
123 node = FileNode('anything')
124 self.assertRaises(NodeError, getattr, node, 'state')
125 node = DirNode('anything')
126 self.assertRaises(NodeError, getattr, node, 'state')
127
128 def test_file_node_stat(self):
129 node = FileNode('foobar', 'empty... almost')
130 mode = node.mode # default should be 0100644
131 self.assertTrue(mode & stat.S_IRUSR)
132 self.assertTrue(mode & stat.S_IWUSR)
133 self.assertTrue(mode & stat.S_IRGRP)
134 self.assertTrue(mode & stat.S_IROTH)
135 self.assertFalse(mode & stat.S_IWGRP)
136 self.assertFalse(mode & stat.S_IWOTH)
137 self.assertFalse(mode & stat.S_IXUSR)
138 self.assertFalse(mode & stat.S_IXGRP)
139 self.assertFalse(mode & stat.S_IXOTH)
140
141 def test_file_node_is_executable(self):
142 node = FileNode('foobar', 'empty... almost', mode=0100755)
143 self.assertTrue(node.is_executable())
144
145 node = FileNode('foobar', 'empty... almost', mode=0100500)
146 self.assertTrue(node.is_executable())
147
148 node = FileNode('foobar', 'empty... almost', mode=0100644)
149 self.assertFalse(node.is_executable())
150
151 def test_mimetype(self):
152 py_node = FileNode('test.py')
153 tar_node = FileNode('test.tar.gz')
154
155 ext = 'CustomExtension'
156
157 my_node2 = FileNode('myfile2')
158 my_node2._mimetype = [ext]
159
160 my_node3 = FileNode('myfile3')
161 my_node3._mimetype = [ext,ext]
162
163 self.assertEqual(py_node.mimetype,'text/x-python')
164 self.assertEqual(py_node.get_mimetype(),('text/x-python',None))
165
166 self.assertEqual(tar_node.mimetype,'application/x-tar')
167 self.assertEqual(tar_node.get_mimetype(),('application/x-tar','gzip'))
168
169 self.assertRaises(NodeError,my_node2.get_mimetype)
170
171 self.assertEqual(my_node3.mimetype,ext)
172 self.assertEqual(my_node3.get_mimetype(),[ext,ext])
173
174 class NodeContentTest(unittest.TestCase):
175
176 def test_if_binary(self):
177 data = """\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x10\x00\x00\x00\x10\x08\x06\x00\x00\x00\x1f??a\x00\x00\x00\x04gAMA\x00\x00\xaf?7\x05\x8a?\x00\x00\x00\x19tEXtSoftware\x00Adobe ImageReadyq?e<\x00\x00\x025IDAT8?\xa5\x93?K\x94Q\x14\x87\x9f\xf7?Q\x1bs4?\x03\x9a\xa8?B\x02\x8b$\x10[U;i\x13?6h?&h[?"\x14j?\xa2M\x7fB\x14F\x9aQ?&\x842?\x0b\x89"\x82??!?\x9c!\x9c2l??{N\x8bW\x9dY\xb4\t/\x1c?=\x9b?}????\xa9*;9!?\x83\x91?[?\\v*?D\x04\'`EpNp\xa2X\'U?pVq"Sw.\x1e?\x08\x01D?jw????\xbc??7{|\x9b?\x89$\x01??W@\x15\x9c\x05q`Lt/\x97?\x94\xa1d?\x18~?\x18?\x18W[%\xb0?\x83??\x14\x88\x8dB?\xa6H\tL\tl\x19>/\x01`\xac\xabx?\x9cl\nx\xb0\x98\x07\x95\x88D$"q[\x19?d\x00(o\n\xa0??\x7f\xb9\xa4?\x1bF\x1f\x8e\xac\xa8?j??eUU}?.?\x9f\x8cE??x\x94??\r\xbdtoJU5"0N\x10U?\x00??V\t\x02\x9f\x81?U?\x00\x9eM\xae2?r\x9b7\x83\x82\x8aP3????.?&"?\xb7ZP \x0c<?O\xa5\t}\xb8?\x99\xa6?\x87?\x1di|/\xa0??0\xbe\x1fp?d&\x1a\xad\x95\x8a\x07?\t*\x10??b:?d?.\x13C\x8a?\x12\xbe\xbf\x8e?{???\x08?\x80\xa7\x13+d\x13>J?\x80\x15T\x95\x9a\x00??S\x8c\r?\xa1\x03\x07?\x96\x9b\xa7\xab=E??\xa4\xb3?\x19q??B\x91=\x8d??k?J\x0bV"??\xf7x?\xa1\x00?\\.\x87\x87???\x02F@D\x99],??\x10#?X\xb7=\xb9\x10?Z\x1by???cI??\x1ag?\x92\xbc?T?t[\x92\x81?<_\x17~\x92\x88?H%?\x10Q\x02\x9f\n\x81qQ\x0bm?\x1bX?\xb1AK\xa6\x9e\xb9?u\xb2?1\xbe|/\x92M@\xa2!F?\xa9>"\r<DT?>\x92\x8e?>\x9a9Qv\x127?a\xac?Y?8?:??]X???9\x80\xb7?u?\x0b#BZ\x8d=\x1d?p\x00\x00\x00\x00IEND\xaeB`\x82"""
178 filenode = FileNode('calendar.png', content=data)
179 self.assertTrue(filenode.is_binary)
180
181
182 if __name__ == '__main__':
183 unittest.main()
@@ -0,0 +1,215 b''
1 from __future__ import with_statement
2 import datetime
3 from base import BackendTestMixin
4 from conf import SCM_TESTS
5 from conf import TEST_USER_CONFIG_FILE
6 from rhodecode.lib.vcs.nodes import FileNode
7 from rhodecode.lib.vcs.utils.compat import unittest
8 from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
9
10
11 class RepositoryBaseTest(BackendTestMixin):
12 recreate_repo_per_test = False
13
14 @classmethod
15 def _get_commits(cls):
16 return super(RepositoryBaseTest, cls)._get_commits()[:1]
17
18 def test_get_config_value(self):
19 self.assertEqual(self.repo.get_config_value('universal', 'foo',
20 TEST_USER_CONFIG_FILE), 'bar')
21
22 def test_get_config_value_defaults_to_None(self):
23 self.assertEqual(self.repo.get_config_value('universal', 'nonexist',
24 TEST_USER_CONFIG_FILE), None)
25
26 def test_get_user_name(self):
27 self.assertEqual(self.repo.get_user_name(TEST_USER_CONFIG_FILE),
28 'Foo Bar')
29
30 def test_get_user_email(self):
31 self.assertEqual(self.repo.get_user_email(TEST_USER_CONFIG_FILE),
32 'foo.bar@example.com')
33
34
35
36 class RepositoryGetDiffTest(BackendTestMixin):
37
38 @classmethod
39 def _get_commits(cls):
40 commits = [
41 {
42 'message': 'Initial commit',
43 'author': 'Joe Doe <joe.doe@example.com>',
44 'date': datetime.datetime(2010, 1, 1, 20),
45 'added': [
46 FileNode('foobar', content='foobar'),
47 FileNode('foobar2', content='foobar2'),
48 ],
49 },
50 {
51 'message': 'Changed foobar, added foobar3',
52 'author': 'Jane Doe <jane.doe@example.com>',
53 'date': datetime.datetime(2010, 1, 1, 21),
54 'added': [
55 FileNode('foobar3', content='foobar3'),
56 ],
57 'changed': [
58 FileNode('foobar', 'FOOBAR'),
59 ],
60 },
61 {
62 'message': 'Removed foobar, changed foobar3',
63 'author': 'Jane Doe <jane.doe@example.com>',
64 'date': datetime.datetime(2010, 1, 1, 22),
65 'changed': [
66 FileNode('foobar3', content='FOOBAR\nFOOBAR\nFOOBAR\n'),
67 ],
68 'removed': [FileNode('foobar')],
69 },
70 ]
71 return commits
72
73 def test_raise_for_wrong(self):
74 with self.assertRaises(ChangesetDoesNotExistError):
75 self.repo.get_diff('a' * 40, 'b' * 40)
76
77 class GitRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
78 backend_alias = 'git'
79
80 def test_initial_commit_diff(self):
81 initial_rev = self.repo.revisions[0]
82 self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
83 new file mode 100644
84 index 0000000..f6ea049
85 --- /dev/null
86 +++ b/foobar
87 @@ -0,0 +1 @@
88 +foobar
89 \ No newline at end of file
90 diff --git a/foobar2 b/foobar2
91 new file mode 100644
92 index 0000000..e8c9d6b
93 --- /dev/null
94 +++ b/foobar2
95 @@ -0,0 +1 @@
96 +foobar2
97 \ No newline at end of file
98 ''')
99
100 def test_second_changeset_diff(self):
101 revs = self.repo.revisions
102 self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
103 index f6ea049..389865b 100644
104 --- a/foobar
105 +++ b/foobar
106 @@ -1 +1 @@
107 -foobar
108 \ No newline at end of file
109 +FOOBAR
110 \ No newline at end of file
111 diff --git a/foobar3 b/foobar3
112 new file mode 100644
113 index 0000000..c11c37d
114 --- /dev/null
115 +++ b/foobar3
116 @@ -0,0 +1 @@
117 +foobar3
118 \ No newline at end of file
119 ''')
120
121 def test_third_changeset_diff(self):
122 revs = self.repo.revisions
123 self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar
124 deleted file mode 100644
125 index 389865b..0000000
126 --- a/foobar
127 +++ /dev/null
128 @@ -1 +0,0 @@
129 -FOOBAR
130 \ No newline at end of file
131 diff --git a/foobar3 b/foobar3
132 index c11c37d..f932447 100644
133 --- a/foobar3
134 +++ b/foobar3
135 @@ -1 +1,3 @@
136 -foobar3
137 \ No newline at end of file
138 +FOOBAR
139 +FOOBAR
140 +FOOBAR
141 ''')
142
143
144 class HgRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
145 backend_alias = 'hg'
146
147 def test_initial_commit_diff(self):
148 initial_rev = self.repo.revisions[0]
149 self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
150 new file mode 100755
151 --- /dev/null
152 +++ b/foobar
153 @@ -0,0 +1,1 @@
154 +foobar
155 \ No newline at end of file
156 diff --git a/foobar2 b/foobar2
157 new file mode 100755
158 --- /dev/null
159 +++ b/foobar2
160 @@ -0,0 +1,1 @@
161 +foobar2
162 \ No newline at end of file
163 ''')
164
165 def test_second_changeset_diff(self):
166 revs = self.repo.revisions
167 self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
168 --- a/foobar
169 +++ b/foobar
170 @@ -1,1 +1,1 @@
171 -foobar
172 \ No newline at end of file
173 +FOOBAR
174 \ No newline at end of file
175 diff --git a/foobar3 b/foobar3
176 new file mode 100755
177 --- /dev/null
178 +++ b/foobar3
179 @@ -0,0 +1,1 @@
180 +foobar3
181 \ No newline at end of file
182 ''')
183
184 def test_third_changeset_diff(self):
185 revs = self.repo.revisions
186 self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar
187 deleted file mode 100755
188 --- a/foobar
189 +++ /dev/null
190 @@ -1,1 +0,0 @@
191 -FOOBAR
192 \ No newline at end of file
193 diff --git a/foobar3 b/foobar3
194 --- a/foobar3
195 +++ b/foobar3
196 @@ -1,1 +1,3 @@
197 -foobar3
198 \ No newline at end of file
199 +FOOBAR
200 +FOOBAR
201 +FOOBAR
202 ''')
203
204
205 # For each backend create test case class
206 for alias in SCM_TESTS:
207 attrs = {
208 'backend_alias': alias,
209 }
210 cls_name = alias.capitalize() + RepositoryBaseTest.__name__
211 bases = (RepositoryBaseTest, unittest.TestCase)
212 globals()[cls_name] = type(cls_name, bases, attrs)
213
214 if __name__ == '__main__':
215 unittest.main()
@@ -0,0 +1,61 b''
1 from __future__ import with_statement
2
3 from base import BackendTestMixin
4 from conf import SCM_TESTS
5 from rhodecode.lib.vcs.exceptions import TagAlreadyExistError
6 from rhodecode.lib.vcs.exceptions import TagDoesNotExistError
7 from rhodecode.lib.vcs.utils.compat import unittest
8
9
10 class TagsTestCaseMixin(BackendTestMixin):
11
12 def test_new_tag(self):
13 tip = self.repo.get_changeset()
14 tagsize = len(self.repo.tags)
15 tag = self.repo.tag('last-commit', 'joe', tip.raw_id)
16
17 self.assertEqual(len(self.repo.tags), tagsize + 1)
18 for top, dirs, files in tip.walk():
19 self.assertEqual(top, tag.get_node(top.path))
20
21 def test_tag_already_exist(self):
22 tip = self.repo.get_changeset()
23 self.repo.tag('last-commit', 'joe', tip.raw_id)
24
25 self.assertRaises(TagAlreadyExistError,
26 self.repo.tag, 'last-commit', 'joe', tip.raw_id)
27
28 chset = self.repo.get_changeset(0)
29 self.assertRaises(TagAlreadyExistError,
30 self.repo.tag, 'last-commit', 'jane', chset.raw_id)
31
32 def test_remove_tag(self):
33 tip = self.repo.get_changeset()
34 self.repo.tag('last-commit', 'joe', tip.raw_id)
35 tagsize = len(self.repo.tags)
36
37 self.repo.remove_tag('last-commit', user='evil joe')
38 self.assertEqual(len(self.repo.tags), tagsize - 1)
39
40 def test_remove_tag_which_does_not_exist(self):
41 self.assertRaises(TagDoesNotExistError,
42 self.repo.remove_tag, 'last-commit', user='evil joe')
43
44 def test_name_with_slash(self):
45 self.repo.tag('19/10/11', 'joe')
46 self.assertTrue('19/10/11' in self.repo.tags)
47 self.repo.tag('11', 'joe')
48 self.assertTrue('11' in self.repo.tags)
49
50 # For each backend create test case class
51 for alias in SCM_TESTS:
52 attrs = {
53 'backend_alias': alias,
54 }
55 cls_name = ''.join(('%s tags test' % alias).title().split())
56 bases = (TagsTestCaseMixin, unittest.TestCase)
57 globals()[cls_name] = type(cls_name, bases, attrs)
58
59
60 if __name__ == '__main__':
61 unittest.main()
@@ -0,0 +1,279 b''
1 from __future__ import with_statement
2
3 import os
4 import mock
5 import time
6 import shutil
7 import tempfile
8 import datetime
9 from rhodecode.lib.vcs.utils.compat import unittest
10 from rhodecode.lib.vcs.utils.paths import get_dirs_for_path
11 from rhodecode.lib.vcs.utils.helpers import get_dict_for_attrs
12 from rhodecode.lib.vcs.utils.helpers import get_scm
13 from rhodecode.lib.vcs.utils.helpers import get_scms_for_path
14 from rhodecode.lib.vcs.utils.helpers import get_total_seconds
15 from rhodecode.lib.vcs.utils.helpers import parse_changesets
16 from rhodecode.lib.vcs.utils.helpers import parse_datetime
17 from rhodecode.lib.vcs.utils import author_email, author_name
18 from rhodecode.lib.vcs.utils.paths import get_user_home
19 from rhodecode.lib.vcs.exceptions import VCSError
20
21 from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
22
23
24 class PathsTest(unittest.TestCase):
25
26 def _test_get_dirs_for_path(self, path, expected):
27 """
28 Tests if get_dirs_for_path returns same as expected.
29 """
30 expected = sorted(expected)
31 result = sorted(get_dirs_for_path(path))
32 self.assertEqual(result, expected,
33 msg="%s != %s which was expected result for path %s"
34 % (result, expected, path))
35
36 def test_get_dirs_for_path(self):
37 path = 'foo/bar/baz/file'
38 paths_and_results = (
39 ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']),
40 ('foo/bar/', ['foo', 'foo/bar']),
41 ('foo/bar', ['foo']),
42 )
43 for path, expected in paths_and_results:
44 self._test_get_dirs_for_path(path, expected)
45
46
47 def test_get_scm(self):
48 self.assertEqual(('hg', TEST_HG_REPO), get_scm(TEST_HG_REPO))
49 self.assertEqual(('git', TEST_GIT_REPO), get_scm(TEST_GIT_REPO))
50
51 def test_get_two_scms_for_path(self):
52 multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo-2')
53 if os.path.isdir(multialias_repo_path):
54 shutil.rmtree(multialias_repo_path)
55
56 os.mkdir(multialias_repo_path)
57
58 self.assertRaises(VCSError, get_scm, multialias_repo_path)
59
60 def test_get_scm_error_path(self):
61 self.assertRaises(VCSError, get_scm, 'err')
62
63 def test_get_scms_for_path(self):
64 dirpath = tempfile.gettempdir()
65 new = os.path.join(dirpath, 'vcs-scms-for-path-%s' % time.time())
66 os.mkdir(new)
67 self.assertEqual(get_scms_for_path(new), [])
68
69 os.mkdir(os.path.join(new, '.tux'))
70 self.assertEqual(get_scms_for_path(new), [])
71
72 os.mkdir(os.path.join(new, '.git'))
73 self.assertEqual(set(get_scms_for_path(new)), set(['git']))
74
75 os.mkdir(os.path.join(new, '.hg'))
76 self.assertEqual(set(get_scms_for_path(new)), set(['git', 'hg']))
77
78
79 class TestParseChangesets(unittest.TestCase):
80
81 def test_main_is_returned_correctly(self):
82 self.assertEqual(parse_changesets('123456'), {
83 'start': None,
84 'main': '123456',
85 'end': None,
86 })
87
88 def test_start_is_returned_correctly(self):
89 self.assertEqual(parse_changesets('aaabbb..'), {
90 'start': 'aaabbb',
91 'main': None,
92 'end': None,
93 })
94
95 def test_end_is_returned_correctly(self):
96 self.assertEqual(parse_changesets('..cccddd'), {
97 'start': None,
98 'main': None,
99 'end': 'cccddd',
100 })
101
102 def test_that_two_or_three_dots_are_allowed(self):
103 text1 = 'a..b'
104 text2 = 'a...b'
105 self.assertEqual(parse_changesets(text1), parse_changesets(text2))
106
107 def test_that_input_is_stripped_first(self):
108 text1 = 'a..bb'
109 text2 = ' a..bb\t\n\t '
110 self.assertEqual(parse_changesets(text1), parse_changesets(text2))
111
112 def test_that_exception_is_raised(self):
113 text = '123456.789012' # single dot is not recognized
114 with self.assertRaises(ValueError):
115 parse_changesets(text)
116
117 def test_non_alphanumeric_raises_exception(self):
118 with self.assertRaises(ValueError):
119 parse_changesets('aaa@bbb')
120
121
122 class TestParseDatetime(unittest.TestCase):
123
124 def test_datetime_text(self):
125 self.assertEqual(parse_datetime('2010-04-07 21:29:41'),
126 datetime.datetime(2010, 4, 7, 21, 29, 41))
127
128 def test_no_seconds(self):
129 self.assertEqual(parse_datetime('2010-04-07 21:29'),
130 datetime.datetime(2010, 4, 7, 21, 29))
131
132 def test_date_only(self):
133 self.assertEqual(parse_datetime('2010-04-07'),
134 datetime.datetime(2010, 4, 7))
135
136 def test_another_format(self):
137 self.assertEqual(parse_datetime('04/07/10 21:29:41'),
138 datetime.datetime(2010, 4, 7, 21, 29, 41))
139
140 def test_now(self):
141 self.assertTrue(parse_datetime('now') - datetime.datetime.now() <
142 datetime.timedelta(seconds=1))
143
144 def test_today(self):
145 today = datetime.date.today()
146 self.assertEqual(parse_datetime('today'),
147 datetime.datetime(*today.timetuple()[:3]))
148
149 def test_yesterday(self):
150 yesterday = datetime.date.today() - datetime.timedelta(days=1)
151 self.assertEqual(parse_datetime('yesterday'),
152 datetime.datetime(*yesterday.timetuple()[:3]))
153
154 def test_tomorrow(self):
155 tomorrow = datetime.date.today() + datetime.timedelta(days=1)
156 args = tomorrow.timetuple()[:3] + (23, 59, 59)
157 self.assertEqual(parse_datetime('tomorrow'), datetime.datetime(*args))
158
159 def test_days(self):
160 timestamp = datetime.datetime.today() - datetime.timedelta(days=3)
161 args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
162 expected = datetime.datetime(*args)
163 self.assertEqual(parse_datetime('3d'), expected)
164 self.assertEqual(parse_datetime('3 d'), expected)
165 self.assertEqual(parse_datetime('3 day'), expected)
166 self.assertEqual(parse_datetime('3 days'), expected)
167
168 def test_weeks(self):
169 timestamp = datetime.datetime.today() - datetime.timedelta(days=3 * 7)
170 args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
171 expected = datetime.datetime(*args)
172 self.assertEqual(parse_datetime('3w'), expected)
173 self.assertEqual(parse_datetime('3 w'), expected)
174 self.assertEqual(parse_datetime('3 week'), expected)
175 self.assertEqual(parse_datetime('3 weeks'), expected)
176
177 def test_mixed(self):
178 timestamp = datetime.datetime.today() - datetime.timedelta(days=2 * 7 + 3)
179 args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
180 expected = datetime.datetime(*args)
181 self.assertEqual(parse_datetime('2w3d'), expected)
182 self.assertEqual(parse_datetime('2w 3d'), expected)
183 self.assertEqual(parse_datetime('2w 3 days'), expected)
184 self.assertEqual(parse_datetime('2 weeks 3 days'), expected)
185
186
187 class TestAuthorExtractors(unittest.TestCase):
188 TEST_AUTHORS = [('Marcin Kuzminski <marcin@python-works.com>',
189 ('Marcin Kuzminski', 'marcin@python-works.com')),
190 ('Marcin Kuzminski Spaces < marcin@python-works.com >',
191 ('Marcin Kuzminski Spaces', 'marcin@python-works.com')),
192 ('Marcin Kuzminski <marcin.kuzminski@python-works.com>',
193 ('Marcin Kuzminski', 'marcin.kuzminski@python-works.com')),
194 ('mrf RFC_SPEC <marcin+kuzminski@python-works.com>',
195 ('mrf RFC_SPEC', 'marcin+kuzminski@python-works.com')),
196 ('username <user@email.com>',
197 ('username', 'user@email.com')),
198 ('username <user@email.com',
199 ('username', 'user@email.com')),
200 ('broken missing@email.com',
201 ('broken', 'missing@email.com')),
202 ('<justemail@mail.com>',
203 ('', 'justemail@mail.com')),
204 ('justname',
205 ('justname', '')),
206 ('Mr Double Name withemail@email.com ',
207 ('Mr Double Name', 'withemail@email.com')),
208 ]
209
210 def test_author_email(self):
211
212 for test_str, result in self.TEST_AUTHORS:
213 self.assertEqual(result[1], author_email(test_str))
214
215
216 def test_author_name(self):
217
218 for test_str, result in self.TEST_AUTHORS:
219 self.assertEqual(result[0], author_name(test_str))
220
221
222 class TestGetDictForAttrs(unittest.TestCase):
223
224 def test_returned_dict_has_expected_attrs(self):
225 obj = mock.Mock()
226 obj.NOT_INCLUDED = 'this key/value should not be included'
227 obj.CONST = True
228 obj.foo = 'aaa'
229 obj.attrs = {'foo': 'bar'}
230 obj.date = datetime.datetime(2010, 12, 31)
231 obj.count = 1001
232
233 self.assertEqual(get_dict_for_attrs(obj, ['CONST', 'foo', 'attrs',
234 'date', 'count']), {
235 'CONST': True,
236 'foo': 'aaa',
237 'attrs': {'foo': 'bar'},
238 'date': datetime.datetime(2010, 12, 31),
239 'count': 1001,
240 })
241
242
243 class TestGetTotalSeconds(unittest.TestCase):
244
245 def assertTotalSecondsEqual(self, timedelta, expected_seconds):
246 result = get_total_seconds(timedelta)
247 self.assertEqual(result, expected_seconds,
248 "We computed %s seconds for %s but expected %s"
249 % (result, timedelta, expected_seconds))
250
251 def test_get_total_seconds_returns_proper_value(self):
252 self.assertTotalSecondsEqual(datetime.timedelta(seconds=1001), 1001)
253
254 def test_get_total_seconds_returns_proper_value_for_partial_seconds(self):
255 self.assertTotalSecondsEqual(datetime.timedelta(seconds=50.65), 50.65)
256
257
258 class TestGetUserHome(unittest.TestCase):
259
260 @mock.patch.object(os, 'environ', {})
261 def test_defaults_to_none(self):
262 self.assertEqual(get_user_home(), None)
263
264 @mock.patch.object(os, 'environ', {'HOME': '/home/foobar'})
265 def test_unix_like(self):
266 self.assertEqual(get_user_home(), '/home/foobar')
267
268 @mock.patch.object(os, 'environ', {'USERPROFILE': '/Users/foobar'})
269 def test_windows_like(self):
270 self.assertEqual(get_user_home(), '/Users/foobar')
271
272 @mock.patch.object(os, 'environ', {'HOME': '/home/foobar',
273 'USERPROFILE': '/Users/foobar'})
274 def test_prefers_home_over_userprofile(self):
275 self.assertEqual(get_user_home(), '/home/foobar')
276
277
278 if __name__ == '__main__':
279 unittest.main()
@@ -0,0 +1,26 b''
1 from __future__ import with_statement
2
3 from rhodecode.lib.vcs.utils.filesize import filesizeformat
4 from rhodecode.lib.vcs.utils.compat import unittest
5
6
7 class TestFilesizeformat(unittest.TestCase):
8
9 def test_bytes(self):
10 self.assertEqual(filesizeformat(10), '10 B')
11
12 def test_kilobytes(self):
13 self.assertEqual(filesizeformat(1024 * 2), '2 KB')
14
15 def test_megabytes(self):
16 self.assertEqual(filesizeformat(1024 * 1024 * 2.3), '2.3 MB')
17
18 def test_gigabytes(self):
19 self.assertEqual(filesizeformat(1024 * 1024 * 1024 * 12.92), '12.92 GB')
20
21 def test_that_function_respects_sep_paramtere(self):
22 self.assertEqual(filesizeformat(1, ''), '1B')
23
24
25 if __name__ == '__main__':
26 unittest.main()
@@ -0,0 +1,84 b''
1 from __future__ import with_statement
2
3 from rhodecode.lib.vcs import VCSError, get_repo, get_backend
4 from rhodecode.lib.vcs.backends.hg import MercurialRepository
5 from rhodecode.lib.vcs.utils.compat import unittest
6 from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
7 import os
8 import shutil
9
10
11 class VCSTest(unittest.TestCase):
12 """
13 Tests for main module's methods.
14 """
15
16 def test_get_backend(self):
17 hg = get_backend('hg')
18 self.assertEqual(hg, MercurialRepository)
19
20 def test_alias_detect_hg(self):
21 alias = 'hg'
22 path = TEST_HG_REPO
23 backend = get_backend(alias)
24 repo = backend(path)
25 self.assertEqual('hg',repo.alias)
26
27 def test_alias_detect_git(self):
28 alias = 'git'
29 path = TEST_GIT_REPO
30 backend = get_backend(alias)
31 repo = backend(path)
32 self.assertEqual('git',repo.alias)
33
34 def test_wrong_alias(self):
35 alias = 'wrong_alias'
36 self.assertRaises(VCSError, get_backend, alias)
37
38 def test_get_repo(self):
39 alias = 'hg'
40 path = TEST_HG_REPO
41 backend = get_backend(alias)
42 repo = backend(path)
43
44 self.assertEqual(repo.__class__, get_repo(path, alias).__class__)
45 self.assertEqual(repo.path, get_repo(path, alias).path)
46
47 def test_get_repo_autoalias_hg(self):
48 alias = 'hg'
49 path = TEST_HG_REPO
50 backend = get_backend(alias)
51 repo = backend(path)
52
53 self.assertEqual(repo.__class__, get_repo(path).__class__)
54 self.assertEqual(repo.path, get_repo(path).path)
55
56 def test_get_repo_autoalias_git(self):
57 alias = 'git'
58 path = TEST_GIT_REPO
59 backend = get_backend(alias)
60 repo = backend(path)
61
62 self.assertEqual(repo.__class__, get_repo(path).__class__)
63 self.assertEqual(repo.path, get_repo(path).path)
64
65
66 def test_get_repo_err(self):
67 blank_repo_path = os.path.join(TEST_TMP_PATH, 'blank-error-repo')
68 if os.path.isdir(blank_repo_path):
69 shutil.rmtree(blank_repo_path)
70
71 os.mkdir(blank_repo_path)
72 self.assertRaises(VCSError, get_repo, blank_repo_path)
73 self.assertRaises(VCSError, get_repo, blank_repo_path + 'non_existing')
74
75 def test_get_repo_multialias(self):
76 multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo')
77 if os.path.isdir(multialias_repo_path):
78 shutil.rmtree(multialias_repo_path)
79
80 os.mkdir(multialias_repo_path)
81
82 os.mkdir(os.path.join(multialias_repo_path, '.git'))
83 os.mkdir(os.path.join(multialias_repo_path, '.hg'))
84 self.assertRaises(VCSError, get_repo, multialias_repo_path)
@@ -0,0 +1,90 b''
1 from __future__ import with_statement
2
3 import datetime
4 from rhodecode.lib.vcs.nodes import FileNode
5 from rhodecode.lib.vcs.utils.compat import unittest
6 from base import BackendTestMixin
7 from conf import SCM_TESTS
8
9
10 class WorkdirTestCaseMixin(BackendTestMixin):
11
12 @classmethod
13 def _get_commits(cls):
14 commits = [
15 {
16 'message': u'Initial commit',
17 'author': u'Joe Doe <joe.doe@example.com>',
18 'date': datetime.datetime(2010, 1, 1, 20),
19 'added': [
20 FileNode('foobar', content='Foobar'),
21 FileNode('foobar2', content='Foobar II'),
22 FileNode('foo/bar/baz', content='baz here!'),
23 ],
24 },
25 {
26 'message': u'Changes...',
27 'author': u'Jane Doe <jane.doe@example.com>',
28 'date': datetime.datetime(2010, 1, 1, 21),
29 'added': [
30 FileNode('some/new.txt', content='news...'),
31 ],
32 'changed': [
33 FileNode('foobar', 'Foobar I'),
34 ],
35 'removed': [],
36 },
37 ]
38 return commits
39
40 def test_get_branch_for_default_branch(self):
41 self.assertEqual(self.repo.workdir.get_branch(),
42 self.repo.DEFAULT_BRANCH_NAME)
43
44 def test_get_branch_after_adding_one(self):
45 self.imc.add(FileNode('docs/index.txt',
46 content='Documentation\n'))
47 self.imc.commit(
48 message=u'New branch: foobar',
49 author=u'joe',
50 branch='foobar',
51 )
52
53 def test_get_changeset(self):
54 self.imc.add(FileNode('docs/index.txt',
55 content='Documentation\n'))
56 head = self.imc.commit(
57 message=u'New branch: foobar',
58 author=u'joe',
59 branch='foobar',
60 )
61 self.assertEqual(self.repo.workdir.get_changeset(), head)
62
63 def test_checkout_branch(self):
64 from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
65 # first, 'foobranch' does not exist.
66 self.assertRaises(BranchDoesNotExistError, self.repo.workdir.checkout_branch,
67 branch='foobranch')
68 # create new branch 'foobranch'.
69 self.imc.add(FileNode('file1', content='blah'))
70 self.imc.commit(message=u'asd', author=u'john', branch='foobranch')
71 # go back to the default branch
72 self.repo.workdir.checkout_branch()
73 self.assertEqual(self.repo.workdir.get_branch(), self.backend_class.DEFAULT_BRANCH_NAME)
74 # checkout 'foobranch'
75 self.repo.workdir.checkout_branch('foobranch')
76 self.assertEqual(self.repo.workdir.get_branch(), 'foobranch')
77
78
79 # For each backend create test case class
80 for alias in SCM_TESTS:
81 attrs = {
82 'backend_alias': alias,
83 }
84 cls_name = ''.join(('%s branch test' % alias).title().split())
85 bases = (WorkdirTestCaseMixin, unittest.TestCase)
86 globals()[cls_name] = type(cls_name, bases, attrs)
87
88
89 if __name__ == '__main__':
90 unittest.main()
@@ -0,0 +1,97 b''
1 """
2 Utilities for tests only. These are not or should not be used normally -
3 functions here are crafted as we don't want to use ``vcs`` to verify tests.
4 """
5 import os
6 import re
7 import sys
8
9 from subprocess import Popen
10
11
12 class VCSTestError(Exception):
13 pass
14
15
16 def run_command(cmd, args):
17 """
18 Runs command on the system with given ``args``.
19 """
20 command = ' '.join((cmd, args))
21 p = Popen(command, shell=True)
22 status = os.waitpid(p.pid, 0)[1]
23 return status
24
25
26 def eprint(msg):
27 """
28 Prints given ``msg`` into sys.stderr as nose test runner hides all output
29 from sys.stdout by default and if we want to pipe stream somewhere we don't
30 need those verbose messages anyway.
31 Appends line break.
32 """
33 sys.stderr.write(msg)
34 sys.stderr.write('\n')
35
36
37 class SCMFetcher(object):
38
39 def __init__(self, alias, test_repo_path, remote_repo, clone_cmd):
40 """
41 :param clone_cmd: command which would clone remote repository; pass
42 only first bits - remote path and destination would be appended
43 using ``remote_repo`` and ``test_repo_path``
44 """
45 self.alias = alias
46 self.test_repo_path = test_repo_path
47 self.remote_repo = remote_repo
48 self.clone_cmd = clone_cmd
49
50 def setup(self):
51 if not os.path.isdir(self.test_repo_path):
52 self.fetch_repo()
53
54 def fetch_repo(self):
55 """
56 Tries to fetch repository from remote path.
57 """
58 remote = self.remote_repo
59 eprint("Fetching repository %s into %s" % (remote, self.test_repo_path))
60 run_command(self.clone_cmd, '%s %s' % (remote, self.test_repo_path))
61
62
63 def get_normalized_path(path):
64 """
65 If given path exists, new path would be generated and returned. Otherwise
66 same whats given is returned. Assumes that there would be no more than
67 10000 same named files.
68 """
69 if os.path.exists(path):
70 dir, basename = os.path.split(path)
71 splitted_name = basename.split('.')
72 if len(splitted_name) > 1:
73 ext = splitted_name[-1]
74 else:
75 ext = None
76 name = '.'.join(splitted_name[:-1])
77 matcher = re.compile(r'^.*-(\d{5})$')
78 start = 0
79 m = matcher.match(name)
80 if not m:
81 # Haven't append number yet so return first
82 newname = '%s-00000' % name
83 newpath = os.path.join(dir, newname)
84 if ext:
85 newpath = '.'.join((newpath, ext))
86 return get_normalized_path(newpath)
87 else:
88 start = int(m.group(1)[-5:]) + 1
89 for x in xrange(start, 10000):
90 newname = name[:-5] + str(x).rjust(5, '0')
91 newpath = os.path.join(dir, newname)
92 if ext:
93 newpath = '.'.join((newpath, ext))
94 if not os.path.exists(newpath):
95 return newpath
96 raise VCSTestError("Couldn't compute new path for %s" % path)
97 return path
@@ -599,6 +599,10 b' def create_test_env(repos_test_path, con'
599 tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
599 tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
600 tar.close()
600 tar.close()
601
601
602 #LOAD VCS test stuff
603 from rhodecode.tests.vcs import setup_package
604 setup_package()
605
602
606
603 #==============================================================================
607 #==============================================================================
604 # PASTER COMMANDS
608 # PASTER COMMANDS
@@ -32,7 +32,8 b' class MercurialInMemoryChangeset(BaseInM'
32 from .repository import MercurialRepository
32 from .repository import MercurialRepository
33 if not isinstance(message, unicode) or not isinstance(author, unicode):
33 if not isinstance(message, unicode) or not isinstance(author, unicode):
34 raise RepositoryError('Given message and author needs to be '
34 raise RepositoryError('Given message and author needs to be '
35 'an <unicode> instance')
35 'an <unicode> instance got %r & %r instead'
36 % (type(message), type(author)))
36
37
37 if branch is None:
38 if branch is None:
38 branch = MercurialRepository.DEFAULT_BRANCH_NAME
39 branch = MercurialRepository.DEFAULT_BRANCH_NAME
@@ -422,7 +422,7 b' class FileNode(Node):'
422
422
423 def __repr__(self):
423 def __repr__(self):
424 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
424 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
425 self.changeset.short_id)
425 getattr(self.changeset, 'short_id', ''))
426
426
427
427
428 class RemovedFileNode(FileNode):
428 class RemovedFileNode(FileNode):
@@ -557,7 +557,7 b' class DirNode(Node):'
557
557
558 def __repr__(self):
558 def __repr__(self):
559 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
559 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
560 self.changeset.short_id)
560 getattr(self.changeset, 'short_id', ''))
561
561
562
562
563 class RootNode(DirNode):
563 class RootNode(DirNode):
@@ -591,7 +591,7 b' class SubModuleNode(Node):'
591
591
592 def __repr__(self):
592 def __repr__(self):
593 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
593 return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
594 self.changeset.short_id)
594 getattr(self.changeset, 'short_id', ''))
595
595
596 def _extract_submodule_url(self):
596 def _extract_submodule_url(self):
597 if self.alias == 'git':
597 if self.alias == 'git':
@@ -10,6 +10,9 b' setup-app`) and provides the base testin'
10 import os
10 import os
11 import time
11 import time
12 import logging
12 import logging
13 import datetime
14 import hashlib
15 import tempfile
13 from os.path import join as jn
16 from os.path import join as jn
14
17
15 from unittest import TestCase
18 from unittest import TestCase
@@ -27,6 +30,7 b' from rhodecode.model.db import User'
27
30
28 import pylons.test
31 import pylons.test
29
32
33
30 os.environ['TZ'] = 'UTC'
34 os.environ['TZ'] = 'UTC'
31 if not is_windows:
35 if not is_windows:
32 time.tzset()
36 time.tzset()
@@ -36,9 +40,11 b' log = logging.getLogger(__name__)'
36 __all__ = [
40 __all__ = [
37 'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
41 'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
38 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
42 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
39 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
43 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
40 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
44 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
41 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL'
45 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
46 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
47 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
42 ]
48 ]
43
49
44 # Invoke websetup with the current config file
50 # Invoke websetup with the current config file
@@ -73,6 +79,46 b" NEW_GIT_REPO = 'vcs_test_git_new'"
73 HG_FORK = 'vcs_test_hg_fork'
79 HG_FORK = 'vcs_test_hg_fork'
74 GIT_FORK = 'vcs_test_git_fork'
80 GIT_FORK = 'vcs_test_git_fork'
75
81
82 ## VCS
83 SCM_TESTS = ['hg', 'git']
84 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
85
86 THIS = os.path.abspath(os.path.dirname(__file__))
87
88 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
89 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
90 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
91 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
92
93
94 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
95 TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
96 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
97 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
98
99 TEST_DIR = tempfile.gettempdir()
100 TEST_REPO_PREFIX = 'vcs-test'
101
102
103 def get_new_dir(title):
104 """
105 Returns always new directory path.
106 """
107 from rhodecode.tests.vcs.utils import get_normalized_path
108 name = TEST_REPO_PREFIX
109 if title:
110 name = '-'.join((name, title))
111 hex = hashlib.sha1(str(time.time())).hexdigest()
112 name = '-'.join((name, hex))
113 path = os.path.join(TEST_DIR, name)
114 return get_normalized_path(path)
115
116
117 PACKAGE_DIR = os.path.abspath(os.path.join(
118 os.path.dirname(__file__), '..'))
119
120 TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
121
76
122
77 class TestController(TestCase):
123 class TestController(TestCase):
78
124
@@ -90,8 +136,8 b' class TestController(TestCase):'
90 password=TEST_USER_ADMIN_PASS):
136 password=TEST_USER_ADMIN_PASS):
91 self._logged_username = username
137 self._logged_username = username
92 response = self.app.post(url(controller='login', action='index'),
138 response = self.app.post(url(controller='login', action='index'),
93 {'username':username,
139 {'username': username,
94 'password':password})
140 'password': password})
95
141
96 if 'invalid user name' in response.body:
142 if 'invalid user name' in response.body:
97 self.fail('could not login using %s %s' % (username, password))
143 self.fail('could not login using %s %s' % (username, password))
@@ -186,6 +186,14 b' class TestFilesController(TestController'
186
186
187 response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: default</a></span>""")
187 response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: default</a></span>""")
188
188
189 def test_file_annotation_git(self):
190 self.log_user()
191 response = self.app.get(url(controller='files', action='index',
192 repo_name=GIT_REPO,
193 revision='master',
194 f_path='vcs/nodes.py',
195 annotate=True))
196
189 def test_archival(self):
197 def test_archival(self):
190 self.log_user()
198 self.log_user()
191
199
@@ -2,6 +2,7 b' from rhodecode.tests import *'
2 import os
2 import os
3 from nose.plugins.skip import SkipTest
3 from nose.plugins.skip import SkipTest
4
4
5
5 class TestSearchController(TestController):
6 class TestSearchController(TestController):
6
7
7 def test_index(self):
8 def test_index(self):
@@ -18,20 +19,21 b' class TestSearchController(TestControlle'
18 else:
19 else:
19 self.log_user()
20 self.log_user()
20 response = self.app.get(url(controller='search', action='index'),
21 response = self.app.get(url(controller='search', action='index'),
21 {'q':HG_REPO})
22 {'q': HG_REPO})
22 self.assertTrue('There is no index to search in. '
23 self.assertTrue('There is no index to search in. '
23 'Please run whoosh indexer' in response.body)
24 'Please run whoosh indexer' in response.body)
24
25
25 def test_normal_search(self):
26 def test_normal_search(self):
26 self.log_user()
27 self.log_user()
27 response = self.app.get(url(controller='search', action='index'),
28 response = self.app.get(url(controller='search', action='index'),
28 {'q':'def repo'})
29 {'q': 'def repo'})
29 self.assertTrue('10 results' in response.body)
30 response.mustcontain('10 results')
30 self.assertTrue('Permission denied' not in response.body)
31 response.mustcontain('Permission denied')
31
32
32 def test_repo_search(self):
33 def test_repo_search(self):
33 self.log_user()
34 self.log_user()
34 response = self.app.get(url(controller='search', action='index'),
35 response = self.app.get(url(controller='search', action='index'),
35 {'q':'repository:%s def test' % HG_REPO})
36 {'q': 'repository:%s def test' % HG_REPO})
36 self.assertTrue('4 results' in response.body)
37
37 self.assertTrue('Permission denied' not in response.body)
38 response.mustcontain('4 results')
39 response.mustcontain('Permission denied')
@@ -15,8 +15,8 b' class TestSummaryController(TestControll'
15 #repo type
15 #repo type
16 response.mustcontain(
16 response.mustcontain(
17 """<img style="margin-bottom:2px" class="icon" """
17 """<img style="margin-bottom:2px" class="icon" """
18 """title="Mercurial repository" alt="Mercurial """
18 """title="Mercurial repository" alt="Mercurial repository" """
19 """repository" src="/images/icons/hgicon.png"/>"""
19 """src="/images/icons/hgicon.png"/>"""
20 )
20 )
21 response.mustcontain(
21 response.mustcontain(
22 """<img style="margin-bottom:2px" class="icon" """
22 """<img style="margin-bottom:2px" class="icon" """
@@ -41,10 +41,33 b' class TestSummaryController(TestControll'
41 )
41 )
42
42
43 # clone url...
43 # clone url...
44 response.mustcontain("""<input style="width:80%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/vcs_test_hg"/>""")
44 response.mustcontain("""<input style="width:80%%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"/>""" % HG_REPO)
45 response.mustcontain("""<input style="display:none;width:80%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_1"/>""")
45 response.mustcontain("""<input style="display:none;width:80%%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"/>""" % ID)
46
47 def test_index_git(self):
48 self.log_user()
49 ID = Repository.get_by_repo_name(GIT_REPO).repo_id
50 response = self.app.get(url(controller='summary',
51 action='index',
52 repo_name=GIT_REPO))
46
53
47 def test_index_by_id(self):
54 #repo type
55 response.mustcontain(
56 """<img style="margin-bottom:2px" class="icon" """
57 """title="Git repository" alt="Git repository" """
58 """src="/images/icons/giticon.png"/>"""
59 )
60 response.mustcontain(
61 """<img style="margin-bottom:2px" class="icon" """
62 """title="public repository" alt="public """
63 """repository" src="/images/icons/lock_open.png"/>"""
64 )
65
66 # clone url...
67 response.mustcontain("""<input style="width:80%%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"/>""" % GIT_REPO)
68 response.mustcontain("""<input style="display:none;width:80%%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"/>""" % ID)
69
70 def test_index_by_id_hg(self):
48 self.log_user()
71 self.log_user()
49 ID = Repository.get_by_repo_name(HG_REPO).repo_id
72 ID = Repository.get_by_repo_name(HG_REPO).repo_id
50 response = self.app.get(url(controller='summary',
73 response = self.app.get(url(controller='summary',
@@ -59,6 +82,21 b' class TestSummaryController(TestControll'
59 """title="public repository" alt="public """
82 """title="public repository" alt="public """
60 """repository" src="/images/icons/lock_open.png"/>""")
83 """repository" src="/images/icons/lock_open.png"/>""")
61
84
85 def test_index_by_id_git(self):
86 self.log_user()
87 ID = Repository.get_by_repo_name(GIT_REPO).repo_id
88 response = self.app.get(url(controller='summary',
89 action='index',
90 repo_name='_%s' % ID))
91
92 #repo type
93 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
94 """title="Git repository" alt="Git """
95 """repository" src="/images/icons/hgicon.png"/>""")
96 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
97 """title="public repository" alt="public """
98 """repository" src="/images/icons/lock_open.png"/>""")
99
62 def _enable_stats(self):
100 def _enable_stats(self):
63 r = Repository.get_by_repo_name(HG_REPO)
101 r = Repository.get_by_repo_name(HG_REPO)
64 r.enable_statistics = True
102 r.enable_statistics = True
General Comments 0
You need to be logged in to leave comments. Login now