##// END OF EJS Templates
tests: fixed merge tests cases
super-admin -
r5177:80d550be default
parent child Browse files
Show More
@@ -1,560 +1,648 b''
1
2 1 # Copyright (C) 2010-2023 RhodeCode GmbH
3 2 #
4 3 # This program is free software: you can redistribute it and/or modify
5 4 # it under the terms of the GNU Affero General Public License, version 3
6 5 # (only), as published by the Free Software Foundation.
7 6 #
8 7 # This program is distributed in the hope that it will be useful,
9 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 10 # GNU General Public License for more details.
12 11 #
13 12 # You should have received a copy of the GNU Affero General Public License
14 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 14 #
16 15 # This program is dual-licensed. If you wish to learn more about the
17 16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 18
20 19 import datetime
21 20 from urllib.error import URLError
22 21
23 22 import mock
24 23 import pytest
25 24
26 25 from rhodecode.lib.vcs import backends
27 26 from rhodecode.lib.vcs.backends.base import (
28 Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
27 Config,
28 BaseInMemoryCommit,
29 Reference,
30 MergeResponse,
31 MergeFailureReason,
32 )
29 33 from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
30 34 from rhodecode.lib.vcs.nodes import FileNode
31 35 from rhodecode.tests.vcs.conftest import BackendTestMixin
32 36 from rhodecode.tests import repo_id_generator
33 37
34 38
35 39 @pytest.mark.usefixtures("vcs_repository_support")
36 40 class TestRepositoryBase(BackendTestMixin):
37 41 recreate_repo_per_test = False
38 42
39 43 def test_init_accepts_unicode_path(self, tmpdir):
40 path = str(tmpdir.join(u'unicode Γ€'))
44 path = str(tmpdir.join("unicode Γ€"))
41 45 self.Backend(path, create=True)
42 46
43 47 def test_init_accepts_str_path(self, tmpdir):
44 path = str(tmpdir.join('str Γ€'))
48 path = str(tmpdir.join("str Γ€"))
45 49 self.Backend(path, create=True)
46 50
47 51 def test_init_fails_if_path_does_not_exist(self, tmpdir):
48 path = str(tmpdir.join('i-do-not-exist'))
52 path = str(tmpdir.join("i-do-not-exist"))
49 53 with pytest.raises(VCSError):
50 54 self.Backend(path)
51 55
52 56 def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
53 path = str(tmpdir.mkdir(u'unicode Γ€'))
57 path = str(tmpdir.mkdir("unicode Γ€"))
54 58 with pytest.raises(VCSError):
55 59 self.Backend(path)
56 60
57 61 def test_has_commits_attribute(self):
58 62 assert self.repo.commit_ids
59 63
60 64 def test_name(self):
61 assert self.repo.name.startswith('vcs-test')
65 assert self.repo.name.startswith("vcs-test")
62 66
63 67 @pytest.mark.backends("hg", "git")
64 68 def test_has_default_branch_name(self):
65 69 assert self.repo.DEFAULT_BRANCH_NAME is not None
66 70
67 71 @pytest.mark.backends("svn")
68 72 def test_has_no_default_branch_name(self):
69 73 assert self.repo.DEFAULT_BRANCH_NAME is None
70 74
71 75 def test_has_empty_commit(self):
72 76 assert self.repo.EMPTY_COMMIT_ID is not None
73 77 assert self.repo.EMPTY_COMMIT is not None
74 78
75 79 def test_empty_changeset_is_deprecated(self):
76 80 def get_empty_changeset(repo):
77 81 return repo.EMPTY_CHANGESET
82
78 83 pytest.deprecated_call(get_empty_changeset, self.repo)
79 84
80 85 def test_bookmarks(self):
81 86 assert len(self.repo.bookmarks) == 0
82 87
83 88 def test_check_url_on_path(self):
84 89 config = Config()
85 90 assert self.Backend.check_url(self.repo.path, config)
86 91
87 92 def test_check_url_on_remote_url(self):
88 93 config = Config()
89 94 url = {
90 'hg': 'https://code.rhodecode.com/rhodecode-vcsserver',
91 'svn': 'https://code.rhodecode.com/svn-doc',
92 'git': 'https://code.rhodecode.com/appenlight',
95 "hg": "https://code.rhodecode.com/rhodecode-vcsserver",
96 "svn": "https://code.rhodecode.com/svn-doc",
97 "git": "https://code.rhodecode.com/appenlight",
93 98 }[self.repo.alias]
94 99
95 100 assert self.Backend.check_url(url, config)
96 101
97 102 def test_check_url_invalid(self):
98 103 config = Config()
99 104 with pytest.raises(URLError):
100 105 self.Backend.check_url(self.repo.path + "invalid", config)
101 106
102 107 def test_get_contact(self):
103 108 assert self.repo.contact
104 109
105 110 def test_get_description(self):
106 111 assert self.repo.description
107 112
108 113 def test_get_hook_location(self):
109 114 assert len(self.repo.get_hook_location()) != 0
110 115
111 116 def test_last_change(self, local_dt_to_utc):
112 117 assert self.repo.last_change >= local_dt_to_utc(
113 datetime.datetime(2010, 1, 1, 21, 0))
118 datetime.datetime(2010, 1, 1, 21, 0)
119 )
114 120
115 121 def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
116 122 delta = datetime.timedelta(seconds=1)
117 123
118 124 start = local_dt_to_utc(datetime.datetime.now())
119 125 empty_repo = vcsbackend.create_repo()
120 126 now = local_dt_to_utc(datetime.datetime.now())
121 127 assert empty_repo.last_change >= start - delta
122 128 assert empty_repo.last_change <= now + delta
123 129
124 130 def test_repo_equality(self):
125 131 assert self.repo == self.repo
126 132
127 133 def test_repo_equality_broken_object(self):
128 134 import copy
135
129 136 _repo = copy.copy(self.repo)
130 delattr(_repo, 'path')
137 delattr(_repo, "path")
131 138 assert self.repo != _repo
132 139
133 140 def test_repo_equality_other_object(self):
134 141 class dummy(object):
135 142 path = self.repo.path
143
136 144 assert self.repo != dummy()
137 145
138 146 def test_get_commit_is_implemented(self):
139 147 self.repo.get_commit()
140 148
141 149 def test_get_commits_is_implemented(self):
142 150 commit_iter = iter(self.repo.get_commits())
143 151 commit = next(commit_iter)
144 152 assert commit.idx == 0
145 153
146 154 def test_supports_iteration(self):
147 155 repo_iter = iter(self.repo)
148 156 commit = next(repo_iter)
149 157 assert commit.idx == 0
150 158
151 159 def test_in_memory_commit(self):
152 160 imc = self.repo.in_memory_commit
153 161 assert isinstance(imc, BaseInMemoryCommit)
154 162
155 163 @pytest.mark.backends("hg")
156 164 def test__get_url_unicode(self):
157 url = u'/home/repos/malmΓΆ'
165 url = "/home/repos/malmΓΆ"
158 166 assert self.repo._get_url(url)
159 167
160 168
161 169 @pytest.mark.usefixtures("vcs_repository_support")
162 170 class TestDeprecatedRepositoryAPI(BackendTestMixin):
163 171 recreate_repo_per_test = False
164 172
165 173 def test_revisions_is_deprecated(self):
166 174 def get_revisions(repo):
167 175 return repo.revisions
176
168 177 pytest.deprecated_call(get_revisions, self.repo)
169 178
170 179 def test_get_changeset_is_deprecated(self):
171 180 pytest.deprecated_call(self.repo.get_changeset)
172 181
173 182 def test_get_changesets_is_deprecated(self):
174 183 pytest.deprecated_call(self.repo.get_changesets)
175 184
176 185 def test_in_memory_changeset_is_deprecated(self):
177 186 def get_imc(repo):
178 187 return repo.in_memory_changeset
188
179 189 pytest.deprecated_call(get_imc, self.repo)
180 190
181 191
182 192 # TODO: these tests are incomplete, must check the resulting compare result for
183 193 # correcteness
184 194 class TestRepositoryCompare:
185
186 @pytest.mark.parametrize('merge', [True, False])
195 @pytest.mark.parametrize("merge", [True, False])
187 196 def test_compare_commits_of_same_repository(self, vcsbackend, merge):
188 197 target_repo = vcsbackend.create_repo(number_of_commits=5)
189 198 target_repo.compare(
190 target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
191 merge=merge)
199 target_repo[1].raw_id, target_repo[3].raw_id, target_repo, merge=merge
200 )
192 201
193 @pytest.mark.xfail_backends('svn')
194 @pytest.mark.parametrize('merge', [True, False])
202 @pytest.mark.xfail_backends("svn")
203 @pytest.mark.parametrize("merge", [True, False])
195 204 def test_compare_cloned_repositories(self, vcsbackend, merge):
196 205 target_repo = vcsbackend.create_repo(number_of_commits=5)
197 206 source_repo = vcsbackend.clone_repo(target_repo)
198 207 assert target_repo != source_repo
199 208
200 vcsbackend.add_file(source_repo, b'newfile', b'somecontent')
209 vcsbackend.add_file(source_repo, b"newfile", b"somecontent")
201 210 source_commit = source_repo.get_commit()
202 211
203 212 target_repo.compare(
204 target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
205 merge=merge)
213 target_repo[1].raw_id, source_repo[3].raw_id, source_repo, merge=merge
214 )
206 215
207 @pytest.mark.xfail_backends('svn')
208 @pytest.mark.parametrize('merge', [True, False])
216 @pytest.mark.xfail_backends("svn")
217 @pytest.mark.parametrize("merge", [True, False])
209 218 def test_compare_unrelated_repositories(self, vcsbackend, merge):
210 219 orig = vcsbackend.create_repo(number_of_commits=5)
211 220 unrelated = vcsbackend.create_repo(number_of_commits=5)
212 221 assert orig != unrelated
213 222
214 orig.compare(
215 orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
223 orig.compare(orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
216 224
217 225
218 226 class TestRepositoryGetCommonAncestor:
219
220 227 def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
221 228 target_repo = vcsbackend.create_repo(number_of_commits=5)
222 229
223 230 expected_ancestor = target_repo[2].raw_id
224 231
225 assert target_repo.get_common_ancestor(
232 assert (
233 target_repo.get_common_ancestor(
226 234 commit_id1=target_repo[2].raw_id,
227 235 commit_id2=target_repo[4].raw_id,
228 repo2=target_repo
229 ) == expected_ancestor
236 repo2=target_repo,
237 )
238 == expected_ancestor
239 )
230 240
231 assert target_repo.get_common_ancestor(
241 assert (
242 target_repo.get_common_ancestor(
232 243 commit_id1=target_repo[4].raw_id,
233 244 commit_id2=target_repo[2].raw_id,
234 repo2=target_repo
235 ) == expected_ancestor
245 repo2=target_repo,
246 )
247 == expected_ancestor
248 )
236 249
237 250 @pytest.mark.xfail_backends("svn")
238 251 def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
239 252 target_repo = vcsbackend.create_repo(number_of_commits=5)
240 253 source_repo = vcsbackend.clone_repo(target_repo)
241 254 assert target_repo != source_repo
242 255
243 vcsbackend.add_file(source_repo, b'newfile', b'somecontent')
256 vcsbackend.add_file(source_repo, b"newfile", b"somecontent")
244 257 source_commit = source_repo.get_commit()
245 258
246 259 expected_ancestor = target_repo[4].raw_id
247 260
248 assert target_repo.get_common_ancestor(
261 assert (
262 target_repo.get_common_ancestor(
249 263 commit_id1=target_repo[4].raw_id,
250 264 commit_id2=source_commit.raw_id,
251 repo2=source_repo
252 ) == expected_ancestor
265 repo2=source_repo,
266 )
267 == expected_ancestor
268 )
253 269
254 assert target_repo.get_common_ancestor(
270 assert (
271 target_repo.get_common_ancestor(
255 272 commit_id1=source_commit.raw_id,
256 273 commit_id2=target_repo[4].raw_id,
257 repo2=target_repo
258 ) == expected_ancestor
274 repo2=target_repo,
275 )
276 == expected_ancestor
277 )
259 278
260 279 @pytest.mark.xfail_backends("svn")
261 280 def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
262 281 original = vcsbackend.create_repo(number_of_commits=5)
263 282 unrelated = vcsbackend.create_repo(number_of_commits=5)
264 283 assert original != unrelated
265 284
266 assert original.get_common_ancestor(
285 assert (
286 original.get_common_ancestor(
267 287 commit_id1=original[0].raw_id,
268 288 commit_id2=unrelated[0].raw_id,
269 repo2=unrelated
270 ) is None
289 repo2=unrelated,
290 )
291 is None
292 )
271 293
272 assert original.get_common_ancestor(
294 assert (
295 original.get_common_ancestor(
273 296 commit_id1=original[-1].raw_id,
274 297 commit_id2=unrelated[-1].raw_id,
275 repo2=unrelated
276 ) is None
298 repo2=unrelated,
299 )
300 is None
301 )
277 302
278 303
279 304 @pytest.mark.backends("git", "hg")
280 305 class TestRepositoryMerge(object):
281 306 def prepare_for_success(self, vcsbackend):
282 307 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
283 308 self.source_repo = vcsbackend.clone_repo(self.target_repo)
284 vcsbackend.add_file(self.target_repo, b'README_MERGE1', b'Version 1')
285 vcsbackend.add_file(self.source_repo, b'README_MERGE2', b'Version 2')
309 vcsbackend.add_file(self.target_repo, b"README_MERGE1", b"Version 1")
310 vcsbackend.add_file(self.source_repo, b"README_MERGE2", b"Version 2")
286 311 imc = self.source_repo.in_memory_commit
287 imc.add(FileNode(b'file_x', content=self.source_repo.name))
312 imc.add(FileNode(b"file_x", content=self.source_repo.name))
288 313 imc.commit(
289 message=u'Automatic commit from repo merge test',
290 author=u'Automatic <automatic@rhodecode.com>')
314 message="Automatic commit from repo merge test",
315 author="Automatic <automatic@rhodecode.com>",
316 )
291 317 self.target_commit = self.target_repo.get_commit()
292 318 self.source_commit = self.source_repo.get_commit()
293 319 # This only works for Git and Mercurial
294 320 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
295 self.target_ref = Reference('branch', default_branch, self.target_commit.raw_id)
296 self.source_ref = Reference('branch', default_branch, self.source_commit.raw_id)
297 self.workspace_id = 'test-merge-{}'.format(vcsbackend.alias)
321 self.target_ref = Reference("branch", default_branch, self.target_commit.raw_id)
322 self.source_ref = Reference("branch", default_branch, self.source_commit.raw_id)
323 self.workspace_id = "test-merge-{}".format(vcsbackend.alias)
298 324 self.repo_id = repo_id_generator(self.target_repo.path)
299 325
300 326 def prepare_for_conflict(self, vcsbackend):
301 327 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
302 328 self.source_repo = vcsbackend.clone_repo(self.target_repo)
303 vcsbackend.add_file(self.target_repo, b'README_MERGE', b'Version 1')
304 vcsbackend.add_file(self.source_repo, b'README_MERGE', b'Version 2')
329 vcsbackend.add_file(self.target_repo, b"README_MERGE", b"Version 1")
330 vcsbackend.add_file(self.source_repo, b"README_MERGE", b"Version 2")
305 331 self.target_commit = self.target_repo.get_commit()
306 332 self.source_commit = self.source_repo.get_commit()
307 333 # This only works for Git and Mercurial
308 334 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
309 self.target_ref = Reference('branch', default_branch, self.target_commit.raw_id)
310 self.source_ref = Reference('branch', default_branch, self.source_commit.raw_id)
311 self.workspace_id = 'test-merge-{}'.format(vcsbackend.alias)
335 self.target_ref = Reference("branch", default_branch, self.target_commit.raw_id)
336 self.source_ref = Reference("branch", default_branch, self.source_commit.raw_id)
337 self.workspace_id = "test-merge-{}".format(vcsbackend.alias)
312 338 self.repo_id = repo_id_generator(self.target_repo.path)
313 339
314 340 def test_merge_success(self, vcsbackend):
315 341 self.prepare_for_success(vcsbackend)
316 342
317 343 merge_response = self.target_repo.merge(
318 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
344 self.repo_id,
345 self.workspace_id,
346 self.target_ref,
347 self.source_repo,
319 348 self.source_ref,
320 'test user', 'test@rhodecode.com', 'merge message 1',
321 dry_run=False)
349 "test user",
350 "test@rhodecode.com",
351 "merge message 1",
352 dry_run=False,
353 )
322 354 expected_merge_response = MergeResponse(
323 True, True, merge_response.merge_ref,
324 MergeFailureReason.NONE)
355 True, True, merge_response.merge_ref, MergeFailureReason.NONE
356 )
325 357 assert merge_response == expected_merge_response
326 358
327 target_repo = backends.get_backend(vcsbackend.alias)(
328 self.target_repo.path)
359 target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
329 360 target_commits = list(target_repo.get_commits())
330 361 commit_ids = [c.raw_id for c in target_commits[:-1]]
331 362 assert self.source_ref.commit_id in commit_ids
332 363 assert self.target_ref.commit_id in commit_ids
333 364
334 365 merge_commit = target_commits[-1]
335 366 assert merge_commit.raw_id == merge_response.merge_ref.commit_id
336 assert merge_commit.message.strip() == 'merge message 1'
337 assert merge_commit.author == 'test user <test@rhodecode.com>'
367 assert merge_commit.message.strip() == "merge message 1"
368 assert merge_commit.author == "test user <test@rhodecode.com>"
338 369
339 370 # We call it twice so to make sure we can handle updates
340 371 target_ref = Reference(
341 self.target_ref.type, self.target_ref.name,
342 merge_response.merge_ref.commit_id)
372 self.target_ref.type,
373 self.target_ref.name,
374 merge_response.merge_ref.commit_id,
375 )
343 376
344 377 merge_response = target_repo.merge(
345 self.repo_id, self.workspace_id, target_ref, self.source_repo, self.source_ref,
346 'test user', 'test@rhodecode.com', 'merge message 2',
347 dry_run=False)
378 self.repo_id,
379 self.workspace_id,
380 target_ref,
381 self.source_repo,
382 self.source_ref,
383 "test user",
384 "test@rhodecode.com",
385 "merge message 2",
386 dry_run=False,
387 )
348 388 expected_merge_response = MergeResponse(
349 True, True, merge_response.merge_ref,
350 MergeFailureReason.NONE)
389 True, True, merge_response.merge_ref, MergeFailureReason.NONE
390 )
351 391 assert merge_response == expected_merge_response
352 392
353 target_repo = backends.get_backend(
354 vcsbackend.alias)(self.target_repo.path)
355 merge_commit = target_repo.get_commit(
356 merge_response.merge_ref.commit_id)
357 assert merge_commit.message.strip() == 'merge message 1'
358 assert merge_commit.author == 'test user <test@rhodecode.com>'
393 target_repo = backends.get_backend(vcsbackend.alias)(self.target_repo.path)
394 merge_commit = target_repo.get_commit(merge_response.merge_ref.commit_id)
395 assert merge_commit.message.strip() == "merge message 1"
396 assert merge_commit.author == "test user <test@rhodecode.com>"
359 397
360 398 def test_merge_success_dry_run(self, vcsbackend):
361 399 self.prepare_for_success(vcsbackend)
362 400
363 401 merge_response = self.target_repo.merge(
364 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
365 self.source_ref, dry_run=True)
402 self.repo_id,
403 self.workspace_id,
404 self.target_ref,
405 self.source_repo,
406 self.source_ref,
407 dry_run=True,
408 )
366 409
367 410 # We call it twice so to make sure we can handle updates
368 411 merge_response_update = self.target_repo.merge(
369 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
370 self.source_ref, dry_run=True)
412 self.repo_id,
413 self.workspace_id,
414 self.target_ref,
415 self.source_repo,
416 self.source_ref,
417 dry_run=True,
418 )
419
420 assert merge_response.merge_ref
421 assert merge_response_update.merge_ref
371 422
372 423 # Multiple merges may differ in their commit id. Therefore, we set the
373 424 # commit id to `None` before comparing the merge responses.
374 new_merge_ref = merge_response.merge_ref.commit_id = None
375 merge_response.merge_ref = new_merge_ref
425 merge_response.merge_ref.commit_id = 'abcdeabcde'
376 426
377 new_update_merge_ref = merge_response_update.merge_ref.commit_id = None
378 merge_response_update.merge_ref = new_update_merge_ref
427 merge_response_update.merge_ref.commit_id = 'abcdeabcde'
379 428
380 429 assert merge_response == merge_response_update
381 430 assert merge_response.possible is True
382 431 assert merge_response.executed is False
383 432 assert merge_response.merge_ref
384 433 assert merge_response.failure_reason is MergeFailureReason.NONE
385 434
386 @pytest.mark.parametrize('dry_run', [True, False])
435 @pytest.mark.parametrize("dry_run", [True, False])
387 436 def test_merge_conflict(self, vcsbackend, dry_run):
388 437 self.prepare_for_conflict(vcsbackend)
389 438
390 439 expected_merge_response = MergeResponse(
391 False, False, None, MergeFailureReason.MERGE_FAILED)
440 False, False, None, MergeFailureReason.MERGE_FAILED
441 )
392 442
393 443 merge_response = self.target_repo.merge(
394 self.repo_id, self.workspace_id, self.target_ref,
395 self.source_repo, self.source_ref,
396 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
444 self.repo_id,
445 self.workspace_id,
446 self.target_ref,
447 self.source_repo,
448 self.source_ref,
449 "test_user",
450 "test@rhodecode.com",
451 "test message",
452 dry_run=dry_run,
453 )
397 454 assert merge_response == expected_merge_response
398 455
399 456 # We call it twice so to make sure we can handle updates
400 457 merge_response = self.target_repo.merge(
401 self.repo_id, self.workspace_id, self.target_ref, self.source_repo,
458 self.repo_id,
459 self.workspace_id,
460 self.target_ref,
461 self.source_repo,
402 462 self.source_ref,
403 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
463 "test_user",
464 "test@rhodecode.com",
465 "test message",
466 dry_run=dry_run,
467 )
404 468 assert merge_response == expected_merge_response
405 469
406 470 def test_merge_target_is_not_head(self, vcsbackend):
407 471 self.prepare_for_success(vcsbackend)
408 target_ref = Reference(
409 self.target_ref.type, self.target_ref.name, '0' * 40)
472 target_ref = Reference(self.target_ref.type, self.target_ref.name, "0" * 40)
410 473 expected_merge_response = MergeResponse(
411 False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD,
412 metadata={'target_ref': target_ref})
474 False,
475 False,
476 None,
477 MergeFailureReason.TARGET_IS_NOT_HEAD,
478 metadata={"target_ref": target_ref},
479 )
413 480 merge_response = self.target_repo.merge(
414 self.repo_id, self.workspace_id, target_ref, self.source_repo,
415 self.source_ref, dry_run=True)
481 self.repo_id,
482 self.workspace_id,
483 target_ref,
484 self.source_repo,
485 self.source_ref,
486 dry_run=True,
487 )
416 488
417 489 assert merge_response == expected_merge_response
418 490
419 491 def test_merge_missing_source_reference(self, vcsbackend):
420 492 self.prepare_for_success(vcsbackend)
421 493
422 494 source_ref = Reference(
423 self.source_ref.type, 'not_existing', self.source_ref.commit_id)
495 self.source_ref.type, "not_existing", self.source_ref.commit_id
496 )
424 497 expected_merge_response = MergeResponse(
425 False, False, None, MergeFailureReason.MISSING_SOURCE_REF,
426 metadata={'source_ref': source_ref})
498 False,
499 False,
500 None,
501 MergeFailureReason.MISSING_SOURCE_REF,
502 metadata={"source_ref": source_ref},
503 )
427 504
428 505 merge_response = self.target_repo.merge(
429 self.repo_id, self.workspace_id, self.target_ref,
430 self.source_repo, source_ref,
431 dry_run=True)
506 self.repo_id,
507 self.workspace_id,
508 self.target_ref,
509 self.source_repo,
510 source_ref,
511 dry_run=True,
512 )
432 513
433 514 assert merge_response == expected_merge_response
434 515
435 516 def test_merge_raises_exception(self, vcsbackend):
436 517 self.prepare_for_success(vcsbackend)
437 518 expected_merge_response = MergeResponse(
438 False, False, None, MergeFailureReason.UNKNOWN,
439 metadata={'exception': 'ErrorForTest'})
519 False,
520 False,
521 None,
522 MergeFailureReason.UNKNOWN,
523 metadata={"exception": "ErrorForTest"},
524 )
440 525
441 with mock.patch.object(self.target_repo, '_merge_repo',
442 side_effect=RepositoryError()):
526 with mock.patch.object(
527 self.target_repo, "_merge_repo", side_effect=RepositoryError()
528 ):
443 529 merge_response = self.target_repo.merge(
444 self.repo_id, self.workspace_id, self.target_ref,
445 self.source_repo, self.source_ref,
446 dry_run=True)
530 self.repo_id,
531 self.workspace_id,
532 self.target_ref,
533 self.source_repo,
534 self.source_ref,
535 dry_run=True,
536 )
447 537
448 538 assert merge_response == expected_merge_response
449 539
450 540 def test_merge_invalid_user_name(self, vcsbackend):
451 541 repo = vcsbackend.create_repo(number_of_commits=1)
452 ref = Reference('branch', 'master', 'not_used')
453 workspace_id = 'test-errors-in-merge'
542 ref = Reference("branch", "master", "not_used")
543 workspace_id = "test-errors-in-merge"
454 544 repo_id = repo_id_generator(workspace_id)
455 545 with pytest.raises(ValueError):
456 546 repo.merge(repo_id, workspace_id, ref, self, ref)
457 547
458 548 def test_merge_invalid_user_email(self, vcsbackend):
459 549 repo = vcsbackend.create_repo(number_of_commits=1)
460 ref = Reference('branch', 'master', 'not_used')
461 workspace_id = 'test-errors-in-merge'
550 ref = Reference("branch", "master", "not_used")
551 workspace_id = "test-errors-in-merge"
552 repo_id = repo_id_generator(workspace_id)
553 with pytest.raises(ValueError):
554 repo.merge(repo_id, workspace_id, ref, self, ref, "user name")
555
556 def test_merge_invalid_message(self, vcsbackend):
557 repo = vcsbackend.create_repo(number_of_commits=1)
558 ref = Reference("branch", "master", "not_used")
559 workspace_id = "test-errors-in-merge"
462 560 repo_id = repo_id_generator(workspace_id)
463 561 with pytest.raises(ValueError):
464 562 repo.merge(
465 repo_id, workspace_id, ref, self, ref, 'user name')
466
467 def test_merge_invalid_message(self, vcsbackend):
468 repo = vcsbackend.create_repo(number_of_commits=1)
469 ref = Reference('branch', 'master', 'not_used')
470 workspace_id = 'test-errors-in-merge'
471 repo_id = repo_id_generator(workspace_id)
472 with pytest.raises(ValueError):
473 repo.merge(
474 repo_id, workspace_id, ref, self, ref,
475 'user name', 'user@email.com')
563 repo_id, workspace_id, ref, self, ref, "user name", "user@email.com"
564 )
476 565
477 566
478 567 @pytest.mark.usefixtures("vcs_repository_support")
479 568 class TestRepositoryStrip(BackendTestMixin):
480 569 recreate_repo_per_test = True
481 570
482 571 @classmethod
483 572 def _get_commits(cls):
484 573 commits = [
485 574 {
486 'message': 'Initial commit',
487 'author': 'Joe Doe <joe.doe@example.com>',
488 'date': datetime.datetime(2010, 1, 1, 20),
489 'branch': 'master',
490 'added': [
491 FileNode(b'foobar', content='foobar'),
492 FileNode(b'foobar2', content='foobar2'),
575 "message": "Initial commit",
576 "author": "Joe Doe <joe.doe@example.com>",
577 "date": datetime.datetime(2010, 1, 1, 20),
578 "branch": "master",
579 "added": [
580 FileNode(b"foobar", content="foobar"),
581 FileNode(b"foobar2", content="foobar2"),
493 582 ],
494 583 },
495 584 ]
496 585 for x in range(10):
497 586 commit_data = {
498 'message': 'Changed foobar - commit%s' % x,
499 'author': 'Jane Doe <jane.doe@example.com>',
500 'date': datetime.datetime(2010, 1, 1, 21, x),
501 'branch': 'master',
502 'changed': [
503 FileNode(b'foobar', 'FOOBAR - %s' % x),
587 "message": "Changed foobar - commit%s" % x,
588 "author": "Jane Doe <jane.doe@example.com>",
589 "date": datetime.datetime(2010, 1, 1, 21, x),
590 "branch": "master",
591 "changed": [
592 FileNode(b"foobar", "FOOBAR - %s" % x),
504 593 ],
505 594 }
506 595 commits.append(commit_data)
507 596 return commits
508 597
509 598 @pytest.mark.backends("git", "hg")
510 599 def test_strip_commit(self):
511 600 tip = self.repo.get_commit()
512 601 assert tip.idx == 10
513 602 self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)
514 603
515 604 tip = self.repo.get_commit()
516 605 assert tip.idx == 9
517 606
518 607 @pytest.mark.backends("git", "hg")
519 608 def test_strip_multiple_commits(self):
520 609 tip = self.repo.get_commit()
521 610 assert tip.idx == 10
522 611
523 612 old = self.repo.get_commit(commit_idx=5)
524 613 self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)
525 614
526 615 tip = self.repo.get_commit()
527 616 assert tip.idx == 4
528 617
529 618
530 @pytest.mark.backends('hg', 'git')
619 @pytest.mark.backends("hg", "git")
531 620 class TestRepositoryPull(object):
532
533 621 def test_pull(self, vcsbackend):
534 622 source_repo = vcsbackend.repo
535 623 target_repo = vcsbackend.create_repo()
536 624 assert len(source_repo.commit_ids) > len(target_repo.commit_ids)
537 625
538 626 target_repo.pull(source_repo.path)
539 627 # Note: Get a fresh instance, avoids caching trouble
540 628 target_repo = vcsbackend.backend(target_repo.path)
541 629 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
542 630
543 631 def test_pull_wrong_path(self, vcsbackend):
544 632 target_repo = vcsbackend.create_repo()
545 633 with pytest.raises(RepositoryError):
546 634 target_repo.pull(target_repo.path + "wrong")
547 635
548 636 def test_pull_specific_commits(self, vcsbackend):
549 637 source_repo = vcsbackend.repo
550 638 target_repo = vcsbackend.create_repo()
551 639
552 640 second_commit = source_repo[1].raw_id
553 if vcsbackend.alias == 'git':
554 second_commit_ref = 'refs/test-refs/a'
641 if vcsbackend.alias == "git":
642 second_commit_ref = "refs/test-refs/a"
555 643 source_repo.set_refs(second_commit_ref, second_commit)
556 644
557 645 target_repo.pull(source_repo.path, commit_ids=[second_commit])
558 646 target_repo = vcsbackend.backend(target_repo.path)
559 647 assert 2 == len(target_repo.commit_ids)
560 648 assert second_commit == target_repo.get_commit().raw_id
General Comments 0
You need to be logged in to leave comments. Login now