##// END OF EJS Templates
tests: Adapt asserts to work with merge reference onbject in merge response.
Martin Bornhold -
r1059:eda82a60 default
parent child Browse files
Show More
@@ -1,523 +1,525 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 from urllib2 import URLError
23 23
24 24 import mock
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs import backends
28 28 from rhodecode.lib.vcs.backends.base import (
29 29 Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
30 30 from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
31 31 from rhodecode.lib.vcs.nodes import FileNode
32 32 from rhodecode.tests.vcs.base import BackendTestMixin
33 33
34 34
35 35 class TestRepositoryBase(BackendTestMixin):
36 36 recreate_repo_per_test = False
37 37
38 38 def test_init_accepts_unicode_path(self, tmpdir):
39 39 path = unicode(tmpdir.join(u'unicode Γ€'))
40 40 self.Backend(path, create=True)
41 41
42 42 def test_init_accepts_str_path(self, tmpdir):
43 43 path = str(tmpdir.join('str Γ€'))
44 44 self.Backend(path, create=True)
45 45
46 46 def test_init_fails_if_path_does_not_exist(self, tmpdir):
47 47 path = unicode(tmpdir.join('i-do-not-exist'))
48 48 with pytest.raises(VCSError):
49 49 self.Backend(path)
50 50
51 51 def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
52 52 path = unicode(tmpdir.mkdir(u'unicode Γ€'))
53 53 with pytest.raises(VCSError):
54 54 self.Backend(path)
55 55
56 56 def test_has_commits_attribute(self):
57 57 self.repo.commit_ids
58 58
59 59 def test_name(self):
60 60 assert self.repo.name.startswith('vcs-test')
61 61
62 62 @pytest.mark.backends("hg", "git")
63 63 def test_has_default_branch_name(self):
64 64 assert self.repo.DEFAULT_BRANCH_NAME is not None
65 65
66 66 @pytest.mark.backends("svn")
67 67 def test_has_no_default_branch_name(self):
68 68 assert self.repo.DEFAULT_BRANCH_NAME is None
69 69
70 70 def test_has_empty_commit(self):
71 71 assert self.repo.EMPTY_COMMIT_ID is not None
72 72 assert self.repo.EMPTY_COMMIT is not None
73 73
74 74 def test_empty_changeset_is_deprecated(self):
75 75 def get_empty_changeset(repo):
76 76 return repo.EMPTY_CHANGESET
77 77 pytest.deprecated_call(get_empty_changeset, self.repo)
78 78
79 79 def test_bookmarks(self):
80 80 assert len(self.repo.bookmarks) == 0
81 81
82 82 # TODO: Cover two cases: Local repo path, remote URL
83 83 def test_check_url(self):
84 84 config = Config()
85 85 assert self.Backend.check_url(self.repo.path, config)
86 86
87 87 def test_check_url_invalid(self):
88 88 config = Config()
89 89 with pytest.raises(URLError):
90 90 self.Backend.check_url(self.repo.path + "invalid", config)
91 91
92 92 def test_get_contact(self):
93 93 self.repo.contact
94 94
95 95 def test_get_description(self):
96 96 self.repo.description
97 97
98 98 def test_get_hook_location(self):
99 99 assert len(self.repo.get_hook_location()) != 0
100 100
101 101 def test_last_change(self):
102 102 assert self.repo.last_change >= datetime.datetime(2010, 1, 1, 21, 0)
103 103
104 104 def test_last_change_in_empty_repository(self, vcsbackend):
105 105 delta = datetime.timedelta(seconds=1)
106 106 start = datetime.datetime.now()
107 107 empty_repo = vcsbackend.create_repo()
108 108 now = datetime.datetime.now()
109 109 assert empty_repo.last_change >= start - delta
110 110 assert empty_repo.last_change <= now + delta
111 111
112 112 def test_repo_equality(self):
113 113 assert self.repo == self.repo
114 114
115 115 def test_repo_equality_broken_object(self):
116 116 import copy
117 117 _repo = copy.copy(self.repo)
118 118 delattr(_repo, 'path')
119 119 assert self.repo != _repo
120 120
121 121 def test_repo_equality_other_object(self):
122 122 class dummy(object):
123 123 path = self.repo.path
124 124 assert self.repo != dummy()
125 125
126 126 def test_get_commit_is_implemented(self):
127 127 self.repo.get_commit()
128 128
129 129 def test_get_commits_is_implemented(self):
130 130 commit_iter = iter(self.repo.get_commits())
131 131 commit = next(commit_iter)
132 132 assert commit.idx == 0
133 133
134 134 def test_supports_iteration(self):
135 135 repo_iter = iter(self.repo)
136 136 commit = next(repo_iter)
137 137 assert commit.idx == 0
138 138
139 139 def test_in_memory_commit(self):
140 140 imc = self.repo.in_memory_commit
141 141 assert isinstance(imc, BaseInMemoryCommit)
142 142
143 143 @pytest.mark.backends("hg")
144 144 def test__get_url_unicode(self):
145 145 url = u'/home/repos/malmΓΆ'
146 146 assert self.repo._get_url(url)
147 147
148 148
149 149 class TestDeprecatedRepositoryAPI(BackendTestMixin):
150 150 recreate_repo_per_test = False
151 151
152 152 def test_revisions_is_deprecated(self):
153 153 def get_revisions(repo):
154 154 return repo.revisions
155 155 pytest.deprecated_call(get_revisions, self.repo)
156 156
157 157 def test_get_changeset_is_deprecated(self):
158 158 pytest.deprecated_call(self.repo.get_changeset)
159 159
160 160 def test_get_changesets_is_deprecated(self):
161 161 pytest.deprecated_call(self.repo.get_changesets)
162 162
163 163 def test_in_memory_changeset_is_deprecated(self):
164 164 def get_imc(repo):
165 165 return repo.in_memory_changeset
166 166 pytest.deprecated_call(get_imc, self.repo)
167 167
168 168
169 169 # TODO: these tests are incomplete, must check the resulting compare result for
170 170 # correcteness
171 171 class TestRepositoryCompare:
172 172
173 173 @pytest.mark.parametrize('merge', [True, False])
174 174 def test_compare_commits_of_same_repository(self, vcsbackend, merge):
175 175 target_repo = vcsbackend.create_repo(number_of_commits=5)
176 176 target_repo.compare(
177 177 target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
178 178 merge=merge)
179 179
180 180 @pytest.mark.xfail_backends('svn')
181 181 @pytest.mark.parametrize('merge', [True, False])
182 182 def test_compare_cloned_repositories(self, vcsbackend, merge):
183 183 target_repo = vcsbackend.create_repo(number_of_commits=5)
184 184 source_repo = vcsbackend.clone_repo(target_repo)
185 185 assert target_repo != source_repo
186 186
187 187 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
188 188 source_commit = source_repo.get_commit()
189 189
190 190 target_repo.compare(
191 191 target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
192 192 merge=merge)
193 193
194 194 @pytest.mark.xfail_backends('svn')
195 195 @pytest.mark.parametrize('merge', [True, False])
196 196 def test_compare_unrelated_repositories(self, vcsbackend, merge):
197 197 orig = vcsbackend.create_repo(number_of_commits=5)
198 198 unrelated = vcsbackend.create_repo(number_of_commits=5)
199 199 assert orig != unrelated
200 200
201 201 orig.compare(
202 202 orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
203 203
204 204
205 205 class TestRepositoryGetCommonAncestor:
206 206
207 207 def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
208 208 target_repo = vcsbackend.create_repo(number_of_commits=5)
209 209
210 210 expected_ancestor = target_repo[2].raw_id
211 211
212 212 assert target_repo.get_common_ancestor(
213 213 commit_id1=target_repo[2].raw_id,
214 214 commit_id2=target_repo[4].raw_id,
215 215 repo2=target_repo
216 216 ) == expected_ancestor
217 217
218 218 assert target_repo.get_common_ancestor(
219 219 commit_id1=target_repo[4].raw_id,
220 220 commit_id2=target_repo[2].raw_id,
221 221 repo2=target_repo
222 222 ) == expected_ancestor
223 223
224 224 @pytest.mark.xfail_backends("svn")
225 225 def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
226 226 target_repo = vcsbackend.create_repo(number_of_commits=5)
227 227 source_repo = vcsbackend.clone_repo(target_repo)
228 228 assert target_repo != source_repo
229 229
230 230 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
231 231 source_commit = source_repo.get_commit()
232 232
233 233 expected_ancestor = target_repo[4].raw_id
234 234
235 235 assert target_repo.get_common_ancestor(
236 236 commit_id1=target_repo[4].raw_id,
237 237 commit_id2=source_commit.raw_id,
238 238 repo2=source_repo
239 239 ) == expected_ancestor
240 240
241 241 assert target_repo.get_common_ancestor(
242 242 commit_id1=source_commit.raw_id,
243 243 commit_id2=target_repo[4].raw_id,
244 244 repo2=target_repo
245 245 ) == expected_ancestor
246 246
247 247 @pytest.mark.xfail_backends("svn")
248 248 def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
249 249 original = vcsbackend.create_repo(number_of_commits=5)
250 250 unrelated = vcsbackend.create_repo(number_of_commits=5)
251 251 assert original != unrelated
252 252
253 253 assert original.get_common_ancestor(
254 254 commit_id1=original[0].raw_id,
255 255 commit_id2=unrelated[0].raw_id,
256 256 repo2=unrelated
257 257 ) == None
258 258
259 259 assert original.get_common_ancestor(
260 260 commit_id1=original[-1].raw_id,
261 261 commit_id2=unrelated[-1].raw_id,
262 262 repo2=unrelated
263 263 ) == None
264 264
265 265
266 266 @pytest.mark.backends("git", "hg")
267 267 class TestRepositoryMerge:
268 268 def prepare_for_success(self, vcsbackend):
269 269 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
270 270 self.source_repo = vcsbackend.clone_repo(self.target_repo)
271 271 vcsbackend.add_file(self.target_repo, 'README_MERGE1', 'Version 1')
272 272 vcsbackend.add_file(self.source_repo, 'README_MERGE2', 'Version 2')
273 273 imc = self.source_repo.in_memory_commit
274 274 imc.add(FileNode('file_x', content=self.source_repo.name))
275 275 imc.commit(
276 276 message=u'Automatic commit from repo merge test',
277 277 author=u'Automatic')
278 278 self.target_commit = self.target_repo.get_commit()
279 279 self.source_commit = self.source_repo.get_commit()
280 280 # This only works for Git and Mercurial
281 281 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
282 282 self.target_ref = Reference(
283 283 'branch', default_branch, self.target_commit.raw_id)
284 284 self.source_ref = Reference(
285 285 'branch', default_branch, self.source_commit.raw_id)
286 286 self.workspace = 'test-merge'
287 287
288 288 def prepare_for_conflict(self, vcsbackend):
289 289 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
290 290 self.source_repo = vcsbackend.clone_repo(self.target_repo)
291 291 vcsbackend.add_file(self.target_repo, 'README_MERGE', 'Version 1')
292 292 vcsbackend.add_file(self.source_repo, 'README_MERGE', 'Version 2')
293 293 self.target_commit = self.target_repo.get_commit()
294 294 self.source_commit = self.source_repo.get_commit()
295 295 # This only works for Git and Mercurial
296 296 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
297 297 self.target_ref = Reference(
298 298 'branch', default_branch, self.target_commit.raw_id)
299 299 self.source_ref = Reference(
300 300 'branch', default_branch, self.source_commit.raw_id)
301 301 self.workspace = 'test-merge'
302 302
303 303 def test_merge_success(self, vcsbackend):
304 304 self.prepare_for_success(vcsbackend)
305 305
306 306 merge_response = self.target_repo.merge(
307 307 self.target_ref, self.source_repo, self.source_ref, self.workspace,
308 308 'test user', 'test@rhodecode.com', 'merge message 1',
309 309 dry_run=False)
310 310 expected_merge_response = MergeResponse(
311 311 True, True, merge_response.merge_ref,
312 312 MergeFailureReason.NONE)
313 313 assert merge_response == expected_merge_response
314 314
315 315 target_repo = backends.get_backend(vcsbackend.alias)(
316 316 self.target_repo.path)
317 317 target_commits = list(target_repo.get_commits())
318 318 commit_ids = [c.raw_id for c in target_commits[:-1]]
319 319 assert self.source_ref.commit_id in commit_ids
320 320 assert self.target_ref.commit_id in commit_ids
321 321
322 322 merge_commit = target_commits[-1]
323 assert merge_commit.raw_id == merge_response.merge_ref
323 assert merge_commit.raw_id == merge_response.merge_ref.commit_id
324 324 assert merge_commit.message.strip() == 'merge message 1'
325 325 assert merge_commit.author == 'test user <test@rhodecode.com>'
326 326
327 327 # We call it twice so to make sure we can handle updates
328 328 target_ref = Reference(
329 329 self.target_ref.type, self.target_ref.name,
330 330 merge_response.merge_ref.commit_id)
331 331
332 332 merge_response = target_repo.merge(
333 333 target_ref, self.source_repo, self.source_ref, self.workspace,
334 334 'test user', 'test@rhodecode.com', 'merge message 2',
335 335 dry_run=False)
336 336 expected_merge_response = MergeResponse(
337 337 True, True, merge_response.merge_ref,
338 338 MergeFailureReason.NONE)
339 339 assert merge_response == expected_merge_response
340 340
341 341 target_repo = backends.get_backend(
342 342 vcsbackend.alias)(self.target_repo.path)
343 343 merge_commit = target_repo.get_commit(
344 344 merge_response.merge_ref.commit_id)
345 345 assert merge_commit.message.strip() == 'merge message 1'
346 346 assert merge_commit.author == 'test user <test@rhodecode.com>'
347 347
348 348 def test_merge_success_dry_run(self, vcsbackend):
349 349 self.prepare_for_success(vcsbackend)
350 expected_merge_response = MergeResponse(
351 True, False, None, MergeFailureReason.NONE)
352 350
353 351 merge_response = self.target_repo.merge(
354 352 self.target_ref, self.source_repo, self.source_ref, self.workspace,
355 353 dry_run=True)
356 assert merge_response == expected_merge_response
357 354
358 355 # We call it twice so to make sure we can handle updates
359 merge_response = self.target_repo.merge(
356 merge_response_update = self.target_repo.merge(
360 357 self.target_ref, self.source_repo, self.source_ref, self.workspace,
361 358 dry_run=True)
362 assert merge_response == expected_merge_response
359
360 assert merge_response == merge_response_update
361 assert merge_response.possible is True
362 assert merge_response.executed is False
363 assert merge_response.merge_ref
364 assert merge_response.failure_reason is MergeFailureReason.NONE
363 365
364 366 @pytest.mark.parametrize('dry_run', [True, False])
365 367 def test_merge_conflict(self, vcsbackend, dry_run):
366 368 self.prepare_for_conflict(vcsbackend)
367 369 expected_merge_response = MergeResponse(
368 370 False, False, None, MergeFailureReason.MERGE_FAILED)
369 371
370 372 merge_response = self.target_repo.merge(
371 373 self.target_ref, self.source_repo, self.source_ref, self.workspace,
372 374 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
373 375 assert merge_response == expected_merge_response
374 376
375 377 # We call it twice so to make sure we can handle updates
376 378 merge_response = self.target_repo.merge(
377 379 self.target_ref, self.source_repo, self.source_ref, self.workspace,
378 380 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
379 381 assert merge_response == expected_merge_response
380 382
381 383 def test_merge_target_is_not_head(self, vcsbackend):
382 384 self.prepare_for_success(vcsbackend)
383 385 expected_merge_response = MergeResponse(
384 386 False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD)
385 387
386 388 target_ref = Reference(
387 389 self.target_ref.type, self.target_ref.name, '0' * 40)
388 390
389 391 merge_response = self.target_repo.merge(
390 392 target_ref, self.source_repo, self.source_ref, self.workspace,
391 393 dry_run=True)
392 394
393 395 assert merge_response == expected_merge_response
394 396
395 397 def test_merge_missing_commit(self, vcsbackend):
396 398 self.prepare_for_success(vcsbackend)
397 399 expected_merge_response = MergeResponse(
398 400 False, False, None, MergeFailureReason.MISSING_COMMIT)
399 401
400 402 source_ref = Reference(
401 403 self.source_ref.type, 'not_existing', self.source_ref.commit_id)
402 404
403 405 merge_response = self.target_repo.merge(
404 406 self.target_ref, self.source_repo, source_ref, self.workspace,
405 407 dry_run=True)
406 408
407 409 assert merge_response == expected_merge_response
408 410
409 411 def test_merge_raises_exception(self, vcsbackend):
410 412 self.prepare_for_success(vcsbackend)
411 413 expected_merge_response = MergeResponse(
412 414 False, False, None, MergeFailureReason.UNKNOWN)
413 415
414 416 with mock.patch.object(self.target_repo, '_merge_repo',
415 417 side_effect=RepositoryError()):
416 418 merge_response = self.target_repo.merge(
417 419 self.target_ref, self.source_repo, self.source_ref,
418 420 self.workspace, dry_run=True)
419 421
420 422 assert merge_response == expected_merge_response
421 423
422 424 def test_merge_invalid_user_name(self, vcsbackend):
423 425 repo = vcsbackend.create_repo(number_of_commits=1)
424 426 ref = Reference('branch', 'master', 'not_used')
425 427 with pytest.raises(ValueError):
426 428 repo.merge(ref, self, ref, 'workspace_id')
427 429
428 430 def test_merge_invalid_user_email(self, vcsbackend):
429 431 repo = vcsbackend.create_repo(number_of_commits=1)
430 432 ref = Reference('branch', 'master', 'not_used')
431 433 with pytest.raises(ValueError):
432 434 repo.merge(ref, self, ref, 'workspace_id', 'user name')
433 435
434 436 def test_merge_invalid_message(self, vcsbackend):
435 437 repo = vcsbackend.create_repo(number_of_commits=1)
436 438 ref = Reference('branch', 'master', 'not_used')
437 439 with pytest.raises(ValueError):
438 440 repo.merge(
439 441 ref, self, ref, 'workspace_id', 'user name', 'user@email.com')
440 442
441 443
442 444 class TestRepositoryStrip(BackendTestMixin):
443 445 recreate_repo_per_test = True
444 446
445 447 @classmethod
446 448 def _get_commits(cls):
447 449 commits = [
448 450 {
449 451 'message': 'Initial commit',
450 452 'author': 'Joe Doe <joe.doe@example.com>',
451 453 'date': datetime.datetime(2010, 1, 1, 20),
452 454 'branch': 'master',
453 455 'added': [
454 456 FileNode('foobar', content='foobar'),
455 457 FileNode('foobar2', content='foobar2'),
456 458 ],
457 459 },
458 460 ]
459 461 for x in xrange(10):
460 462 commit_data = {
461 463 'message': 'Changed foobar - commit%s' % x,
462 464 'author': 'Jane Doe <jane.doe@example.com>',
463 465 'date': datetime.datetime(2010, 1, 1, 21, x),
464 466 'branch': 'master',
465 467 'changed': [
466 468 FileNode('foobar', 'FOOBAR - %s' % x),
467 469 ],
468 470 }
469 471 commits.append(commit_data)
470 472 return commits
471 473
472 474 @pytest.mark.backends("git", "hg")
473 475 def test_strip_commit(self):
474 476 tip = self.repo.get_commit()
475 477 assert tip.idx == 10
476 478 self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)
477 479
478 480 tip = self.repo.get_commit()
479 481 assert tip.idx == 9
480 482
481 483 @pytest.mark.backends("git", "hg")
482 484 def test_strip_multiple_commits(self):
483 485 tip = self.repo.get_commit()
484 486 assert tip.idx == 10
485 487
486 488 old = self.repo.get_commit(commit_idx=5)
487 489 self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)
488 490
489 491 tip = self.repo.get_commit()
490 492 assert tip.idx == 4
491 493
492 494
493 495 @pytest.mark.backends('hg', 'git')
494 496 class TestRepositoryPull:
495 497
496 498 def test_pull(self, vcsbackend):
497 499 source_repo = vcsbackend.repo
498 500 target_repo = vcsbackend.create_repo()
499 501 assert len(source_repo.commit_ids) > len(target_repo.commit_ids)
500 502
501 503 target_repo.pull(source_repo.path)
502 504 # Note: Get a fresh instance, avoids caching trouble
503 505 target_repo = vcsbackend.backend(target_repo.path)
504 506 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
505 507
506 508 def test_pull_wrong_path(self, vcsbackend):
507 509 target_repo = vcsbackend.create_repo()
508 510 with pytest.raises(RepositoryError):
509 511 target_repo.pull(target_repo.path + "wrong")
510 512
511 513 def test_pull_specific_commits(self, vcsbackend):
512 514 source_repo = vcsbackend.repo
513 515 target_repo = vcsbackend.create_repo()
514 516
515 517 second_commit = source_repo[1].raw_id
516 518 if vcsbackend.alias == 'git':
517 519 second_commit_ref = 'refs/test-refs/a'
518 520 source_repo.set_refs(second_commit_ref, second_commit)
519 521
520 522 target_repo.pull(source_repo.path, commit_ids=[second_commit])
521 523 target_repo = vcsbackend.backend(target_repo.path)
522 524 assert 2 == len(target_repo.commit_ids)
523 525 assert second_commit == target_repo.get_commit().raw_id
General Comments 0
You need to be logged in to leave comments. Login now