##// END OF EJS Templates
tests: fixes #4168 removing git xfails in pr and vcs commit tests
lisaq -
r677:191d2d88 default
parent child Browse files
Show More
@@ -1,917 +1,915 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23 from webob.exc import HTTPNotFound
24 24
25 25 import rhodecode
26 26 from rhodecode.lib.vcs.nodes import FileNode
27 27 from rhodecode.model.changeset_status import ChangesetStatusModel
28 28 from rhodecode.model.db import (
29 29 PullRequest, ChangesetStatus, UserLog, Notification)
30 30 from rhodecode.model.meta import Session
31 31 from rhodecode.model.pull_request import PullRequestModel
32 32 from rhodecode.model.user import UserModel
33 33 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
34 34 from rhodecode.tests.utils import AssertResponse
35 35
36 36
37 37 @pytest.mark.usefixtures('app', 'autologin_user')
38 38 @pytest.mark.backends("git", "hg")
39 39 class TestPullrequestsController:
40 40
41 41 def test_index(self, backend):
42 42 self.app.get(url(
43 43 controller='pullrequests', action='index',
44 44 repo_name=backend.repo_name))
45 45
46 46 def test_option_menu_create_pull_request_exists(self, backend):
47 47 repo_name = backend.repo_name
48 48 response = self.app.get(url('summary_home', repo_name=repo_name))
49 49
50 50 create_pr_link = '<a href="%s">Create Pull Request</a>' % url(
51 51 'pullrequest', repo_name=repo_name)
52 52 response.mustcontain(create_pr_link)
53 53
54 54 def test_global_redirect_of_pr(self, backend, pr_util):
55 55 pull_request = pr_util.create_pull_request()
56 56
57 57 response = self.app.get(
58 58 url('pull_requests_global',
59 59 pull_request_id=pull_request.pull_request_id))
60 60
61 61 repo_name = pull_request.target_repo.repo_name
62 62 redirect_url = url('pullrequest_show', repo_name=repo_name,
63 63 pull_request_id=pull_request.pull_request_id)
64 64 assert response.status == '302 Found'
65 65 assert redirect_url in response.location
66 66
67 @pytest.mark.xfail_backends(
68 "git", reason="Pending bugfix/feature, issue #6")
69 67 def test_create_pr_form_with_raw_commit_id(self, backend):
70 68 repo = backend.repo
71 69
72 70 self.app.get(
73 71 url(controller='pullrequests', action='index',
74 72 repo_name=repo.repo_name,
75 73 commit=repo.get_commit().raw_id),
76 74 status=200)
77 75
78 76 @pytest.mark.parametrize('pr_merge_enabled', [True, False])
79 77 def test_show(self, pr_util, pr_merge_enabled):
80 78 pull_request = pr_util.create_pull_request(
81 79 mergeable=pr_merge_enabled, enable_notifications=False)
82 80
83 81 response = self.app.get(url(
84 82 controller='pullrequests', action='show',
85 83 repo_name=pull_request.target_repo.scm_instance().name,
86 84 pull_request_id=str(pull_request.pull_request_id)))
87 85
88 86 for commit_id in pull_request.revisions:
89 87 response.mustcontain(commit_id)
90 88
91 89 assert pull_request.target_ref_parts.type in response
92 90 assert pull_request.target_ref_parts.name in response
93 91 target_clone_url = pull_request.target_repo.clone_url()
94 92 assert target_clone_url in response
95 93
96 94 assert 'class="pull-request-merge"' in response
97 95 assert (
98 96 'Server-side pull request merging is disabled.'
99 97 in response) != pr_merge_enabled
100 98
101 99 def test_close_status_visibility(self, pr_util, csrf_token):
102 100 from rhodecode.tests.functional.test_login import login_url, logut_url
103 101 # Logout
104 102 response = self.app.post(
105 103 logut_url,
106 104 params={'csrf_token': csrf_token})
107 105 # Login as regular user
108 106 response = self.app.post(login_url,
109 107 {'username': 'test_regular',
110 108 'password': 'test12'})
111 109
112 110 pull_request = pr_util.create_pull_request(author='test_regular')
113 111
114 112 response = self.app.get(url(
115 113 controller='pullrequests', action='show',
116 114 repo_name=pull_request.target_repo.scm_instance().name,
117 115 pull_request_id=str(pull_request.pull_request_id)))
118 116
119 117 assert 'Server-side pull request merging is disabled.' in response
120 118 assert 'value="forced_closed"' in response
121 119
122 120 def test_show_invalid_commit_id(self, pr_util):
123 121 # Simulating invalid revisions which will cause a lookup error
124 122 pull_request = pr_util.create_pull_request()
125 123 pull_request.revisions = ['invalid']
126 124 Session().add(pull_request)
127 125 Session().commit()
128 126
129 127 response = self.app.get(url(
130 128 controller='pullrequests', action='show',
131 129 repo_name=pull_request.target_repo.scm_instance().name,
132 130 pull_request_id=str(pull_request.pull_request_id)))
133 131
134 132 for commit_id in pull_request.revisions:
135 133 response.mustcontain(commit_id)
136 134
137 135 def test_show_invalid_source_reference(self, pr_util):
138 136 pull_request = pr_util.create_pull_request()
139 137 pull_request.source_ref = 'branch:b:invalid'
140 138 Session().add(pull_request)
141 139 Session().commit()
142 140
143 141 self.app.get(url(
144 142 controller='pullrequests', action='show',
145 143 repo_name=pull_request.target_repo.scm_instance().name,
146 144 pull_request_id=str(pull_request.pull_request_id)))
147 145
148 146 def test_edit_title_description(self, pr_util, csrf_token):
149 147 pull_request = pr_util.create_pull_request()
150 148 pull_request_id = pull_request.pull_request_id
151 149
152 150 response = self.app.post(
153 151 url(controller='pullrequests', action='update',
154 152 repo_name=pull_request.target_repo.repo_name,
155 153 pull_request_id=str(pull_request_id)),
156 154 params={
157 155 'edit_pull_request': 'true',
158 156 '_method': 'put',
159 157 'title': 'New title',
160 158 'description': 'New description',
161 159 'csrf_token': csrf_token})
162 160
163 161 assert_session_flash(
164 162 response, u'Pull request title & description updated.',
165 163 category='success')
166 164
167 165 pull_request = PullRequest.get(pull_request_id)
168 166 assert pull_request.title == 'New title'
169 167 assert pull_request.description == 'New description'
170 168
171 169 def test_edit_title_description_closed(self, pr_util, csrf_token):
172 170 pull_request = pr_util.create_pull_request()
173 171 pull_request_id = pull_request.pull_request_id
174 172 pr_util.close()
175 173
176 174 response = self.app.post(
177 175 url(controller='pullrequests', action='update',
178 176 repo_name=pull_request.target_repo.repo_name,
179 177 pull_request_id=str(pull_request_id)),
180 178 params={
181 179 'edit_pull_request': 'true',
182 180 '_method': 'put',
183 181 'title': 'New title',
184 182 'description': 'New description',
185 183 'csrf_token': csrf_token})
186 184
187 185 assert_session_flash(
188 186 response, u'Cannot update closed pull requests.',
189 187 category='error')
190 188
191 189 def test_update_invalid_source_reference(self, pr_util, csrf_token):
192 190 pull_request = pr_util.create_pull_request()
193 191 pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
194 192 Session().add(pull_request)
195 193 Session().commit()
196 194
197 195 pull_request_id = pull_request.pull_request_id
198 196
199 197 response = self.app.post(
200 198 url(controller='pullrequests', action='update',
201 199 repo_name=pull_request.target_repo.repo_name,
202 200 pull_request_id=str(pull_request_id)),
203 201 params={'update_commits': 'true', '_method': 'put',
204 202 'csrf_token': csrf_token})
205 203
206 204 assert_session_flash(
207 205 response, u'Update failed due to missing commits.',
208 206 category='error')
209 207
210 208 def test_comment_and_close_pull_request(self, pr_util, csrf_token):
211 209 pull_request = pr_util.create_pull_request(approved=True)
212 210 pull_request_id = pull_request.pull_request_id
213 211 author = pull_request.user_id
214 212 repo = pull_request.target_repo.repo_id
215 213
216 214 self.app.post(
217 215 url(controller='pullrequests',
218 216 action='comment',
219 217 repo_name=pull_request.target_repo.scm_instance().name,
220 218 pull_request_id=str(pull_request_id)),
221 219 params={
222 220 'changeset_status':
223 221 ChangesetStatus.STATUS_APPROVED + '_closed',
224 222 'change_changeset_status': 'on',
225 223 'text': '',
226 224 'csrf_token': csrf_token},
227 225 status=302)
228 226
229 227 action = 'user_closed_pull_request:%d' % pull_request_id
230 228 journal = UserLog.query()\
231 229 .filter(UserLog.user_id == author)\
232 230 .filter(UserLog.repository_id == repo)\
233 231 .filter(UserLog.action == action)\
234 232 .all()
235 233 assert len(journal) == 1
236 234
237 235 def test_reject_and_close_pull_request(self, pr_util, csrf_token):
238 236 pull_request = pr_util.create_pull_request()
239 237 pull_request_id = pull_request.pull_request_id
240 238 response = self.app.post(
241 239 url(controller='pullrequests',
242 240 action='update',
243 241 repo_name=pull_request.target_repo.scm_instance().name,
244 242 pull_request_id=str(pull_request.pull_request_id)),
245 243 params={'close_pull_request': 'true', '_method': 'put',
246 244 'csrf_token': csrf_token})
247 245
248 246 pull_request = PullRequest.get(pull_request_id)
249 247
250 248 assert response.json is True
251 249 assert pull_request.is_closed()
252 250
253 251 # check only the latest status, not the review status
254 252 status = ChangesetStatusModel().get_status(
255 253 pull_request.source_repo, pull_request=pull_request)
256 254 assert status == ChangesetStatus.STATUS_REJECTED
257 255
258 256 def test_comment_force_close_pull_request(self, pr_util, csrf_token):
259 257 pull_request = pr_util.create_pull_request()
260 258 pull_request_id = pull_request.pull_request_id
261 259 reviewers_ids = [1, 2]
262 260 PullRequestModel().update_reviewers(pull_request_id, reviewers_ids)
263 261 author = pull_request.user_id
264 262 repo = pull_request.target_repo.repo_id
265 263 self.app.post(
266 264 url(controller='pullrequests',
267 265 action='comment',
268 266 repo_name=pull_request.target_repo.scm_instance().name,
269 267 pull_request_id=str(pull_request_id)),
270 268 params={
271 269 'changeset_status': 'forced_closed',
272 270 'csrf_token': csrf_token},
273 271 status=302)
274 272
275 273 pull_request = PullRequest.get(pull_request_id)
276 274
277 275 action = 'user_closed_pull_request:%d' % pull_request_id
278 276 journal = UserLog.query().filter(
279 277 UserLog.user_id == author,
280 278 UserLog.repository_id == repo,
281 279 UserLog.action == action).all()
282 280 assert len(journal) == 1
283 281
284 282 # check only the latest status, not the review status
285 283 status = ChangesetStatusModel().get_status(
286 284 pull_request.source_repo, pull_request=pull_request)
287 285 assert status == ChangesetStatus.STATUS_REJECTED
288 286
289 287 def test_create_pull_request(self, backend, csrf_token):
290 288 commits = [
291 289 {'message': 'ancestor'},
292 290 {'message': 'change'},
293 291 ]
294 292 commit_ids = backend.create_master_repo(commits)
295 293 target = backend.create_repo(heads=['ancestor'])
296 294 source = backend.create_repo(heads=['change'])
297 295
298 296 response = self.app.post(
299 297 url(
300 298 controller='pullrequests',
301 299 action='create',
302 300 repo_name=source.repo_name),
303 301 params={
304 302 'source_repo': source.repo_name,
305 303 'source_ref': 'branch:default:' + commit_ids['change'],
306 304 'target_repo': target.repo_name,
307 305 'target_ref': 'branch:default:' + commit_ids['ancestor'],
308 306 'pullrequest_desc': 'Description',
309 307 'pullrequest_title': 'Title',
310 308 'review_members': '1',
311 309 'revisions': commit_ids['change'],
312 310 'user': '',
313 311 'csrf_token': csrf_token,
314 312 },
315 313 status=302)
316 314
317 315 location = response.headers['Location']
318 316 pull_request_id = int(location.rsplit('/', 1)[1])
319 317 pull_request = PullRequest.get(pull_request_id)
320 318
321 319 # check that we have now both revisions
322 320 assert pull_request.revisions == [commit_ids['change']]
323 321 assert pull_request.source_ref == 'branch:default:' + commit_ids['change']
324 322 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
325 323 assert pull_request.target_ref == expected_target_ref
326 324
327 325 def test_reviewer_notifications(self, backend, csrf_token):
328 326 # We have to use the app.post for this test so it will create the
329 327 # notifications properly with the new PR
330 328 commits = [
331 329 {'message': 'ancestor',
332 330 'added': [FileNode('file_A', content='content_of_ancestor')]},
333 331 {'message': 'change',
334 332 'added': [FileNode('file_a', content='content_of_change')]},
335 333 {'message': 'change-child'},
336 334 {'message': 'ancestor-child', 'parents': ['ancestor'],
337 335 'added': [
338 336 FileNode('file_B', content='content_of_ancestor_child')]},
339 337 {'message': 'ancestor-child-2'},
340 338 ]
341 339 commit_ids = backend.create_master_repo(commits)
342 340 target = backend.create_repo(heads=['ancestor-child'])
343 341 source = backend.create_repo(heads=['change'])
344 342
345 343 response = self.app.post(
346 344 url(
347 345 controller='pullrequests',
348 346 action='create',
349 347 repo_name=source.repo_name),
350 348 params={
351 349 'source_repo': source.repo_name,
352 350 'source_ref': 'branch:default:' + commit_ids['change'],
353 351 'target_repo': target.repo_name,
354 352 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
355 353 'pullrequest_desc': 'Description',
356 354 'pullrequest_title': 'Title',
357 355 'review_members': '2',
358 356 'revisions': commit_ids['change'],
359 357 'user': '',
360 358 'csrf_token': csrf_token,
361 359 },
362 360 status=302)
363 361
364 362 location = response.headers['Location']
365 363 pull_request_id = int(location.rsplit('/', 1)[1])
366 364 pull_request = PullRequest.get(pull_request_id)
367 365
368 366 # Check that a notification was made
369 367 notifications = Notification.query()\
370 368 .filter(Notification.created_by == pull_request.author.user_id,
371 369 Notification.type_ == Notification.TYPE_PULL_REQUEST,
372 370 Notification.subject.contains("wants you to review "
373 371 "pull request #%d"
374 372 % pull_request_id))
375 373 assert len(notifications.all()) == 1
376 374
377 375 # Change reviewers and check that a notification was made
378 376 PullRequestModel().update_reviewers(pull_request.pull_request_id, [1])
379 377 assert len(notifications.all()) == 2
380 378
381 379 def test_create_pull_request_stores_ancestor_commit_id(self, backend,
382 380 csrf_token):
383 381 commits = [
384 382 {'message': 'ancestor',
385 383 'added': [FileNode('file_A', content='content_of_ancestor')]},
386 384 {'message': 'change',
387 385 'added': [FileNode('file_a', content='content_of_change')]},
388 386 {'message': 'change-child'},
389 387 {'message': 'ancestor-child', 'parents': ['ancestor'],
390 388 'added': [
391 389 FileNode('file_B', content='content_of_ancestor_child')]},
392 390 {'message': 'ancestor-child-2'},
393 391 ]
394 392 commit_ids = backend.create_master_repo(commits)
395 393 target = backend.create_repo(heads=['ancestor-child'])
396 394 source = backend.create_repo(heads=['change'])
397 395
398 396 response = self.app.post(
399 397 url(
400 398 controller='pullrequests',
401 399 action='create',
402 400 repo_name=source.repo_name),
403 401 params={
404 402 'source_repo': source.repo_name,
405 403 'source_ref': 'branch:default:' + commit_ids['change'],
406 404 'target_repo': target.repo_name,
407 405 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
408 406 'pullrequest_desc': 'Description',
409 407 'pullrequest_title': 'Title',
410 408 'review_members': '1',
411 409 'revisions': commit_ids['change'],
412 410 'user': '',
413 411 'csrf_token': csrf_token,
414 412 },
415 413 status=302)
416 414
417 415 location = response.headers['Location']
418 416 pull_request_id = int(location.rsplit('/', 1)[1])
419 417 pull_request = PullRequest.get(pull_request_id)
420 418
421 419 # target_ref has to point to the ancestor's commit_id in order to
422 420 # show the correct diff
423 421 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
424 422 assert pull_request.target_ref == expected_target_ref
425 423
426 424 # Check generated diff contents
427 425 response = response.follow()
428 426 assert 'content_of_ancestor' not in response.body
429 427 assert 'content_of_ancestor-child' not in response.body
430 428 assert 'content_of_change' in response.body
431 429
432 430 def test_merge_pull_request_enabled(self, pr_util, csrf_token):
433 431 # Clear any previous calls to rcextensions
434 432 rhodecode.EXTENSIONS.calls.clear()
435 433
436 434 pull_request = pr_util.create_pull_request(
437 435 approved=True, mergeable=True)
438 436 pull_request_id = pull_request.pull_request_id
439 437 repo_name = pull_request.target_repo.scm_instance().name,
440 438
441 439 response = self.app.post(
442 440 url(controller='pullrequests',
443 441 action='merge',
444 442 repo_name=str(repo_name[0]),
445 443 pull_request_id=str(pull_request_id)),
446 444 params={'csrf_token': csrf_token}).follow()
447 445
448 446 pull_request = PullRequest.get(pull_request_id)
449 447
450 448 assert response.status_int == 200
451 449 assert pull_request.is_closed()
452 450 assert_pull_request_status(
453 451 pull_request, ChangesetStatus.STATUS_APPROVED)
454 452
455 453 # Check the relevant log entries were added
456 454 user_logs = UserLog.query().order_by('-user_log_id').limit(4)
457 455 actions = [log.action for log in user_logs]
458 456 pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
459 457 expected_actions = [
460 458 u'user_closed_pull_request:%d' % pull_request_id,
461 459 u'user_merged_pull_request:%d' % pull_request_id,
462 460 # The action below reflect that the post push actions were executed
463 461 u'user_commented_pull_request:%d' % pull_request_id,
464 462 u'push:%s' % ','.join(pr_commit_ids),
465 463 ]
466 464 assert actions == expected_actions
467 465
468 466 # Check post_push rcextension was really executed
469 467 push_calls = rhodecode.EXTENSIONS.calls['post_push']
470 468 assert len(push_calls) == 1
471 469 unused_last_call_args, last_call_kwargs = push_calls[0]
472 470 assert last_call_kwargs['action'] == 'push'
473 471 assert last_call_kwargs['pushed_revs'] == pr_commit_ids
474 472
475 473 def test_merge_pull_request_disabled(self, pr_util, csrf_token):
476 474 pull_request = pr_util.create_pull_request(mergeable=False)
477 475 pull_request_id = pull_request.pull_request_id
478 476 pull_request = PullRequest.get(pull_request_id)
479 477
480 478 response = self.app.post(
481 479 url(controller='pullrequests',
482 480 action='merge',
483 481 repo_name=pull_request.target_repo.scm_instance().name,
484 482 pull_request_id=str(pull_request.pull_request_id)),
485 483 params={'csrf_token': csrf_token}).follow()
486 484
487 485 assert response.status_int == 200
488 486 assert 'Server-side pull request merging is disabled.' in response.body
489 487
490 488 @pytest.mark.skip_backends('svn')
491 489 def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
492 490 pull_request = pr_util.create_pull_request(mergeable=True)
493 491 pull_request_id = pull_request.pull_request_id
494 492 repo_name = pull_request.target_repo.scm_instance().name,
495 493
496 494 response = self.app.post(
497 495 url(controller='pullrequests',
498 496 action='merge',
499 497 repo_name=str(repo_name[0]),
500 498 pull_request_id=str(pull_request_id)),
501 499 params={'csrf_token': csrf_token}).follow()
502 500
503 501 pull_request = PullRequest.get(pull_request_id)
504 502
505 503 assert response.status_int == 200
506 504 assert ' Reviewer approval is pending.' in response.body
507 505
508 506 def test_update_source_revision(self, backend, csrf_token):
509 507 commits = [
510 508 {'message': 'ancestor'},
511 509 {'message': 'change'},
512 510 {'message': 'change-2'},
513 511 ]
514 512 commit_ids = backend.create_master_repo(commits)
515 513 target = backend.create_repo(heads=['ancestor'])
516 514 source = backend.create_repo(heads=['change'])
517 515
518 516 # create pr from a in source to A in target
519 517 pull_request = PullRequest()
520 518 pull_request.source_repo = source
521 519 # TODO: johbo: Make sure that we write the source ref this way!
522 520 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
523 521 branch=backend.default_branch_name, commit_id=commit_ids['change'])
524 522 pull_request.target_repo = target
525 523
526 524 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
527 525 branch=backend.default_branch_name,
528 526 commit_id=commit_ids['ancestor'])
529 527 pull_request.revisions = [commit_ids['change']]
530 528 pull_request.title = u"Test"
531 529 pull_request.description = u"Description"
532 530 pull_request.author = UserModel().get_by_username(
533 531 TEST_USER_ADMIN_LOGIN)
534 532 Session().add(pull_request)
535 533 Session().commit()
536 534 pull_request_id = pull_request.pull_request_id
537 535
538 536 # source has ancestor - change - change-2
539 537 backend.pull_heads(source, heads=['change-2'])
540 538
541 539 # update PR
542 540 self.app.post(
543 541 url(controller='pullrequests', action='update',
544 542 repo_name=target.repo_name,
545 543 pull_request_id=str(pull_request_id)),
546 544 params={'update_commits': 'true', '_method': 'put',
547 545 'csrf_token': csrf_token})
548 546
549 547 # check that we have now both revisions
550 548 pull_request = PullRequest.get(pull_request_id)
551 549 assert pull_request.revisions == [
552 550 commit_ids['change-2'], commit_ids['change']]
553 551
554 552 # TODO: johbo: this should be a test on its own
555 553 response = self.app.get(url(
556 554 controller='pullrequests', action='index',
557 555 repo_name=target.repo_name))
558 556 assert response.status_int == 200
559 557 assert 'Pull request updated to' in response.body
560 558 assert 'with 1 added, 0 removed commits.' in response.body
561 559
562 560 def test_update_target_revision(self, backend, csrf_token):
563 561 commits = [
564 562 {'message': 'ancestor'},
565 563 {'message': 'change'},
566 564 {'message': 'ancestor-new', 'parents': ['ancestor']},
567 565 {'message': 'change-rebased'},
568 566 ]
569 567 commit_ids = backend.create_master_repo(commits)
570 568 target = backend.create_repo(heads=['ancestor'])
571 569 source = backend.create_repo(heads=['change'])
572 570
573 571 # create pr from a in source to A in target
574 572 pull_request = PullRequest()
575 573 pull_request.source_repo = source
576 574 # TODO: johbo: Make sure that we write the source ref this way!
577 575 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
578 576 branch=backend.default_branch_name, commit_id=commit_ids['change'])
579 577 pull_request.target_repo = target
580 578 # TODO: johbo: Target ref should be branch based, since tip can jump
581 579 # from branch to branch
582 580 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
583 581 branch=backend.default_branch_name,
584 582 commit_id=commit_ids['ancestor'])
585 583 pull_request.revisions = [commit_ids['change']]
586 584 pull_request.title = u"Test"
587 585 pull_request.description = u"Description"
588 586 pull_request.author = UserModel().get_by_username(
589 587 TEST_USER_ADMIN_LOGIN)
590 588 Session().add(pull_request)
591 589 Session().commit()
592 590 pull_request_id = pull_request.pull_request_id
593 591
594 592 # target has ancestor - ancestor-new
595 593 # source has ancestor - ancestor-new - change-rebased
596 594 backend.pull_heads(target, heads=['ancestor-new'])
597 595 backend.pull_heads(source, heads=['change-rebased'])
598 596
599 597 # update PR
600 598 self.app.post(
601 599 url(controller='pullrequests', action='update',
602 600 repo_name=target.repo_name,
603 601 pull_request_id=str(pull_request_id)),
604 602 params={'update_commits': 'true', '_method': 'put',
605 603 'csrf_token': csrf_token},
606 604 status=200)
607 605
608 606 # check that we have now both revisions
609 607 pull_request = PullRequest.get(pull_request_id)
610 608 assert pull_request.revisions == [commit_ids['change-rebased']]
611 609 assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
612 610 branch=backend.default_branch_name,
613 611 commit_id=commit_ids['ancestor-new'])
614 612
615 613 # TODO: johbo: This should be a test on its own
616 614 response = self.app.get(url(
617 615 controller='pullrequests', action='index',
618 616 repo_name=target.repo_name))
619 617 assert response.status_int == 200
620 618 assert 'Pull request updated to' in response.body
621 619 assert 'with 1 added, 1 removed commits.' in response.body
622 620
623 621 def test_update_of_ancestor_reference(self, backend, csrf_token):
624 622 commits = [
625 623 {'message': 'ancestor'},
626 624 {'message': 'change'},
627 625 {'message': 'change-2'},
628 626 {'message': 'ancestor-new', 'parents': ['ancestor']},
629 627 {'message': 'change-rebased'},
630 628 ]
631 629 commit_ids = backend.create_master_repo(commits)
632 630 target = backend.create_repo(heads=['ancestor'])
633 631 source = backend.create_repo(heads=['change'])
634 632
635 633 # create pr from a in source to A in target
636 634 pull_request = PullRequest()
637 635 pull_request.source_repo = source
638 636 # TODO: johbo: Make sure that we write the source ref this way!
639 637 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
640 638 branch=backend.default_branch_name,
641 639 commit_id=commit_ids['change'])
642 640 pull_request.target_repo = target
643 641 # TODO: johbo: Target ref should be branch based, since tip can jump
644 642 # from branch to branch
645 643 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
646 644 branch=backend.default_branch_name,
647 645 commit_id=commit_ids['ancestor'])
648 646 pull_request.revisions = [commit_ids['change']]
649 647 pull_request.title = u"Test"
650 648 pull_request.description = u"Description"
651 649 pull_request.author = UserModel().get_by_username(
652 650 TEST_USER_ADMIN_LOGIN)
653 651 Session().add(pull_request)
654 652 Session().commit()
655 653 pull_request_id = pull_request.pull_request_id
656 654
657 655 # target has ancestor - ancestor-new
658 656 # source has ancestor - ancestor-new - change-rebased
659 657 backend.pull_heads(target, heads=['ancestor-new'])
660 658 backend.pull_heads(source, heads=['change-rebased'])
661 659
662 660 # update PR
663 661 self.app.post(
664 662 url(controller='pullrequests', action='update',
665 663 repo_name=target.repo_name,
666 664 pull_request_id=str(pull_request_id)),
667 665 params={'update_commits': 'true', '_method': 'put',
668 666 'csrf_token': csrf_token},
669 667 status=200)
670 668
671 669 # Expect the target reference to be updated correctly
672 670 pull_request = PullRequest.get(pull_request_id)
673 671 assert pull_request.revisions == [commit_ids['change-rebased']]
674 672 expected_target_ref = 'branch:{branch}:{commit_id}'.format(
675 673 branch=backend.default_branch_name,
676 674 commit_id=commit_ids['ancestor-new'])
677 675 assert pull_request.target_ref == expected_target_ref
678 676
679 677 def test_remove_pull_request_branch(self, backend_git, csrf_token):
680 678 branch_name = 'development'
681 679 commits = [
682 680 {'message': 'initial-commit'},
683 681 {'message': 'old-feature'},
684 682 {'message': 'new-feature', 'branch': branch_name},
685 683 ]
686 684 repo = backend_git.create_repo(commits)
687 685 commit_ids = backend_git.commit_ids
688 686
689 687 pull_request = PullRequest()
690 688 pull_request.source_repo = repo
691 689 pull_request.target_repo = repo
692 690 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
693 691 branch=branch_name, commit_id=commit_ids['new-feature'])
694 692 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
695 693 branch=backend_git.default_branch_name,
696 694 commit_id=commit_ids['old-feature'])
697 695 pull_request.revisions = [commit_ids['new-feature']]
698 696 pull_request.title = u"Test"
699 697 pull_request.description = u"Description"
700 698 pull_request.author = UserModel().get_by_username(
701 699 TEST_USER_ADMIN_LOGIN)
702 700 Session().add(pull_request)
703 701 Session().commit()
704 702
705 703 vcs = repo.scm_instance()
706 704 vcs.remove_ref('refs/heads/{}'.format(branch_name))
707 705
708 706 response = self.app.get(url(
709 707 controller='pullrequests', action='show',
710 708 repo_name=repo.repo_name,
711 709 pull_request_id=str(pull_request.pull_request_id)))
712 710
713 711 assert response.status_int == 200
714 712 assert_response = AssertResponse(response)
715 713 assert_response.element_contains(
716 714 '#changeset_compare_view_content .alert strong',
717 715 'Missing commits')
718 716 assert_response.element_contains(
719 717 '#changeset_compare_view_content .alert',
720 718 'This pull request cannot be displayed, because one or more'
721 719 ' commits no longer exist in the source repository.')
722 720
723 721 def test_strip_commits_from_pull_request(
724 722 self, backend, pr_util, csrf_token):
725 723 commits = [
726 724 {'message': 'initial-commit'},
727 725 {'message': 'old-feature'},
728 726 {'message': 'new-feature', 'parents': ['initial-commit']},
729 727 ]
730 728 pull_request = pr_util.create_pull_request(
731 729 commits, target_head='initial-commit', source_head='new-feature',
732 730 revisions=['new-feature'])
733 731
734 732 vcs = pr_util.source_repository.scm_instance()
735 733 if backend.alias == 'git':
736 734 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
737 735 else:
738 736 vcs.strip(pr_util.commit_ids['new-feature'])
739 737
740 738 response = self.app.get(url(
741 739 controller='pullrequests', action='show',
742 740 repo_name=pr_util.target_repository.repo_name,
743 741 pull_request_id=str(pull_request.pull_request_id)))
744 742
745 743 assert response.status_int == 200
746 744 assert_response = AssertResponse(response)
747 745 assert_response.element_contains(
748 746 '#changeset_compare_view_content .alert strong',
749 747 'Missing commits')
750 748 assert_response.element_contains(
751 749 '#changeset_compare_view_content .alert',
752 750 'This pull request cannot be displayed, because one or more'
753 751 ' commits no longer exist in the source repository.')
754 752 assert_response.element_contains(
755 753 '#update_commits',
756 754 'Update commits')
757 755
758 756 def test_strip_commits_and_update(
759 757 self, backend, pr_util, csrf_token):
760 758 commits = [
761 759 {'message': 'initial-commit'},
762 760 {'message': 'old-feature'},
763 761 {'message': 'new-feature', 'parents': ['old-feature']},
764 762 ]
765 763 pull_request = pr_util.create_pull_request(
766 764 commits, target_head='old-feature', source_head='new-feature',
767 765 revisions=['new-feature'], mergeable=True)
768 766
769 767 vcs = pr_util.source_repository.scm_instance()
770 768 if backend.alias == 'git':
771 769 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
772 770 else:
773 771 vcs.strip(pr_util.commit_ids['new-feature'])
774 772
775 773 response = self.app.post(
776 774 url(controller='pullrequests', action='update',
777 775 repo_name=pull_request.target_repo.repo_name,
778 776 pull_request_id=str(pull_request.pull_request_id)),
779 777 params={'update_commits': 'true', '_method': 'put',
780 778 'csrf_token': csrf_token})
781 779
782 780 assert response.status_int == 200
783 781 assert response.body == 'true'
784 782
785 783 # Make sure that after update, it won't raise 500 errors
786 784 response = self.app.get(url(
787 785 controller='pullrequests', action='show',
788 786 repo_name=pr_util.target_repository.repo_name,
789 787 pull_request_id=str(pull_request.pull_request_id)))
790 788
791 789 assert response.status_int == 200
792 790 assert_response = AssertResponse(response)
793 791 assert_response.element_contains(
794 792 '#changeset_compare_view_content .alert strong',
795 793 'Missing commits')
796 794
797 795 def test_branch_is_a_link(self, pr_util):
798 796 pull_request = pr_util.create_pull_request()
799 797 pull_request.source_ref = 'branch:origin:1234567890abcdef'
800 798 pull_request.target_ref = 'branch:target:abcdef1234567890'
801 799 Session().add(pull_request)
802 800 Session().commit()
803 801
804 802 response = self.app.get(url(
805 803 controller='pullrequests', action='show',
806 804 repo_name=pull_request.target_repo.scm_instance().name,
807 805 pull_request_id=str(pull_request.pull_request_id)))
808 806 assert response.status_int == 200
809 807 assert_response = AssertResponse(response)
810 808
811 809 origin = assert_response.get_element('.pr-origininfo .tag')
812 810 origin_children = origin.getchildren()
813 811 assert len(origin_children) == 1
814 812 target = assert_response.get_element('.pr-targetinfo .tag')
815 813 target_children = target.getchildren()
816 814 assert len(target_children) == 1
817 815
818 816 expected_origin_link = url(
819 817 'changelog_home',
820 818 repo_name=pull_request.source_repo.scm_instance().name,
821 819 branch='origin')
822 820 expected_target_link = url(
823 821 'changelog_home',
824 822 repo_name=pull_request.target_repo.scm_instance().name,
825 823 branch='target')
826 824 assert origin_children[0].attrib['href'] == expected_origin_link
827 825 assert origin_children[0].text == 'branch: origin'
828 826 assert target_children[0].attrib['href'] == expected_target_link
829 827 assert target_children[0].text == 'branch: target'
830 828
831 829 def test_bookmark_is_not_a_link(self, pr_util):
832 830 pull_request = pr_util.create_pull_request()
833 831 pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
834 832 pull_request.target_ref = 'bookmark:target:abcdef1234567890'
835 833 Session().add(pull_request)
836 834 Session().commit()
837 835
838 836 response = self.app.get(url(
839 837 controller='pullrequests', action='show',
840 838 repo_name=pull_request.target_repo.scm_instance().name,
841 839 pull_request_id=str(pull_request.pull_request_id)))
842 840 assert response.status_int == 200
843 841 assert_response = AssertResponse(response)
844 842
845 843 origin = assert_response.get_element('.pr-origininfo .tag')
846 844 assert origin.text.strip() == 'bookmark: origin'
847 845 assert origin.getchildren() == []
848 846
849 847 target = assert_response.get_element('.pr-targetinfo .tag')
850 848 assert target.text.strip() == 'bookmark: target'
851 849 assert target.getchildren() == []
852 850
853 851 def test_tag_is_not_a_link(self, pr_util):
854 852 pull_request = pr_util.create_pull_request()
855 853 pull_request.source_ref = 'tag:origin:1234567890abcdef'
856 854 pull_request.target_ref = 'tag:target:abcdef1234567890'
857 855 Session().add(pull_request)
858 856 Session().commit()
859 857
860 858 response = self.app.get(url(
861 859 controller='pullrequests', action='show',
862 860 repo_name=pull_request.target_repo.scm_instance().name,
863 861 pull_request_id=str(pull_request.pull_request_id)))
864 862 assert response.status_int == 200
865 863 assert_response = AssertResponse(response)
866 864
867 865 origin = assert_response.get_element('.pr-origininfo .tag')
868 866 assert origin.text.strip() == 'tag: origin'
869 867 assert origin.getchildren() == []
870 868
871 869 target = assert_response.get_element('.pr-targetinfo .tag')
872 870 assert target.text.strip() == 'tag: target'
873 871 assert target.getchildren() == []
874 872
875 873 def test_description_is_escaped_on_index_page(self, backend, pr_util):
876 874 xss_description = "<script>alert('Hi!')</script>"
877 875 pull_request = pr_util.create_pull_request(description=xss_description)
878 876 response = self.app.get(url(
879 877 controller='pullrequests', action='show_all',
880 878 repo_name=pull_request.target_repo.repo_name))
881 879 response.mustcontain(
882 880 "&lt;script&gt;alert(&#39;Hi!&#39;)&lt;/script&gt;")
883 881
884 882
885 883 def assert_pull_request_status(pull_request, expected_status):
886 884 status = ChangesetStatusModel().calculated_review_status(
887 885 pull_request=pull_request)
888 886 assert status == expected_status
889 887
890 888
891 889 @pytest.mark.parametrize('action', ['show_all', 'index', 'create'])
892 890 @pytest.mark.usefixtures("autologin_user")
893 891 def test_redirects_to_repo_summary_for_svn_repositories(
894 892 backend_svn, app, action):
895 893 denied_actions = ['show_all', 'index', 'create']
896 894 for action in denied_actions:
897 895 response = app.get(url(
898 896 controller='pullrequests', action=action,
899 897 repo_name=backend_svn.repo_name))
900 898 assert response.status_int == 302
901 899
902 900 # Not allowed, redirect to the summary
903 901 redirected = response.follow()
904 902 summary_url = url('summary_home', repo_name=backend_svn.repo_name)
905 903
906 904 # URL adds leading slash and path doesn't have it
907 905 assert redirected.req.path == summary_url
908 906
909 907
910 908 def test_delete_comment_returns_404_if_comment_does_not_exist(pylonsapp):
911 909 # TODO: johbo: Global import not possible because models.forms blows up
912 910 from rhodecode.controllers.pullrequests import PullrequestsController
913 911 controller = PullrequestsController()
914 912 patcher = mock.patch(
915 913 'rhodecode.model.db.BaseModel.get', return_value=None)
916 914 with pytest.raises(HTTPNotFound), patcher:
917 915 controller._delete_comment(1)
@@ -1,561 +1,564 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 import time
23 23
24 24 import pytest
25 25
26 26 from rhodecode.lib.vcs.backends.base import (
27 27 CollectionGenerator, FILEMODE_DEFAULT, EmptyCommit)
28 28 from rhodecode.lib.vcs.exceptions import (
29 29 BranchDoesNotExistError, CommitDoesNotExistError,
30 30 RepositoryError, EmptyRepositoryError)
31 31 from rhodecode.lib.vcs.nodes import (
32 32 FileNode, AddedFileNodesGenerator,
33 33 ChangedFileNodesGenerator, RemovedFileNodesGenerator)
34 34 from rhodecode.tests import get_new_dir
35 35 from rhodecode.tests.vcs.base import BackendTestMixin
36 36
37 37
38 38 class TestBaseChangeset:
39 39
40 40 def test_is_deprecated(self):
41 41 from rhodecode.lib.vcs.backends.base import BaseChangeset
42 42 pytest.deprecated_call(BaseChangeset)
43 43
44 44
45 45 class TestEmptyCommit:
46 46
47 47 def test_branch_without_alias_returns_none(self):
48 48 commit = EmptyCommit()
49 49 assert commit.branch is None
50 50
51 51
52 52 class TestCommitsInNonEmptyRepo(BackendTestMixin):
53 53 recreate_repo_per_test = True
54 54
55 55 @classmethod
56 56 def _get_commits(cls):
57 57 start_date = datetime.datetime(2010, 1, 1, 20)
58 58 for x in xrange(5):
59 59 yield {
60 60 'message': 'Commit %d' % x,
61 61 'author': 'Joe Doe <joe.doe@example.com>',
62 62 'date': start_date + datetime.timedelta(hours=12 * x),
63 63 'added': [
64 64 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
65 65 ],
66 66 }
67 67
68 68 def test_walk_returns_empty_list_in_case_of_file(self):
69 69 result = list(self.tip.walk('file_0.txt'))
70 70 assert result == []
71 71
72 72 @pytest.mark.backends("git", "hg")
73 73 def test_new_branch(self):
74 74 self.imc.add(FileNode('docs/index.txt',
75 75 content='Documentation\n'))
76 76 foobar_tip = self.imc.commit(
77 77 message=u'New branch: foobar',
78 78 author=u'joe',
79 79 branch='foobar',
80 80 )
81 81 assert 'foobar' in self.repo.branches
82 82 assert foobar_tip.branch == 'foobar'
83 83 # 'foobar' should be the only branch that contains the new commit
84 84 branch = self.repo.branches.values()
85 85 assert branch[0] != branch[1]
86 86
87 87 @pytest.mark.backends("git", "hg")
88 88 def test_new_head_in_default_branch(self):
89 89 tip = self.repo.get_commit()
90 90 self.imc.add(FileNode('docs/index.txt',
91 91 content='Documentation\n'))
92 92 foobar_tip = self.imc.commit(
93 93 message=u'New branch: foobar',
94 94 author=u'joe',
95 95 branch='foobar',
96 96 parents=[tip],
97 97 )
98 98 self.imc.change(FileNode('docs/index.txt',
99 99 content='Documentation\nand more...\n'))
100 100 newtip = self.imc.commit(
101 101 message=u'At default branch',
102 102 author=u'joe',
103 103 branch=foobar_tip.branch,
104 104 parents=[foobar_tip],
105 105 )
106 106
107 107 newest_tip = self.imc.commit(
108 108 message=u'Merged with %s' % foobar_tip.raw_id,
109 109 author=u'joe',
110 110 branch=self.backend_class.DEFAULT_BRANCH_NAME,
111 111 parents=[newtip, foobar_tip],
112 112 )
113 113
114 114 assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME
115 115
116 116 @pytest.mark.backends("git", "hg")
117 117 def test_get_commits_respects_branch_name(self):
118 118 """
119 119 * e1930d0 (HEAD, master) Back in default branch
120 120 | * e1930d0 (docs) New Branch: docs2
121 121 | * dcc14fa New branch: docs
122 122 |/
123 123 * e63c41a Initial commit
124 124 ...
125 125 * 624d3db Commit 0
126 126
127 127 :return:
128 128 """
129 129 DEFAULT_BRANCH = self.repo.DEFAULT_BRANCH_NAME
130 130 TEST_BRANCH = 'docs'
131 131 org_tip = self.repo.get_commit()
132 132
133 133 self.imc.add(FileNode('readme.txt', content='Document\n'))
134 134 initial = self.imc.commit(
135 135 message=u'Initial commit',
136 136 author=u'joe',
137 137 parents=[org_tip],
138 138 branch=DEFAULT_BRANCH,)
139 139
140 140 self.imc.add(FileNode('newdoc.txt', content='foobar\n'))
141 141 docs_branch_commit1 = self.imc.commit(
142 142 message=u'New branch: docs',
143 143 author=u'joe',
144 144 parents=[initial],
145 145 branch=TEST_BRANCH,)
146 146
147 147 self.imc.add(FileNode('newdoc2.txt', content='foobar2\n'))
148 148 docs_branch_commit2 = self.imc.commit(
149 149 message=u'New branch: docs2',
150 150 author=u'joe',
151 151 parents=[docs_branch_commit1],
152 152 branch=TEST_BRANCH,)
153 153
154 154 self.imc.add(FileNode('newfile', content='hello world\n'))
155 155 self.imc.commit(
156 156 message=u'Back in default branch',
157 157 author=u'joe',
158 158 parents=[initial],
159 159 branch=DEFAULT_BRANCH,)
160 160
161 161 default_branch_commits = self.repo.get_commits(
162 162 branch_name=DEFAULT_BRANCH)
163 163 assert docs_branch_commit1 not in list(default_branch_commits)
164 164 assert docs_branch_commit2 not in list(default_branch_commits)
165 165
166 166 docs_branch_commits = self.repo.get_commits(
167 167 start_id=self.repo.commit_ids[0], end_id=self.repo.commit_ids[-1],
168 168 branch_name=TEST_BRANCH)
169 169 assert docs_branch_commit1 in list(docs_branch_commits)
170 170 assert docs_branch_commit2 in list(docs_branch_commits)
171 171
172 172 @pytest.mark.backends("svn")
173 173 def test_get_commits_respects_branch_name_svn(self, vcsbackend_svn):
174 174 repo = vcsbackend_svn['svn-simple-layout']
175 175 commits = repo.get_commits(branch_name='trunk')
176 176 commit_indexes = [c.idx for c in commits]
177 177 assert commit_indexes == [1, 2, 3, 7, 12, 15]
178 178
179 179 def test_get_commit_by_branch(self):
180 180 for branch, commit_id in self.repo.branches.iteritems():
181 181 assert commit_id == self.repo.get_commit(branch).raw_id
182 182
183 183 def test_get_commit_by_tag(self):
184 184 for tag, commit_id in self.repo.tags.iteritems():
185 185 assert commit_id == self.repo.get_commit(tag).raw_id
186 186
187 187 def test_get_commit_parents(self):
188 188 repo = self.repo
189 189 for test_idx in [1, 2, 3]:
190 190 commit = repo.get_commit(commit_idx=test_idx - 1)
191 191 assert [commit] == repo.get_commit(commit_idx=test_idx).parents
192 192
193 193 def test_get_commit_children(self):
194 194 repo = self.repo
195 195 for test_idx in [1, 2, 3]:
196 196 commit = repo.get_commit(commit_idx=test_idx + 1)
197 197 assert [commit] == repo.get_commit(commit_idx=test_idx).children
198 198
199 199
200 200 class TestCommits(BackendTestMixin):
201 201 recreate_repo_per_test = False
202 202
203 203 @classmethod
204 204 def _get_commits(cls):
205 205 start_date = datetime.datetime(2010, 1, 1, 20)
206 206 for x in xrange(5):
207 207 yield {
208 208 'message': u'Commit %d' % x,
209 209 'author': u'Joe Doe <joe.doe@example.com>',
210 210 'date': start_date + datetime.timedelta(hours=12 * x),
211 211 'added': [
212 212 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
213 213 ],
214 214 }
215 215
216 216 def test_simple(self):
217 217 tip = self.repo.get_commit()
218 218 assert tip.date, datetime.datetime(2010, 1, 3 == 20)
219 219
220 220 def test_simple_serialized_commit(self):
221 221 tip = self.repo.get_commit()
222 222 # json.dumps(tip) uses .__json__() method
223 223 data = tip.__json__()
224 224 assert 'branch' in data
225 225 assert data['revision']
226 226
227 227 def test_retrieve_tip(self):
228 228 tip = self.repo.get_commit('tip')
229 229 assert tip == self.repo.get_commit()
230 230
231 231 def test_invalid(self):
232 232 with pytest.raises(CommitDoesNotExistError):
233 233 self.repo.get_commit(commit_idx=123456789)
234 234
235 235 def test_idx(self):
236 236 commit = self.repo[0]
237 237 assert commit.idx == 0
238 238
239 239 def test_negative_idx(self):
240 240 commit = self.repo.get_commit(commit_idx=-1)
241 241 assert commit.idx >= 0
242 242
243 243 def test_revision_is_deprecated(self):
244 244 def get_revision(commit):
245 245 return commit.revision
246 246
247 247 commit = self.repo[0]
248 248 pytest.deprecated_call(get_revision, commit)
249 249
250 250 def test_size(self):
251 251 tip = self.repo.get_commit()
252 252 size = 5 * len('Foobar N') # Size of 5 files
253 253 assert tip.size == size
254 254
255 255 def test_size_at_commit(self):
256 256 tip = self.repo.get_commit()
257 257 size = 5 * len('Foobar N') # Size of 5 files
258 258 assert self.repo.size_at_commit(tip.raw_id) == size
259 259
260 260 def test_size_at_first_commit(self):
261 261 commit = self.repo[0]
262 262 size = len('Foobar N') # Size of 1 file
263 263 assert self.repo.size_at_commit(commit.raw_id) == size
264 264
265 265 def test_author(self):
266 266 tip = self.repo.get_commit()
267 267 assert_text_equal(tip.author, u'Joe Doe <joe.doe@example.com>')
268 268
269 269 def test_author_name(self):
270 270 tip = self.repo.get_commit()
271 271 assert_text_equal(tip.author_name, u'Joe Doe')
272 272
273 273 def test_author_email(self):
274 274 tip = self.repo.get_commit()
275 275 assert_text_equal(tip.author_email, u'joe.doe@example.com')
276 276
277 277 def test_message(self):
278 278 tip = self.repo.get_commit()
279 279 assert_text_equal(tip.message, u'Commit 4')
280 280
281 281 def test_diff(self):
282 282 tip = self.repo.get_commit()
283 283 diff = tip.diff()
284 284 assert "+Foobar 4" in diff.raw
285 285
286 286 def test_prev(self):
287 287 tip = self.repo.get_commit()
288 288 prev_commit = tip.prev()
289 289 assert prev_commit.message == 'Commit 3'
290 290
291 291 def test_prev_raises_on_first_commit(self):
292 292 commit = self.repo.get_commit(commit_idx=0)
293 293 with pytest.raises(CommitDoesNotExistError):
294 294 commit.prev()
295 295
296 296 def test_prev_works_on_second_commit_issue_183(self):
297 297 commit = self.repo.get_commit(commit_idx=1)
298 298 prev_commit = commit.prev()
299 299 assert prev_commit.idx == 0
300 300
301 301 def test_next(self):
302 302 commit = self.repo.get_commit(commit_idx=2)
303 303 next_commit = commit.next()
304 304 assert next_commit.message == 'Commit 3'
305 305
306 306 def test_next_raises_on_tip(self):
307 307 commit = self.repo.get_commit()
308 308 with pytest.raises(CommitDoesNotExistError):
309 309 commit.next()
310 310
311 311 def test_get_file_commit(self):
312 312 commit = self.repo.get_commit()
313 313 commit.get_file_commit('file_4.txt')
314 314 assert commit.message == 'Commit 4'
315 315
316 316 def test_get_filenodes_generator(self):
317 317 tip = self.repo.get_commit()
318 318 filepaths = [node.path for node in tip.get_filenodes_generator()]
319 319 assert filepaths == ['file_%d.txt' % x for x in xrange(5)]
320 320
321 321 def test_get_file_annotate(self):
322 322 file_added_commit = self.repo.get_commit(commit_idx=3)
323 323 annotations = list(file_added_commit.get_file_annotate('file_3.txt'))
324 324 line_no, commit_id, commit_loader, line = annotations[0]
325 325 assert line_no == 1
326 326 assert commit_id == file_added_commit.raw_id
327 327 assert commit_loader() == file_added_commit
328
329 # git annotation is generated differently thus different results
328 330 if self.repo.alias == 'git':
329 pytest.xfail("TODO: Git returns wrong value in line")
331 assert line == '(Joe Doe 2010-01-03 08:00:00 +0000 1) Foobar 3'
332 else:
330 333 assert line == 'Foobar 3'
331 334
332 335 def test_get_file_annotate_does_not_exist(self):
333 336 file_added_commit = self.repo.get_commit(commit_idx=2)
334 337 # TODO: Should use a specific exception class here?
335 338 with pytest.raises(Exception):
336 339 list(file_added_commit.get_file_annotate('file_3.txt'))
337 340
338 341 def test_get_file_annotate_tip(self):
339 342 tip = self.repo.get_commit()
340 343 commit = self.repo.get_commit(commit_idx=3)
341 344 expected_values = list(commit.get_file_annotate('file_3.txt'))
342 345 annotations = list(tip.get_file_annotate('file_3.txt'))
343 346
344 347 # Note: Skip index 2 because the loader function is not the same
345 348 for idx in (0, 1, 3):
346 349 assert annotations[0][idx] == expected_values[0][idx]
347 350
348 351 def test_get_commits_is_ordered_by_date(self):
349 352 commits = self.repo.get_commits()
350 353 assert isinstance(commits, CollectionGenerator)
351 354 assert len(commits) == 0 or len(commits) != 0
352 355 commits = list(commits)
353 356 ordered_by_date = sorted(commits, key=lambda commit: commit.date)
354 357 assert commits == ordered_by_date
355 358
356 359 def test_get_commits_respects_start(self):
357 360 second_id = self.repo.commit_ids[1]
358 361 commits = self.repo.get_commits(start_id=second_id)
359 362 assert isinstance(commits, CollectionGenerator)
360 363 commits = list(commits)
361 364 assert len(commits) == 4
362 365
363 366 def test_get_commits_includes_start_commit(self):
364 367 second_id = self.repo.commit_ids[1]
365 368 commits = self.repo.get_commits(start_id=second_id)
366 369 assert isinstance(commits, CollectionGenerator)
367 370 commits = list(commits)
368 371 assert commits[0].raw_id == second_id
369 372
370 373 def test_get_commits_respects_end(self):
371 374 second_id = self.repo.commit_ids[1]
372 375 commits = self.repo.get_commits(end_id=second_id)
373 376 assert isinstance(commits, CollectionGenerator)
374 377 commits = list(commits)
375 378 assert commits[-1].raw_id == second_id
376 379 assert len(commits) == 2
377 380
378 381 def test_get_commits_respects_both_start_and_end(self):
379 382 second_id = self.repo.commit_ids[1]
380 383 third_id = self.repo.commit_ids[2]
381 384 commits = self.repo.get_commits(start_id=second_id, end_id=third_id)
382 385 assert isinstance(commits, CollectionGenerator)
383 386 commits = list(commits)
384 387 assert len(commits) == 2
385 388
386 389 def test_get_commits_on_empty_repo_raises_EmptyRepository_error(self):
387 390 repo_path = get_new_dir(str(time.time()))
388 391 repo = self.Backend(repo_path, create=True)
389 392
390 393 with pytest.raises(EmptyRepositoryError):
391 394 list(repo.get_commits(start_id='foobar'))
392 395
393 396 def test_get_commits_includes_end_commit(self):
394 397 second_id = self.repo.commit_ids[1]
395 398 commits = self.repo.get_commits(end_id=second_id)
396 399 assert isinstance(commits, CollectionGenerator)
397 400 assert len(commits) == 2
398 401 commits = list(commits)
399 402 assert commits[-1].raw_id == second_id
400 403
401 404 def test_get_commits_respects_start_date(self):
402 405 start_date = datetime.datetime(2010, 1, 2)
403 406 commits = self.repo.get_commits(start_date=start_date)
404 407 assert isinstance(commits, CollectionGenerator)
405 408 # Should be 4 commits after 2010-01-02 00:00:00
406 409 assert len(commits) == 4
407 410 for c in commits:
408 411 assert c.date >= start_date
409 412
410 413 def test_get_commits_respects_start_date_and_end_date(self):
411 414 start_date = datetime.datetime(2010, 1, 2)
412 415 end_date = datetime.datetime(2010, 1, 3)
413 416 commits = self.repo.get_commits(start_date=start_date,
414 417 end_date=end_date)
415 418 assert isinstance(commits, CollectionGenerator)
416 419 assert len(commits) == 2
417 420 for c in commits:
418 421 assert c.date >= start_date
419 422 assert c.date <= end_date
420 423
421 424 def test_get_commits_respects_end_date(self):
422 425 end_date = datetime.datetime(2010, 1, 2)
423 426 commits = self.repo.get_commits(end_date=end_date)
424 427 assert isinstance(commits, CollectionGenerator)
425 428 assert len(commits) == 1
426 429 for c in commits:
427 430 assert c.date <= end_date
428 431
429 432 def test_get_commits_respects_reverse(self):
430 433 commits = self.repo.get_commits() # no longer reverse support
431 434 assert isinstance(commits, CollectionGenerator)
432 435 assert len(commits) == 5
433 436 commit_ids = reversed([c.raw_id for c in commits])
434 437 assert list(commit_ids) == list(reversed(self.repo.commit_ids))
435 438
436 439 def test_get_commits_slice_generator(self):
437 440 commits = self.repo.get_commits(
438 441 branch_name=self.repo.DEFAULT_BRANCH_NAME)
439 442 assert isinstance(commits, CollectionGenerator)
440 443 commit_slice = list(commits[1:3])
441 444 assert len(commit_slice) == 2
442 445
443 446 def test_get_commits_raise_commitdoesnotexist_for_wrong_start(self):
444 447 with pytest.raises(CommitDoesNotExistError):
445 448 list(self.repo.get_commits(start_id='foobar'))
446 449
447 450 def test_get_commits_raise_commitdoesnotexist_for_wrong_end(self):
448 451 with pytest.raises(CommitDoesNotExistError):
449 452 list(self.repo.get_commits(end_id='foobar'))
450 453
451 454 def test_get_commits_raise_branchdoesnotexist_for_wrong_branch_name(self):
452 455 with pytest.raises(BranchDoesNotExistError):
453 456 list(self.repo.get_commits(branch_name='foobar'))
454 457
455 458 def test_get_commits_raise_repositoryerror_for_wrong_start_end(self):
456 459 start_id = self.repo.commit_ids[-1]
457 460 end_id = self.repo.commit_ids[0]
458 461 with pytest.raises(RepositoryError):
459 462 list(self.repo.get_commits(start_id=start_id, end_id=end_id))
460 463
461 464 def test_get_commits_raises_for_numerical_ids(self):
462 465 with pytest.raises(TypeError):
463 466 self.repo.get_commits(start_id=1, end_id=2)
464 467
465 468
466 469 @pytest.mark.parametrize("filename, expected", [
467 470 ("README.rst", False),
468 471 ("README", True),
469 472 ])
470 473 def test_commit_is_link(vcsbackend, filename, expected):
471 474 commit = vcsbackend.repo.get_commit()
472 475 link_status = commit.is_link(filename)
473 476 assert link_status is expected
474 477
475 478
476 479 class TestCommitsChanges(BackendTestMixin):
477 480 recreate_repo_per_test = False
478 481
479 482 @classmethod
480 483 def _get_commits(cls):
481 484 return [
482 485 {
483 486 'message': u'Initial',
484 487 'author': u'Joe Doe <joe.doe@example.com>',
485 488 'date': datetime.datetime(2010, 1, 1, 20),
486 489 'added': [
487 490 FileNode('foo/bar', content='foo'),
488 491 FileNode('foo/bał', content='foo'),
489 492 FileNode('foobar', content='foo'),
490 493 FileNode('qwe', content='foo'),
491 494 ],
492 495 },
493 496 {
494 497 'message': u'Massive changes',
495 498 'author': u'Joe Doe <joe.doe@example.com>',
496 499 'date': datetime.datetime(2010, 1, 1, 22),
497 500 'added': [FileNode('fallout', content='War never changes')],
498 501 'changed': [
499 502 FileNode('foo/bar', content='baz'),
500 503 FileNode('foobar', content='baz'),
501 504 ],
502 505 'removed': [FileNode('qwe')],
503 506 },
504 507 ]
505 508
506 509 def test_initial_commit(self):
507 510 commit = self.repo.get_commit(commit_idx=0)
508 511 assert set(commit.added) == set([
509 512 commit.get_node('foo/bar'),
510 513 commit.get_node('foo/bał'),
511 514 commit.get_node('foobar'),
512 515 commit.get_node('qwe'),
513 516 ])
514 517 assert set(commit.changed) == set()
515 518 assert set(commit.removed) == set()
516 519 assert set(commit.affected_files) == set(
517 520 ['foo/bar', 'foo/bał', 'foobar', 'qwe'])
518 521 assert commit.date == datetime.datetime(2010, 1, 1, 20, 0)
519 522
520 523 def test_head_added(self):
521 524 commit = self.repo.get_commit()
522 525 assert isinstance(commit.added, AddedFileNodesGenerator)
523 526 assert set(commit.added) == set([commit.get_node('fallout')])
524 527 assert isinstance(commit.changed, ChangedFileNodesGenerator)
525 528 assert set(commit.changed) == set([
526 529 commit.get_node('foo/bar'),
527 530 commit.get_node('foobar'),
528 531 ])
529 532 assert isinstance(commit.removed, RemovedFileNodesGenerator)
530 533 assert len(commit.removed) == 1
531 534 assert list(commit.removed)[0].path == 'qwe'
532 535
533 536 def test_get_filemode(self):
534 537 commit = self.repo.get_commit()
535 538 assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bar')
536 539
537 540 def test_get_filemode_non_ascii(self):
538 541 commit = self.repo.get_commit()
539 542 assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bał')
540 543 assert FILEMODE_DEFAULT == commit.get_file_mode(u'foo/bał')
541 544
542 545 def test_get_file_history(self):
543 546 commit = self.repo.get_commit()
544 547 history = commit.get_file_history('foo/bar')
545 548 assert len(history) == 2
546 549
547 550 def test_get_file_history_with_limit(self):
548 551 commit = self.repo.get_commit()
549 552 history = commit.get_file_history('foo/bar', limit=1)
550 553 assert len(history) == 1
551 554
552 555 def test_get_file_history_first_commit(self):
553 556 commit = self.repo[0]
554 557 history = commit.get_file_history('foo/bar')
555 558 assert len(history) == 1
556 559
557 560
558 561 def assert_text_equal(expected, given):
559 562 assert expected == given
560 563 assert isinstance(expected, unicode)
561 564 assert isinstance(given, unicode)
General Comments 0
You need to be logged in to leave comments. Login now