##// END OF EJS Templates
tests: removed use of unicode() calls
super-admin -
r4993:debf4b3b default
parent child Browse files
Show More
@@ -1,256 +1,256 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import shutil
23 23 import datetime
24 24
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs.backends import get_backend
28 28 from rhodecode.lib.vcs.backends.base import Config
29 29 from rhodecode.lib.vcs.nodes import FileNode
30 30 from rhodecode.tests import get_new_dir
31 31 from rhodecode.tests.utils import check_skip_backends, check_xfail_backends
32 32
33 33
34 34 @pytest.fixture()
35 35 def vcs_repository_support(
36 36 request, backend_alias, baseapp, _vcs_repo_container):
37 37 """
38 38 Provide a test repository for the test run.
39 39
40 40 Depending on the value of `recreate_repo_per_test` a new repo for each
41 41 test will be created.
42 42
43 43 The parameter `--backends` can be used to limit this fixture to specific
44 44 backend implementations.
45 45 """
46 46 cls = request.cls
47 47
48 48 check_skip_backends(request.node, backend_alias)
49 49 check_xfail_backends(request.node, backend_alias)
50 50
51 51 if _should_create_repo_per_test(cls):
52 52 _vcs_repo_container = _create_vcs_repo_container(request)
53 53
54 54 repo = _vcs_repo_container.get_repo(cls, backend_alias=backend_alias)
55 55
56 56 # TODO: johbo: Supporting old test class api, think about removing this
57 57 cls.repo = repo
58 58 cls.repo_path = repo.path
59 59 cls.default_branch = repo.DEFAULT_BRANCH_NAME
60 60 cls.Backend = cls.backend_class = repo.__class__
61 61 cls.imc = repo.in_memory_commit
62 62
63 63 return backend_alias, repo
64 64
65 65
66 66 @pytest.fixture(scope='class')
67 67 def _vcs_repo_container(request):
68 68 """
69 69 Internal fixture intended to help support class based scoping on demand.
70 70 """
71 71 return _create_vcs_repo_container(request)
72 72
73 73
74 74 def _create_vcs_repo_container(request):
75 75 repo_container = VcsRepoContainer()
76 76 if not request.config.getoption('--keep-tmp-path'):
77 77 request.addfinalizer(repo_container.cleanup)
78 78 return repo_container
79 79
80 80
81 81 class VcsRepoContainer(object):
82 82
83 83 def __init__(self):
84 84 self._cleanup_paths = []
85 85 self._repos = {}
86 86
87 87 def get_repo(self, test_class, backend_alias):
88 88 if backend_alias not in self._repos:
89 89 repo = _create_empty_repository(test_class, backend_alias)
90 90
91 91 self._cleanup_paths.append(repo.path)
92 92 self._repos[backend_alias] = repo
93 93 return self._repos[backend_alias]
94 94
95 95 def cleanup(self):
96 96 for repo_path in reversed(self._cleanup_paths):
97 97 shutil.rmtree(repo_path)
98 98
99 99
100 100 def _should_create_repo_per_test(cls):
101 101 return getattr(cls, 'recreate_repo_per_test', False)
102 102
103 103
104 104 def _create_empty_repository(cls, backend_alias=None):
105 105 Backend = get_backend(backend_alias or cls.backend_alias)
106 106 repo_path = get_new_dir(str(time.time()))
107 107 repo = Backend(repo_path, create=True)
108 108 if hasattr(cls, '_get_commits'):
109 109 commits = cls._get_commits()
110 110 cls.tip = _add_commits_to_repo(repo, commits)
111 111
112 112 return repo
113 113
114 114
115 115 @pytest.fixture()
116 116 def config():
117 117 """
118 118 Instance of a repository config.
119 119
120 120 The instance contains only one value:
121 121
122 122 - Section: "section-a"
123 123 - Key: "a-1"
124 124 - Value: "value-a-1"
125 125
126 126 The intended usage is for cases where a config instance is needed but no
127 127 specific content is required.
128 128 """
129 129 config = Config()
130 130 config.set('section-a', 'a-1', 'value-a-1')
131 131 return config
132 132
133 133
134 134 def _add_commits_to_repo(repo, commits):
135 135 imc = repo.in_memory_commit
136 136 tip = None
137 137
138 138 for commit in commits:
139 139 for node in commit.get('added', []):
140 140 imc.add(FileNode(node.path, content=node.content))
141 141 for node in commit.get('changed', []):
142 142 imc.change(FileNode(node.path, content=node.content))
143 143 for node in commit.get('removed', []):
144 144 imc.remove(FileNode(node.path))
145 145
146 146 tip = imc.commit(
147 message=unicode(commit['message']),
148 author=unicode(commit['author']),
147 message=str(commit['message']),
148 author=str(commit['author']),
149 149 date=commit['date'],
150 150 branch=commit.get('branch'))
151 151 return tip
152 152
153 153
154 154 @pytest.fixture()
155 155 def vcs_repo(request, backend_alias):
156 156 Backend = get_backend(backend_alias)
157 157 repo_path = get_new_dir(str(time.time()))
158 158 repo = Backend(repo_path, create=True)
159 159
160 160 @request.addfinalizer
161 161 def cleanup():
162 162 shutil.rmtree(repo_path)
163 163
164 164 return repo
165 165
166 166
167 167 @pytest.fixture()
168 168 def generate_repo_with_commits(vcs_repo):
169 169 """
170 170 Creates a fabric to generate N comits with some file nodes on a randomly
171 171 generated repository
172 172 """
173 173
174 174 def commit_generator(num):
175 175 start_date = datetime.datetime(2010, 1, 1, 20)
176 176 for x in range(num):
177 177 yield {
178 178 'message': 'Commit %d' % x,
179 179 'author': 'Joe Doe <joe.doe@example.com>',
180 180 'date': start_date + datetime.timedelta(hours=12 * x),
181 181 'added': [
182 182 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
183 183 ],
184 184 'modified': [
185 185 FileNode('file_%d.txt' % x,
186 186 content='Foobar %d modified' % (x-1)),
187 187 ]
188 188 }
189 189
190 190 def commit_maker(num=5):
191 191 _add_commits_to_repo(vcs_repo, commit_generator(num))
192 192 return vcs_repo
193 193
194 194 return commit_maker
195 195
196 196
197 197 @pytest.fixture()
198 198 def hg_repo(request, vcs_repo):
199 199 repo = vcs_repo
200 200
201 201 commits = repo._get_commits()
202 202 _add_commits_to_repo(repo, commits)
203 203
204 204 return repo
205 205
206 206
207 207 @pytest.fixture()
208 208 def hg_commit(hg_repo):
209 209 return hg_repo.get_commit()
210 210
211 211
212 212 class BackendTestMixin(object):
213 213 """
214 214 This is a backend independent test case class which should be created
215 215 with ``type`` method.
216 216
217 217 It is required to set following attributes at subclass:
218 218
219 219 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
220 220 - ``repo_path``: path to the repository which would be created for set of
221 221 tests
222 222 - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be
223 223 created
224 224 before every single test. Defaults to ``True``.
225 225 """
226 226 recreate_repo_per_test = True
227 227
228 228 @classmethod
229 229 def _get_commits(cls):
230 230 commits = [
231 231 {
232 232 'message': u'Initial commit',
233 233 'author': u'Joe Doe <joe.doe@example.com>',
234 234 'date': datetime.datetime(2010, 1, 1, 20),
235 235 'added': [
236 236 FileNode('foobar', content='Foobar'),
237 237 FileNode('foobar2', content='Foobar II'),
238 238 FileNode('foo/bar/baz', content='baz here!'),
239 239 ],
240 240 },
241 241 {
242 242 'message': u'Changes...',
243 243 'author': u'Jane Doe <jane.doe@example.com>',
244 244 'date': datetime.datetime(2010, 1, 1, 21),
245 245 'added': [
246 246 FileNode('some/new.txt', content='news...'),
247 247 ],
248 248 'changed': [
249 249 FileNode('foobar', 'Foobar I'),
250 250 ],
251 251 'removed': [],
252 252 },
253 253 ]
254 254 return commits
255 255
256 256
@@ -1,552 +1,552 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 from urllib.error import URLError
23 23
24 24 import mock
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs import backends
28 28 from rhodecode.lib.vcs.backends.base import (
29 29 Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
30 30 from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
31 31 from rhodecode.lib.vcs.nodes import FileNode
32 32 from rhodecode.tests.vcs.conftest import BackendTestMixin
33 33 from rhodecode.tests import repo_id_generator
34 34
35 35
36 36 @pytest.mark.usefixtures("vcs_repository_support")
37 37 class TestRepositoryBase(BackendTestMixin):
38 38 recreate_repo_per_test = False
39 39
40 40 def test_init_accepts_unicode_path(self, tmpdir):
41 path = unicode(tmpdir.join(u'unicode Γ€'))
41 path = str(tmpdir.join(u'unicode Γ€'))
42 42 self.Backend(path, create=True)
43 43
44 44 def test_init_accepts_str_path(self, tmpdir):
45 45 path = str(tmpdir.join('str Γ€'))
46 46 self.Backend(path, create=True)
47 47
48 48 def test_init_fails_if_path_does_not_exist(self, tmpdir):
49 path = unicode(tmpdir.join('i-do-not-exist'))
49 path = str(tmpdir.join('i-do-not-exist'))
50 50 with pytest.raises(VCSError):
51 51 self.Backend(path)
52 52
53 53 def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
54 path = unicode(tmpdir.mkdir(u'unicode Γ€'))
54 path = str(tmpdir.mkdir(u'unicode Γ€'))
55 55 with pytest.raises(VCSError):
56 56 self.Backend(path)
57 57
58 58 def test_has_commits_attribute(self):
59 59 self.repo.commit_ids
60 60
61 61 def test_name(self):
62 62 assert self.repo.name.startswith('vcs-test')
63 63
64 64 @pytest.mark.backends("hg", "git")
65 65 def test_has_default_branch_name(self):
66 66 assert self.repo.DEFAULT_BRANCH_NAME is not None
67 67
68 68 @pytest.mark.backends("svn")
69 69 def test_has_no_default_branch_name(self):
70 70 assert self.repo.DEFAULT_BRANCH_NAME is None
71 71
72 72 def test_has_empty_commit(self):
73 73 assert self.repo.EMPTY_COMMIT_ID is not None
74 74 assert self.repo.EMPTY_COMMIT is not None
75 75
76 76 def test_empty_changeset_is_deprecated(self):
77 77 def get_empty_changeset(repo):
78 78 return repo.EMPTY_CHANGESET
79 79 pytest.deprecated_call(get_empty_changeset, self.repo)
80 80
81 81 def test_bookmarks(self):
82 82 assert len(self.repo.bookmarks) == 0
83 83
84 84 # TODO: Cover two cases: Local repo path, remote URL
85 85 def test_check_url(self):
86 86 config = Config()
87 87 assert self.Backend.check_url(self.repo.path, config)
88 88
89 89 def test_check_url_invalid(self):
90 90 config = Config()
91 91 with pytest.raises(URLError):
92 92 self.Backend.check_url(self.repo.path + "invalid", config)
93 93
94 94 def test_get_contact(self):
95 95 assert self.repo.contact
96 96
97 97 def test_get_description(self):
98 98 assert self.repo.description
99 99
100 100 def test_get_hook_location(self):
101 101 assert len(self.repo.get_hook_location()) != 0
102 102
103 103 def test_last_change(self, local_dt_to_utc):
104 104 assert self.repo.last_change >= local_dt_to_utc(
105 105 datetime.datetime(2010, 1, 1, 21, 0))
106 106
107 107 def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
108 108 delta = datetime.timedelta(seconds=1)
109 109
110 110 start = local_dt_to_utc(datetime.datetime.now())
111 111 empty_repo = vcsbackend.create_repo()
112 112 now = local_dt_to_utc(datetime.datetime.now())
113 113 assert empty_repo.last_change >= start - delta
114 114 assert empty_repo.last_change <= now + delta
115 115
116 116 def test_repo_equality(self):
117 117 assert self.repo == self.repo
118 118
119 119 def test_repo_equality_broken_object(self):
120 120 import copy
121 121 _repo = copy.copy(self.repo)
122 122 delattr(_repo, 'path')
123 123 assert self.repo != _repo
124 124
125 125 def test_repo_equality_other_object(self):
126 126 class dummy(object):
127 127 path = self.repo.path
128 128 assert self.repo != dummy()
129 129
130 130 def test_get_commit_is_implemented(self):
131 131 self.repo.get_commit()
132 132
133 133 def test_get_commits_is_implemented(self):
134 134 commit_iter = iter(self.repo.get_commits())
135 135 commit = next(commit_iter)
136 136 assert commit.idx == 0
137 137
138 138 def test_supports_iteration(self):
139 139 repo_iter = iter(self.repo)
140 140 commit = next(repo_iter)
141 141 assert commit.idx == 0
142 142
143 143 def test_in_memory_commit(self):
144 144 imc = self.repo.in_memory_commit
145 145 assert isinstance(imc, BaseInMemoryCommit)
146 146
147 147 @pytest.mark.backends("hg")
148 148 def test__get_url_unicode(self):
149 149 url = u'/home/repos/malmΓΆ'
150 150 assert self.repo._get_url(url)
151 151
152 152
153 153 @pytest.mark.usefixtures("vcs_repository_support")
154 154 class TestDeprecatedRepositoryAPI(BackendTestMixin):
155 155 recreate_repo_per_test = False
156 156
157 157 def test_revisions_is_deprecated(self):
158 158 def get_revisions(repo):
159 159 return repo.revisions
160 160 pytest.deprecated_call(get_revisions, self.repo)
161 161
162 162 def test_get_changeset_is_deprecated(self):
163 163 pytest.deprecated_call(self.repo.get_changeset)
164 164
165 165 def test_get_changesets_is_deprecated(self):
166 166 pytest.deprecated_call(self.repo.get_changesets)
167 167
168 168 def test_in_memory_changeset_is_deprecated(self):
169 169 def get_imc(repo):
170 170 return repo.in_memory_changeset
171 171 pytest.deprecated_call(get_imc, self.repo)
172 172
173 173
174 174 # TODO: these tests are incomplete, must check the resulting compare result for
175 175 # correcteness
176 176 class TestRepositoryCompare:
177 177
178 178 @pytest.mark.parametrize('merge', [True, False])
179 179 def test_compare_commits_of_same_repository(self, vcsbackend, merge):
180 180 target_repo = vcsbackend.create_repo(number_of_commits=5)
181 181 target_repo.compare(
182 182 target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
183 183 merge=merge)
184 184
185 185 @pytest.mark.xfail_backends('svn')
186 186 @pytest.mark.parametrize('merge', [True, False])
187 187 def test_compare_cloned_repositories(self, vcsbackend, merge):
188 188 target_repo = vcsbackend.create_repo(number_of_commits=5)
189 189 source_repo = vcsbackend.clone_repo(target_repo)
190 190 assert target_repo != source_repo
191 191
192 192 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
193 193 source_commit = source_repo.get_commit()
194 194
195 195 target_repo.compare(
196 196 target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
197 197 merge=merge)
198 198
199 199 @pytest.mark.xfail_backends('svn')
200 200 @pytest.mark.parametrize('merge', [True, False])
201 201 def test_compare_unrelated_repositories(self, vcsbackend, merge):
202 202 orig = vcsbackend.create_repo(number_of_commits=5)
203 203 unrelated = vcsbackend.create_repo(number_of_commits=5)
204 204 assert orig != unrelated
205 205
206 206 orig.compare(
207 207 orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
208 208
209 209
210 210 class TestRepositoryGetCommonAncestor:
211 211
212 212 def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
213 213 target_repo = vcsbackend.create_repo(number_of_commits=5)
214 214
215 215 expected_ancestor = target_repo[2].raw_id
216 216
217 217 assert target_repo.get_common_ancestor(
218 218 commit_id1=target_repo[2].raw_id,
219 219 commit_id2=target_repo[4].raw_id,
220 220 repo2=target_repo
221 221 ) == expected_ancestor
222 222
223 223 assert target_repo.get_common_ancestor(
224 224 commit_id1=target_repo[4].raw_id,
225 225 commit_id2=target_repo[2].raw_id,
226 226 repo2=target_repo
227 227 ) == expected_ancestor
228 228
229 229 @pytest.mark.xfail_backends("svn")
230 230 def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
231 231 target_repo = vcsbackend.create_repo(number_of_commits=5)
232 232 source_repo = vcsbackend.clone_repo(target_repo)
233 233 assert target_repo != source_repo
234 234
235 235 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
236 236 source_commit = source_repo.get_commit()
237 237
238 238 expected_ancestor = target_repo[4].raw_id
239 239
240 240 assert target_repo.get_common_ancestor(
241 241 commit_id1=target_repo[4].raw_id,
242 242 commit_id2=source_commit.raw_id,
243 243 repo2=source_repo
244 244 ) == expected_ancestor
245 245
246 246 assert target_repo.get_common_ancestor(
247 247 commit_id1=source_commit.raw_id,
248 248 commit_id2=target_repo[4].raw_id,
249 249 repo2=target_repo
250 250 ) == expected_ancestor
251 251
252 252 @pytest.mark.xfail_backends("svn")
253 253 def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
254 254 original = vcsbackend.create_repo(number_of_commits=5)
255 255 unrelated = vcsbackend.create_repo(number_of_commits=5)
256 256 assert original != unrelated
257 257
258 258 assert original.get_common_ancestor(
259 259 commit_id1=original[0].raw_id,
260 260 commit_id2=unrelated[0].raw_id,
261 261 repo2=unrelated
262 262 ) is None
263 263
264 264 assert original.get_common_ancestor(
265 265 commit_id1=original[-1].raw_id,
266 266 commit_id2=unrelated[-1].raw_id,
267 267 repo2=unrelated
268 268 ) is None
269 269
270 270
271 271 @pytest.mark.backends("git", "hg")
272 272 class TestRepositoryMerge(object):
273 273 def prepare_for_success(self, vcsbackend):
274 274 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
275 275 self.source_repo = vcsbackend.clone_repo(self.target_repo)
276 276 vcsbackend.add_file(self.target_repo, 'README_MERGE1', 'Version 1')
277 277 vcsbackend.add_file(self.source_repo, 'README_MERGE2', 'Version 2')
278 278 imc = self.source_repo.in_memory_commit
279 279 imc.add(FileNode('file_x', content=self.source_repo.name))
280 280 imc.commit(
281 281 message=u'Automatic commit from repo merge test',
282 282 author=u'Automatic <automatic@rhodecode.com>')
283 283 self.target_commit = self.target_repo.get_commit()
284 284 self.source_commit = self.source_repo.get_commit()
285 285 # This only works for Git and Mercurial
286 286 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
287 287 self.target_ref = Reference('branch', default_branch, self.target_commit.raw_id)
288 288 self.source_ref = Reference('branch', default_branch, self.source_commit.raw_id)
289 289 self.workspace_id = 'test-merge-{}'.format(vcsbackend.alias)
290 290 self.repo_id = repo_id_generator(self.target_repo.path)
291 291
292 292 def prepare_for_conflict(self, vcsbackend):
293 293 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
294 294 self.source_repo = vcsbackend.clone_repo(self.target_repo)
295 295 vcsbackend.add_file(self.target_repo, 'README_MERGE', 'Version 1')
296 296 vcsbackend.add_file(self.source_repo, 'README_MERGE', 'Version 2')
297 297 self.target_commit = self.target_repo.get_commit()
298 298 self.source_commit = self.source_repo.get_commit()
299 299 # This only works for Git and Mercurial
300 300 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
301 301 self.target_ref = Reference('branch', default_branch, self.target_commit.raw_id)
302 302 self.source_ref = Reference('branch', default_branch, self.source_commit.raw_id)
303 303 self.workspace_id = 'test-merge-{}'.format(vcsbackend.alias)
304 304 self.repo_id = repo_id_generator(self.target_repo.path)
305 305
306 306 def test_merge_success(self, vcsbackend):
307 307 self.prepare_for_success(vcsbackend)
308 308
309 309 merge_response = self.target_repo.merge(
310 310 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
311 311 self.source_ref,
312 312 'test user', 'test@rhodecode.com', 'merge message 1',
313 313 dry_run=False)
314 314 expected_merge_response = MergeResponse(
315 315 True, True, merge_response.merge_ref,
316 316 MergeFailureReason.NONE)
317 317 assert merge_response == expected_merge_response
318 318
319 319 target_repo = backends.get_backend(vcsbackend.alias)(
320 320 self.target_repo.path)
321 321 target_commits = list(target_repo.get_commits())
322 322 commit_ids = [c.raw_id for c in target_commits[:-1]]
323 323 assert self.source_ref.commit_id in commit_ids
324 324 assert self.target_ref.commit_id in commit_ids
325 325
326 326 merge_commit = target_commits[-1]
327 327 assert merge_commit.raw_id == merge_response.merge_ref.commit_id
328 328 assert merge_commit.message.strip() == 'merge message 1'
329 329 assert merge_commit.author == 'test user <test@rhodecode.com>'
330 330
331 331 # We call it twice so to make sure we can handle updates
332 332 target_ref = Reference(
333 333 self.target_ref.type, self.target_ref.name,
334 334 merge_response.merge_ref.commit_id)
335 335
336 336 merge_response = target_repo.merge(
337 337 self.repo_id, self.workspace_id, target_ref, self.source_repo, self.source_ref,
338 338 'test user', 'test@rhodecode.com', 'merge message 2',
339 339 dry_run=False)
340 340 expected_merge_response = MergeResponse(
341 341 True, True, merge_response.merge_ref,
342 342 MergeFailureReason.NONE)
343 343 assert merge_response == expected_merge_response
344 344
345 345 target_repo = backends.get_backend(
346 346 vcsbackend.alias)(self.target_repo.path)
347 347 merge_commit = target_repo.get_commit(
348 348 merge_response.merge_ref.commit_id)
349 349 assert merge_commit.message.strip() == 'merge message 1'
350 350 assert merge_commit.author == 'test user <test@rhodecode.com>'
351 351
352 352 def test_merge_success_dry_run(self, vcsbackend):
353 353 self.prepare_for_success(vcsbackend)
354 354
355 355 merge_response = self.target_repo.merge(
356 356 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
357 357 self.source_ref, dry_run=True)
358 358
359 359 # We call it twice so to make sure we can handle updates
360 360 merge_response_update = self.target_repo.merge(
361 361 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
362 362 self.source_ref, dry_run=True)
363 363
364 364 # Multiple merges may differ in their commit id. Therefore we set the
365 365 # commit id to `None` before comparing the merge responses.
366 366 new_merge_ref = merge_response.merge_ref._replace(commit_id=None)
367 367 merge_response.merge_ref = new_merge_ref
368 368
369 369 new_update_merge_ref = merge_response_update.merge_ref._replace(commit_id=None)
370 370 merge_response_update.merge_ref = new_update_merge_ref
371 371
372 372 assert merge_response == merge_response_update
373 373 assert merge_response.possible is True
374 374 assert merge_response.executed is False
375 375 assert merge_response.merge_ref
376 376 assert merge_response.failure_reason is MergeFailureReason.NONE
377 377
378 378 @pytest.mark.parametrize('dry_run', [True, False])
379 379 def test_merge_conflict(self, vcsbackend, dry_run):
380 380 self.prepare_for_conflict(vcsbackend)
381 381
382 382 expected_merge_response = MergeResponse(
383 383 False, False, None, MergeFailureReason.MERGE_FAILED)
384 384
385 385 merge_response = self.target_repo.merge(
386 386 self.repo_id, self.workspace_id, self.target_ref,
387 387 self.source_repo, self.source_ref,
388 388 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
389 389 assert merge_response == expected_merge_response
390 390
391 391 # We call it twice so to make sure we can handle updates
392 392 merge_response = self.target_repo.merge(
393 393 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
394 394 self.source_ref,
395 395 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
396 396 assert merge_response == expected_merge_response
397 397
398 398 def test_merge_target_is_not_head(self, vcsbackend):
399 399 self.prepare_for_success(vcsbackend)
400 400 target_ref = Reference(
401 401 self.target_ref.type, self.target_ref.name, '0' * 40)
402 402 expected_merge_response = MergeResponse(
403 403 False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD,
404 404 metadata={'target_ref': target_ref})
405 405 merge_response = self.target_repo.merge(
406 406 self.repo_id, self.workspace_id, target_ref, self.source_repo,
407 407 self.source_ref, dry_run=True)
408 408
409 409 assert merge_response == expected_merge_response
410 410
411 411 def test_merge_missing_source_reference(self, vcsbackend):
412 412 self.prepare_for_success(vcsbackend)
413 413
414 414 source_ref = Reference(
415 415 self.source_ref.type, 'not_existing', self.source_ref.commit_id)
416 416 expected_merge_response = MergeResponse(
417 417 False, False, None, MergeFailureReason.MISSING_SOURCE_REF,
418 418 metadata={'source_ref': source_ref})
419 419
420 420 merge_response = self.target_repo.merge(
421 421 self.repo_id, self.workspace_id, self.target_ref,
422 422 self.source_repo, source_ref,
423 423 dry_run=True)
424 424
425 425 assert merge_response == expected_merge_response
426 426
427 427 def test_merge_raises_exception(self, vcsbackend):
428 428 self.prepare_for_success(vcsbackend)
429 429 expected_merge_response = MergeResponse(
430 430 False, False, None, MergeFailureReason.UNKNOWN,
431 431 metadata={'exception': 'ErrorForTest'})
432 432
433 433 with mock.patch.object(self.target_repo, '_merge_repo',
434 434 side_effect=RepositoryError()):
435 435 merge_response = self.target_repo.merge(
436 436 self.repo_id, self.workspace_id, self.target_ref,
437 437 self.source_repo, self.source_ref,
438 438 dry_run=True)
439 439
440 440 assert merge_response == expected_merge_response
441 441
442 442 def test_merge_invalid_user_name(self, vcsbackend):
443 443 repo = vcsbackend.create_repo(number_of_commits=1)
444 444 ref = Reference('branch', 'master', 'not_used')
445 445 workspace_id = 'test-errors-in-merge'
446 446 repo_id = repo_id_generator(workspace_id)
447 447 with pytest.raises(ValueError):
448 448 repo.merge(repo_id, workspace_id, ref, self, ref)
449 449
450 450 def test_merge_invalid_user_email(self, vcsbackend):
451 451 repo = vcsbackend.create_repo(number_of_commits=1)
452 452 ref = Reference('branch', 'master', 'not_used')
453 453 workspace_id = 'test-errors-in-merge'
454 454 repo_id = repo_id_generator(workspace_id)
455 455 with pytest.raises(ValueError):
456 456 repo.merge(
457 457 repo_id, workspace_id, ref, self, ref, 'user name')
458 458
459 459 def test_merge_invalid_message(self, vcsbackend):
460 460 repo = vcsbackend.create_repo(number_of_commits=1)
461 461 ref = Reference('branch', 'master', 'not_used')
462 462 workspace_id = 'test-errors-in-merge'
463 463 repo_id = repo_id_generator(workspace_id)
464 464 with pytest.raises(ValueError):
465 465 repo.merge(
466 466 repo_id, workspace_id, ref, self, ref,
467 467 'user name', 'user@email.com')
468 468
469 469
470 470 @pytest.mark.usefixtures("vcs_repository_support")
471 471 class TestRepositoryStrip(BackendTestMixin):
472 472 recreate_repo_per_test = True
473 473
474 474 @classmethod
475 475 def _get_commits(cls):
476 476 commits = [
477 477 {
478 478 'message': 'Initial commit',
479 479 'author': 'Joe Doe <joe.doe@example.com>',
480 480 'date': datetime.datetime(2010, 1, 1, 20),
481 481 'branch': 'master',
482 482 'added': [
483 483 FileNode('foobar', content='foobar'),
484 484 FileNode('foobar2', content='foobar2'),
485 485 ],
486 486 },
487 487 ]
488 488 for x in range(10):
489 489 commit_data = {
490 490 'message': 'Changed foobar - commit%s' % x,
491 491 'author': 'Jane Doe <jane.doe@example.com>',
492 492 'date': datetime.datetime(2010, 1, 1, 21, x),
493 493 'branch': 'master',
494 494 'changed': [
495 495 FileNode('foobar', 'FOOBAR - %s' % x),
496 496 ],
497 497 }
498 498 commits.append(commit_data)
499 499 return commits
500 500
501 501 @pytest.mark.backends("git", "hg")
502 502 def test_strip_commit(self):
503 503 tip = self.repo.get_commit()
504 504 assert tip.idx == 10
505 505 self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)
506 506
507 507 tip = self.repo.get_commit()
508 508 assert tip.idx == 9
509 509
510 510 @pytest.mark.backends("git", "hg")
511 511 def test_strip_multiple_commits(self):
512 512 tip = self.repo.get_commit()
513 513 assert tip.idx == 10
514 514
515 515 old = self.repo.get_commit(commit_idx=5)
516 516 self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)
517 517
518 518 tip = self.repo.get_commit()
519 519 assert tip.idx == 4
520 520
521 521
522 522 @pytest.mark.backends('hg', 'git')
523 523 class TestRepositoryPull(object):
524 524
525 525 def test_pull(self, vcsbackend):
526 526 source_repo = vcsbackend.repo
527 527 target_repo = vcsbackend.create_repo()
528 528 assert len(source_repo.commit_ids) > len(target_repo.commit_ids)
529 529
530 530 target_repo.pull(source_repo.path)
531 531 # Note: Get a fresh instance, avoids caching trouble
532 532 target_repo = vcsbackend.backend(target_repo.path)
533 533 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
534 534
535 535 def test_pull_wrong_path(self, vcsbackend):
536 536 target_repo = vcsbackend.create_repo()
537 537 with pytest.raises(RepositoryError):
538 538 target_repo.pull(target_repo.path + "wrong")
539 539
540 540 def test_pull_specific_commits(self, vcsbackend):
541 541 source_repo = vcsbackend.repo
542 542 target_repo = vcsbackend.create_repo()
543 543
544 544 second_commit = source_repo[1].raw_id
545 545 if vcsbackend.alias == 'git':
546 546 second_commit_ref = 'refs/test-refs/a'
547 547 source_repo.set_refs(second_commit_ref, second_commit)
548 548
549 549 target_repo.pull(source_repo.path, commit_ids=[second_commit])
550 550 target_repo = vcsbackend.backend(target_repo.path)
551 551 assert 2 == len(target_repo.commit_ids)
552 552 assert second_commit == target_repo.get_commit().raw_id
@@ -1,186 +1,186 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.tests import SVN_REPO, TEST_DIR, TESTS_TMP_PATH
27 27 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
28 28 from rhodecode.lib.vcs.conf import settings
29 29 from rhodecode.lib.vcs.exceptions import VCSError
30 30
31 31
32 32 pytestmark = [
33 33 pytest.mark.backends("svn"),
34 34 pytest.mark.usefixtures("baseapp"),
35 35 ]
36 36
37 37
38 38 @pytest.fixture()
39 39 def repo(baseapp):
40 40 repo = SubversionRepository(os.path.join(TESTS_TMP_PATH, SVN_REPO))
41 41 return repo
42 42
43 43
44 44 @pytest.fixture()
45 45 def head(repo):
46 46 return repo.get_commit()
47 47
48 48
49 49 def test_init_fails_if_path_does_not_exist():
50 50 path = os.path.join(TEST_DIR, 'i-do-not-exist')
51 51 with pytest.raises(VCSError):
52 52 SubversionRepository(path)
53 53
54 54
55 55 def test_init_fails_if_path_is_not_a_valid_repository(tmpdir):
56 path = unicode(tmpdir.mkdir(u'unicode Γ€'))
56 path = str(tmpdir.mkdir('unicode Γ€'))
57 57 with pytest.raises(VCSError):
58 58 SubversionRepository(path)
59 59
60 60
61 61 def test_repo_clone(vcsbackend, reposerver):
62 62 source = vcsbackend.create_repo(number_of_commits=3)
63 63 reposerver.serve(source)
64 64 repo = SubversionRepository(
65 65 vcsbackend.new_repo_path(),
66 66 create=True,
67 67 src_url=reposerver.url)
68 68
69 69 assert source.commit_ids == repo.commit_ids
70 70 assert source[0].message == repo[0].message
71 71
72 72
73 73 def test_latest_commit(head):
74 74 assert head.raw_id == '393'
75 75
76 76
77 77 def test_commit_description(head):
78 78 assert head.message == """Added a symlink"""
79 79
80 80
81 81 def test_commit_author(head):
82 82 assert head.author == 'marcin'
83 83
84 84
85 85 @pytest.mark.parametrize("filename, content, mime_type", [
86 86 ('test.txt', 'Text content\n', None),
87 87 ('test.bin', '\0 binary \0', 'application/octet-stream'),
88 88 ], ids=['text', 'binary'])
89 89 def test_sets_mime_type_correctly(vcsbackend, filename, content, mime_type):
90 90 repo = vcsbackend.create_repo()
91 91 vcsbackend.ensure_file(filename, content)
92 92 file_properties = repo._remote.node_properties(filename, 1)
93 93 assert file_properties.get('svn:mime-type') == mime_type
94 94
95 95
96 96 def test_slice_access(repo):
97 97 page_size = 5
98 98 page = 0
99 99 start = page * page_size
100 100 end = start + page_size - 1
101 101
102 102 commits = list(repo[start:end])
103 103 assert [commit.raw_id for commit in commits] == ['1', '2', '3', '4']
104 104
105 105
106 106 def test_walk_changelog_page(repo):
107 107 page_size = 5
108 108 page = 0
109 109 start = page * page_size
110 110 end = start + page_size - 1
111 111
112 112 commits = list(repo[start:end])
113 113 changelog = [
114 114 'r%s, %s, %s' % (c.raw_id, c.author, c.message) for c in commits]
115 115
116 116 expexted_messages = [
117 117 'r1, marcin, initial import',
118 118 'r2, marcin, hg ignore',
119 119 'r3, marcin, Pip standards refactor',
120 120 'r4, marcin, Base repository few new functions added']
121 121 assert changelog == expexted_messages
122 122
123 123
124 124 def test_read_full_file_tree(head):
125 125 for topnode, dirs, files in head.walk():
126 126 for f in files:
127 127 len(f.content)
128 128
129 129
130 130 def test_topnode_files_attribute(head):
131 131 topnode = head.get_node('')
132 132 topnode.files
133 133
134 134
135 135 @pytest.mark.parametrize("filename, content, branch, mime_type", [
136 136 (u'branches/plain/test.txt', 'Text content\n', 'plain', None),
137 137 (u'branches/uniΓ§ΓΆβˆ‚e/test.bin', '\0 binary \0', u'uniΓ§ΓΆβˆ‚e',
138 138 'application/octet-stream'),
139 139 ], ids=['text', 'binary'])
140 140 def test_unicode_refs(vcsbackend, filename, content, branch, mime_type):
141 141 repo = vcsbackend.create_repo()
142 142 vcsbackend.ensure_file(filename, content)
143 143 with mock.patch(("rhodecode.lib.vcs.backends.svn.repository"
144 144 ".SubversionRepository._patterns_from_section"),
145 145 return_value=['branches/*']):
146 146 assert u'branches/{0}'.format(branch) in repo.branches
147 147
148 148
149 149 def test_compatible_version(monkeypatch, vcsbackend):
150 150 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'pre-1.8-compatible')
151 151 path = vcsbackend.new_repo_path()
152 152 SubversionRepository(path, create=True)
153 153 with open('{}/db/format'.format(path)) as f:
154 154 first_line = f.readline().strip()
155 155 assert first_line == '4'
156 156
157 157
158 158 def test_invalid_compatible_version(monkeypatch, vcsbackend):
159 159 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'i-am-an-invalid-setting')
160 160 path = vcsbackend.new_repo_path()
161 161 with pytest.raises(Exception):
162 162 SubversionRepository(path, create=True)
163 163
164 164
165 165 class TestSVNCommit(object):
166 166
167 167 @pytest.fixture(autouse=True)
168 168 def prepare(self, repo):
169 169 self.repo = repo
170 170
171 171 def test_file_history_from_commits(self):
172 172 node = self.repo[10].get_node('setup.py')
173 173 commit_ids = [commit.raw_id for commit in node.history]
174 174 assert ['8'] == commit_ids
175 175
176 176 node = self.repo[20].get_node('setup.py')
177 177 node_ids = [commit.raw_id for commit in node.history]
178 178 assert ['18',
179 179 '8'] == node_ids
180 180
181 181 # special case we check history from commit that has this particular
182 182 # file changed this means we check if it's included as well
183 183 node = self.repo.get_commit('18').get_node('setup.py')
184 184 node_ids = [commit.raw_id for commit in node.history]
185 185 assert ['18',
186 186 '8'] == node_ids
General Comments 0
You need to be logged in to leave comments. Login now