##// END OF EJS Templates
tests: further test fixes
marcink -
r3778:edf95982 new-ui
parent child Browse files
Show More
@@ -1,1888 +1,1902 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33 import functools
34 34
35 35 import mock
36 36 import pyramid.testing
37 37 import pytest
38 38 import colander
39 39 import requests
40 40 import pyramid.paster
41 41
42 42 import rhodecode
43 43 from rhodecode.lib.utils2 import AttributeDict
44 44 from rhodecode.model.changeset_status import ChangesetStatusModel
45 45 from rhodecode.model.comment import CommentsModel
46 46 from rhodecode.model.db import (
47 47 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
48 48 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
49 49 from rhodecode.model.meta import Session
50 50 from rhodecode.model.pull_request import PullRequestModel
51 51 from rhodecode.model.repo import RepoModel
52 52 from rhodecode.model.repo_group import RepoGroupModel
53 53 from rhodecode.model.user import UserModel
54 54 from rhodecode.model.settings import VcsSettingsModel
55 55 from rhodecode.model.user_group import UserGroupModel
56 56 from rhodecode.model.integration import IntegrationModel
57 57 from rhodecode.integrations import integration_type_registry
58 58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 59 from rhodecode.lib.utils import repo2db_mapper
60 60 from rhodecode.lib.vcs import create_vcsserver_proxy
61 61 from rhodecode.lib.vcs.backends import get_backend
62 62 from rhodecode.lib.vcs.nodes import FileNode
63 63 from rhodecode.tests import (
64 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
65 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
66 66 TEST_USER_REGULAR_PASS)
67 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
68 68 from rhodecode.tests.fixture import Fixture
69 69 from rhodecode.config import utils as config_utils
70 70
71 71 def _split_comma(value):
72 72 return value.split(',')
73 73
74 74
def pytest_addoption(parser):
    """Register the RhodeCode specific command line options with pytest."""
    # (flags, keyword arguments) pairs, registered in order below.
    option_specs = (
        (('--keep-tmp-path',), dict(
            action='store_true',
            help="Keep the test temporary directories")),
        (('--backends',), dict(
            action='store', type=_split_comma,
            default=['git', 'hg', 'svn'],
            help="Select which backends to test for backend specific tests.")),
        (('--dbs',), dict(
            action='store', type=_split_comma,
            default=['sqlite'],
            help="Select which database to test for database specific tests. "
                 "Possible options are sqlite,postgres,mysql")),
        (('--appenlight', '--ae'), dict(
            action='store_true',
            help="Track statistics in appenlight.")),
        (('--appenlight-api-key', '--ae-key'), dict(
            help="API key for Appenlight.")),
        (('--appenlight-url', '--ae-url'), dict(
            default="https://ae.rhodecode.com",
            help="Appenlight service URL, defaults to https://ae.rhodecode.com")),
        (('--sqlite-connection-string',), dict(
            action='store', default='',
            help="Connection string for the dbs tests with SQLite")),
        (('--postgres-connection-string',), dict(
            action='store', default='',
            help="Connection string for the dbs tests with Postgres")),
        (('--mysql-connection-string',), dict(
            action='store', default='',
            help="Connection string for the dbs tests with MySQL")),
        (('--repeat',), dict(
            type=int, default=100,
            help="Number of repetitions in performance tests.")),
    )
    for flags, kwargs in option_specs:
        parser.addoption(*flags, **kwargs)
110 110
111 111
def pytest_configure(config):
    # The imported name is unused on purpose: importing the module is
    # presumably what applies the monkey patches at configure time —
    # TODO confirm, nothing else in this hook uses `patches`.
    from rhodecode.config import patches
115 115
def pytest_collection_modifyitems(session, config, items):
    """
    Drop collected items whose underlying object opts out via ``__test__``.

    Mirrors the nose ``nottest`` behaviour during the nose-to-pytest
    transition: anything carrying ``__test__ = False`` is not run.
    """
    items[:] = [item for item in items
                if getattr(item.obj, '__test__', True)]
121 121
122 122
def pytest_generate_tests(metafunc):
    """
    Parametrize tests over the backends selected through ``--backends``.

    Tests requesting the ``backend_alias`` fixture get one parametrized run
    per enabled backend; tests only marked with backends are skipped when
    none of their backends is selected.
    """
    if 'backend_alias' in metafunc.fixturenames:
        enabled = get_backends_from_metafunc(metafunc)
        if not enabled:
            pytest.skip("Not enabled for any of selected backends")
        metafunc.parametrize('backend_alias', enabled, scope=None)
    elif hasattr(metafunc.function, 'backends'):
        if not get_backends_from_metafunc(metafunc):
            pytest.skip("Not enabled for any of selected backends")
135 135
136 136
def get_backends_from_metafunc(metafunc):
    """
    Return the set of backend aliases both requested via ``--backends``
    and supported by the test behind *metafunc*.
    """
    requested = set(metafunc.config.getoption('--backends'))
    if hasattr(metafunc.function, 'backends'):
        # Backends declared through the ``pytest.mark.backends`` marker.
        supported = metafunc.definition.get_closest_marker('backends').args
    elif hasattr(metafunc.cls, 'backend_alias'):
        # Legacy class attribute for tests not yet on pytest.mark.backends.
        supported = [metafunc.cls.backend_alias]
    else:
        # No restriction from the test itself.
        supported = metafunc.config.getoption('--backends')
    return requested.intersection(supported)
150 150
151 151
@pytest.fixture(scope='session', autouse=True)
def activate_example_rcextensions(request):
    """
    Patch in an example rcextensions module which verifies passed in kwargs.
    """
    from rhodecode.config import rcextensions

    # Swap the globally registered extensions module for the example one
    # and attach a fresh call recorder to it for the whole session.
    old_extensions = rhodecode.EXTENSIONS
    rhodecode.EXTENSIONS = rcextensions
    rhodecode.EXTENSIONS.calls = collections.defaultdict(list)

    @request.addfinalizer
    def cleanup():
        # Restore the original extensions module at session end.
        rhodecode.EXTENSIONS = old_extensions
166 166
167 167
@pytest.fixture
def capture_rcextensions():
    """
    Expose the rcextensions call recorder, cleared for this test.

    The returned mapping is the live recorder object; entries added while
    the test runs become visible through this same reference.
    """
    recorded_calls = rhodecode.EXTENSIONS.calls
    recorded_calls.clear()
    return recorded_calls
179 179
180 180
@pytest.fixture(scope='session')
def http_environ_session():
    """Session scoped variant of the ``http_environ`` fixture."""
    environ = plain_http_environ()
    return environ
187 187
188 188
def plain_http_host_stub():
    """Host (including port) used as HTTP_HOST during the test run."""
    host, port = 'example.com', '80'
    return '%s:%s' % (host, port)
194 194
195 195
@pytest.fixture
def http_host_stub():
    """Fixture wrapper around :func:`plain_http_host_stub`."""
    return plain_http_host_stub()
202 202
203 203
def plain_http_host_only_stub():
    """Host name (port stripped) used as HTTP_HOST during the test run."""
    host, _sep, _port = plain_http_host_stub().partition(':')
    return host
209 209
210 210
@pytest.fixture
def http_host_only_stub():
    """Fixture wrapper around :func:`plain_http_host_only_stub`."""
    return plain_http_host_only_stub()
217 217
218 218
def plain_http_environ():
    """
    HTTP extra environ keys.

    Used by the test application and as well for setting up the pylons
    environment. In the case of the fixture "app" it should be possible
    to override this for a specific test case.
    """
    host_with_port = plain_http_host_stub()
    environ = {
        'SERVER_NAME': plain_http_host_only_stub(),
        'SERVER_PORT': host_with_port.split(':')[1],
        'HTTP_HOST': host_with_port,
        'HTTP_USER_AGENT': 'rc-test-agent',
        'REQUEST_METHOD': 'GET',
    }
    return environ
234 234
235 235
@pytest.fixture
def http_environ():
    """
    HTTP extra environ keys, function scoped.

    Used by the test application and as well for setting up the pylons
    environment. In the case of the fixture "app" it should be possible
    to override this for a specific test case.
    """
    return plain_http_environ()
246 246
247 247
@pytest.fixture(scope='session')
def baseapp(ini_config, vcsserver, http_environ_session):
    """Create the Pyramid WSGI application shared by the test session."""
    from rhodecode.lib.pyramid_utils import get_app_config
    from rhodecode.config.middleware import make_pyramid_app

    print("Using the RhodeCode configuration:{}".format(ini_config))
    pyramid.paster.setup_logging(ini_config)

    app_settings = get_app_config(ini_config)
    return make_pyramid_app({'__file__': ini_config}, **app_settings)
260 260
261 261
@pytest.fixture(scope='function')
def app(request, config_stub, baseapp, http_environ):
    """Wrap the session application in a fresh ``CustomTestApp`` per test."""
    test_app = CustomTestApp(baseapp, extra_environ=http_environ)
    if request.cls:
        # Legacy support for class based tests expecting ``self.app``.
        request.cls.app = test_app
    return test_app
270 270
271 271
@pytest.fixture(scope='session')
def app_settings(baseapp, ini_config):
    """
    Settings dictionary used to create the app.

    Parses the ini file and passes the result through the sanitize and apply
    defaults mechanism in `rhodecode.config.middleware`.
    """
    settings = baseapp.config.get_settings()
    return settings
281 281
282 282
@pytest.fixture(scope='session')
def db_connection(ini_settings):
    """Initialize the database connection once per test session."""
    config_utils.initialize_database(ini_settings)
287 287
288 288
# Result of an automatic login: the CSRF token plus the logged-in user.
LoginData = collections.namedtuple('LoginData', 'csrf_token user')
290 290
291 291
def _autologin_user(app, *args):
    """Log in through *app* and return the resulting ``LoginData``."""
    session = login_user_session(app, *args)
    token = rhodecode.lib.auth.get_csrf_token(session)
    return LoginData(csrf_token=token, user=session['rhodecode_user'])
296 296
297 297
@pytest.fixture
def autologin_user(app):
    """Ensure the admin user is logged in for this test."""
    return _autologin_user(app)
304 304
305 305
@pytest.fixture
def autologin_regular_user(app):
    """Ensure the regular (non admin) user is logged in for this test."""
    return _autologin_user(
        app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
313 313
314 314
@pytest.fixture(scope='function')
def csrf_token(request, autologin_user):
    """CSRF token of the automatically logged-in admin user."""
    return autologin_user.csrf_token
318 318
319 319
@pytest.fixture(scope='function')
def xhr_header(request):
    """Header marking a request as an XMLHttpRequest (AJAX) call."""
    return dict(HTTP_X_REQUESTED_WITH='XMLHttpRequest')
323 323
324 324
@pytest.fixture
def real_crypto_backend(monkeypatch):
    """
    Switch the production crypto backend on for this test.

    The test run normally swaps the crypto backend for a faster MD5 based
    implementation; this fixture re-enables the real one.
    """
    monkeypatch.setattr(rhodecode, 'is_test', False)
334 334
335 335
@pytest.fixture(scope='class')
def index_location(request, baseapp):
    """Full text search index location, also exposed on the test class."""
    location = baseapp.config.get_settings()['search.location']
    if request.cls:
        request.cls.index_location = location
    return location
342 342
343 343
@pytest.fixture(scope='session', autouse=True)
def tests_tmp_path(request):
    """
    Create temporary directory to be used during the test session.
    """
    if not os.path.exists(TESTS_TMP_PATH):
        os.makedirs(TESTS_TMP_PATH)

    # Keep the directory around for inspection when --keep-tmp-path is set.
    if not request.config.getoption('--keep-tmp-path'):
        def remove_tmp_path():
            shutil.rmtree(TESTS_TMP_PATH)
        request.addfinalizer(remove_tmp_path)

    return TESTS_TMP_PATH
358 358
359 359
@pytest.fixture
def test_repo_group(request):
    """
    Create a temporary repository group, and destroy it after
    usage automatically
    """
    fixture = Fixture()
    group_id = 'test_repo_group_%s' % str(time.time()).replace('.', '')
    repo_group = fixture.create_repo_group(group_id)

    request.addfinalizer(lambda: fixture.destroy_repo_group(group_id))
    return repo_group
375 375
376 376
@pytest.fixture
def test_user_group(request):
    """
    Create a temporary user group, and destroy it after
    usage automatically
    """
    fixture = Fixture()
    group_id = 'test_user_group_%s' % str(time.time()).replace('.', '')
    user_group = fixture.create_user_group(group_id)

    request.addfinalizer(lambda: fixture.destroy_user_group(user_group))
    return user_group
392 392
393 393
@pytest.fixture(scope='session')
def test_repo(request):
    """Session wide container handing out read-only test repositories."""
    container = TestRepoContainer()
    request.addfinalizer(container._cleanup)
    return container
399 399
400 400
class TestRepoContainer(object):
    """
    Container for test repositories which are used read only.

    Repositories will be created on demand and re-used during the lifetime
    of this object.

    Usage to get the svn test repository "minimal"::

        test_repo = TestContainer()
        repo = test_repo('minimal', 'svn')

    """

    # Maps a backend alias to the helper which extracts the corresponding
    # repository dump shipped with the test suite.
    dump_extractors = {
        'git': utils.extract_git_repo_from_dump,
        'hg': utils.extract_hg_repo_from_dump,
        'svn': utils.extract_svn_repo_from_dump,
    }

    def __init__(self):
        # Repo names created through this container, destroyed in _cleanup.
        self._cleanup_repos = []
        self._fixture = Fixture()
        # Cache: (dump_name, backend_alias) -> repo_id of the created repo.
        self._repos = {}

    def __call__(self, dump_name, backend_alias, config=None):
        # Create the repository on first request; afterwards serve it from
        # the cache. Only the id is cached — a fresh DB object is fetched
        # on every call.
        key = (dump_name, backend_alias)
        if key not in self._repos:
            repo = self._create_repo(dump_name, backend_alias, config)
            self._repos[key] = repo.repo_id
        return Repository.get(self._repos[key])

    def _create_repo(self, dump_name, backend_alias, config):
        # Extract the dump to disk, then register the result in the
        # database so tests can use the full model layer on top of it.
        repo_name = '%s-%s' % (backend_alias, dump_name)
        backend_class = get_backend(backend_alias)
        dump_extractor = self.dump_extractors[backend_alias]
        repo_path = dump_extractor(dump_name, repo_name)

        vcs_repo = backend_class(repo_path, config=config)
        repo2db_mapper({repo_name: vcs_repo})

        repo = RepoModel().get_by_repo_name(repo_name)
        self._cleanup_repos.append(repo_name)
        return repo

    def _cleanup(self):
        # Destroy repositories in reverse creation order.
        for repo_name in reversed(self._cleanup_repos):
            self._fixture.destroy_repo(repo_name)
449 449
450 450
def backend_base(request, backend_alias, baseapp, test_repo):
    """
    Build a ``Backend`` helper for *backend_alias*.

    Skips when the backend is not selected via ``--backends`` and honours
    the xfail/skip backend markers of the requesting test.
    """
    selected_backends = request.config.getoption('--backends')
    if backend_alias not in selected_backends:
        pytest.skip("Backend %s not selected." % (backend_alias, ))

    utils.check_xfail_backends(request.node, backend_alias)
    utils.check_skip_backends(request.node, backend_alias)

    backend = Backend(
        alias=backend_alias,
        repo_name='vcs_test_%s' % (backend_alias, ),
        test_name=request.node.name,
        test_repo_container=test_repo)
    request.addfinalizer(backend.cleanup)
    return backend
466 466
467 467
@pytest.fixture
def backend(request, backend_alias, baseapp, test_repo):
    """
    Parametrized fixture which represents a single backend implementation.

    It respects the option `--backends` to focus the test run on specific
    backend implementations.

    It also supports `pytest.mark.xfail_backends` to mark tests as failing
    for specific backends. This is intended as a utility for incremental
    development of a new backend implementation.
    """
    return backend_base(request, backend_alias, baseapp, test_repo)
481 481
482 482
@pytest.fixture
def backend_git(request, baseapp, test_repo):
    """Git flavoured, non-parametrized variant of the `backend` fixture."""
    return backend_base(request, 'git', baseapp, test_repo)
486 486
487 487
@pytest.fixture
def backend_hg(request, baseapp, test_repo):
    """Mercurial flavoured, non-parametrized variant of `backend`."""
    return backend_base(request, 'hg', baseapp, test_repo)
491 491
492 492
@pytest.fixture
def backend_svn(request, baseapp, test_repo):
    """Subversion flavoured, non-parametrized variant of `backend`."""
    return backend_base(request, 'svn', baseapp, test_repo)
496 496
497 497
@pytest.fixture
def backend_random(backend_git):
    """
    Use this to express that your tests need "a" backend.

    A few of our tests need a backend, so that we can run the code. This
    fixture is intended to be used for such cases. It will pick one of the
    backends and run the tests.

    The fixture `backend` would run the test multiple times for each
    available backend which is a pure waste of time if the test is
    independent of the backend type.
    """
    # TODO: johbo: Change this to pick a random backend
    return backend_git
513 513
514 514
@pytest.fixture
def backend_stub(backend_git):
    """
    Use this to express that your tests need a backend stub

    TODO: mikhail: Implement a real stub logic instead of returning
    a git backend
    """
    return backend_git
524 524
525 525
@pytest.fixture
def repo_stub(backend_stub):
    """
    Use this to express that your tests need a repository stub
    """
    return backend_stub.create_repo()
532 532
533 533
class Backend(object):
    """
    Represents the test configuration for one supported backend

    Provides easy access to different test repositories based on
    `__getitem__`. Such repositories will only be created once per test
    session.
    """

    # Characters not allowed in repository names; replaced by "_" when
    # deriving repo names from the test name.
    invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
    # NOTE(review): class-level mutable default — `_commit_ids` is shared
    # across Backend instances until `_add_commits_to_repo` rebinds it on
    # an instance. Appears harmless because it is always rebound, never
    # mutated in place, but confirm before relying on instance isolation.
    _master_repo = None
    _commit_ids = {}

    def __init__(self, alias, repo_name, test_name, test_repo_container):
        self.alias = alias
        self.repo_name = repo_name
        self._cleanup_repos = []
        self._test_name = test_name
        self._test_repo_container = test_repo_container
        # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
        # Fixture will survive in the end.
        self._fixture = Fixture()

    def __getitem__(self, key):
        # Shared read-only test repository from the session container.
        return self._test_repo_container(key, self.alias)

    def create_test_repo(self, key, config=None):
        # Like `__getitem__`, but allows passing a custom vcs config.
        return self._test_repo_container(key, self.alias, config)

    @property
    def repo(self):
        """
        Returns the "current" repository. This is the vcs_test repo or the
        last repo which has been created with `create_repo`.
        """
        from rhodecode.model.db import Repository
        return Repository.get_by_repo_name(self.repo_name)

    @property
    def default_branch_name(self):
        # Default branch name of the underlying vcs backend class.
        VcsRepository = get_backend(self.alias)
        return VcsRepository.DEFAULT_BRANCH_NAME

    @property
    def default_head_id(self):
        """
        Returns the default head id of the underlying backend.

        This will be the default branch name in case the backend does have a
        default branch. In the other cases it will point to a valid head
        which can serve as the base to create a new commit on top of it.
        """
        vcsrepo = self.repo.scm_instance()
        head_id = (
            vcsrepo.DEFAULT_BRANCH_NAME or
            vcsrepo.commit_ids[-1])
        return head_id

    @property
    def commit_ids(self):
        """
        Returns the list of commits for the last created repository
        """
        return self._commit_ids

    def create_master_repo(self, commits):
        """
        Create a repository and remember it as a template.

        This allows to easily create derived repositories to construct
        more complex scenarios for diff, compare and pull requests.

        Returns a commit map which maps from commit message to raw_id.
        """
        self._master_repo = self.create_repo(commits=commits)
        return self._commit_ids

    def create_repo(
            self, commits=None, number_of_commits=0, heads=None,
            name_suffix=u'', bare=False, **kwargs):
        """
        Create a repository and record it for later cleanup.

        :param commits: Optional. A sequence of dict instances.
            Will add a commit per entry to the new repository.
        :param number_of_commits: Optional. If set to a number, this number of
            commits will be added to the new repository.
        :param heads: Optional. Can be set to a sequence of of commit
            names which shall be pulled in from the master repository.
        :param name_suffix: adds special suffix to generated repo name
        :param bare: set a repo as bare (no checkout)
        """
        self.repo_name = self._next_repo_name() + name_suffix
        repo = self._fixture.create_repo(
            self.repo_name, repo_type=self.alias, bare=bare, **kwargs)
        self._cleanup_repos.append(repo.repo_name)

        # When no explicit commits are given, fall back to simple numbered
        # commit messages (may be an empty list).
        commits = commits or [
            {'message': 'Commit %s of %s' % (x, self.repo_name)}
            for x in range(number_of_commits)]
        vcs_repo = repo.scm_instance()
        vcs_repo.count()
        self._add_commits_to_repo(vcs_repo, commits)
        if heads:
            self.pull_heads(repo, heads)

        return repo

    def pull_heads(self, repo, heads):
        """
        Make sure that repo contains all commits mentioned in `heads`
        """
        vcsmaster = self._master_repo.scm_instance()
        vcsrepo = repo.scm_instance()
        # Disable hooks for the local pull from the master repository.
        vcsrepo.config.clear_section('hooks')
        commit_ids = [self._commit_ids[h] for h in heads]
        vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)

    def create_fork(self):
        # Fork the current repository; the fork becomes the new "current"
        # repository and is registered for cleanup.
        repo_to_fork = self.repo_name
        self.repo_name = self._next_repo_name()
        repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
        self._cleanup_repos.append(self.repo_name)
        return repo

    def new_repo_name(self, suffix=u''):
        # Reserve (and register for cleanup) a new repository name without
        # creating the repository itself.
        self.repo_name = self._next_repo_name() + suffix
        self._cleanup_repos.append(self.repo_name)
        return self.repo_name

    def _next_repo_name(self):
        # Unique name derived from the test name plus a running counter.
        return u"%s_%s" % (
            self.invalid_repo_name.sub(u'_', self._test_name), len(self._cleanup_repos))

    def ensure_file(self, filename, content='Test content\n'):
        # Commit a single file to the current repository.
        assert self._cleanup_repos, "Avoid writing into vcs_test repos"
        commits = [
            {'added': [
                FileNode(filename, content=content),
            ]},
        ]
        self._add_commits_to_repo(self.repo.scm_instance(), commits)

    def enable_downloads(self):
        # Flip the download flag on the current repository and persist it.
        repo = self.repo
        repo.enable_downloads = True
        Session().add(repo)
        Session().commit()

    def cleanup(self):
        # Destroy created repositories in reverse creation order.
        for repo_name in reversed(self._cleanup_repos):
            self._fixture.destroy_repo(repo_name)

    def _add_commits_to_repo(self, repo, commits):
        commit_ids = _add_commits_to_repo(repo, commits)
        if not commit_ids:
            return
        self._commit_ids = commit_ids

        # Creating refs for Git to allow fetching them from remote repository
        if self.alias == 'git':
            refs = {}
            for message in self._commit_ids:
                # TODO: mikhail: do more special chars replacements
                ref_name = 'refs/test-refs/{}'.format(
                    message.replace(' ', ''))
                refs[ref_name] = self._commit_ids[message]
            self._create_refs(repo, refs)

    def _create_refs(self, repo, refs):
        # Write each ref name -> commit id pair into the repository.
        for ref_name in refs:
            repo.set_refs(ref_name, refs[ref_name])
706 706
707 707
def vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo):
    """
    Build a ``VcsBackend`` helper for *backend_alias*.

    Skips when the backend is not selected via ``--backends`` and honours
    the xfail/skip backend markers of the requesting test.
    """
    selected_backends = request.config.getoption('--backends')
    if backend_alias not in selected_backends:
        pytest.skip("Backend %s not selected." % (backend_alias, ))

    utils.check_xfail_backends(request.node, backend_alias)
    utils.check_skip_backends(request.node, backend_alias)

    repo_name = 'vcs_test_%s' % (backend_alias, )
    vcs_backend = VcsBackend(
        alias=backend_alias,
        repo_path=os.path.join(tests_tmp_path, repo_name),
        test_name=request.node.name,
        test_repo_container=test_repo)
    request.addfinalizer(vcs_backend.cleanup)
    return vcs_backend
724 724
725 725
@pytest.fixture
def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
    """
    Parametrized fixture which represents a single vcs backend implementation.

    See the fixture `backend` for more details. This one implements the same
    concept, but on vcs level. So it does not provide model instances etc.

    Parameters are generated dynamically, see :func:`pytest_generate_tests`
    for how this works.
    """
    return vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo)
738 738
739 739
@pytest.fixture
def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
    """Git flavoured, non-parametrized variant of `vcsbackend`."""
    return vcsbackend_base(request, 'git', tests_tmp_path, baseapp, test_repo)
743 743
744 744
@pytest.fixture
def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
    """Mercurial flavoured, non-parametrized variant of `vcsbackend`."""
    return vcsbackend_base(request, 'hg', tests_tmp_path, baseapp, test_repo)
748 748
749 749
@pytest.fixture
def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
    """Subversion flavoured, non-parametrized variant of `vcsbackend`."""
    return vcsbackend_base(request, 'svn', tests_tmp_path, baseapp, test_repo)
753 753
754 754
@pytest.fixture
def vcsbackend_stub(vcsbackend_git):
    """
    Use this to express that your test just needs a stub of a vcsbackend.

    Plan is to eventually implement an in-memory stub to speed tests up.
    """
    return vcsbackend_git
763 763
764 764
class VcsBackend(object):
    """
    Represents the test configuration for one supported vcs backend.
    """

    # Characters not allowed in repository names; replaced by "_" when
    # deriving repo names from the test name.
    invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')

    def __init__(self, alias, repo_path, test_name, test_repo_container):
        self.alias = alias
        self._repo_path = repo_path
        self._cleanup_repos = []
        self._test_name = test_name
        self._test_repo_container = test_repo_container

    def __getitem__(self, key):
        # Hand out the low level scm instance of a shared read-only repo.
        return self._test_repo_container(key, self.alias).scm_instance()

    @property
    def repo(self):
        """
        Returns the "current" repository. This is the vcs_test repo of the last
        repo which has been created.
        """
        Repository = get_backend(self.alias)
        return Repository(self._repo_path)

    @property
    def backend(self):
        """
        Returns the backend implementation class.
        """
        return get_backend(self.alias)

    def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None,
                    bare=False):
        # Create a fresh repository on disk (optionally cloned from
        # `_clone_repo`), make it the "current" one, register it for
        # cleanup and populate it with the requested commits.
        repo_name = self._next_repo_name()
        self._repo_path = get_new_dir(repo_name)
        repo_class = get_backend(self.alias)
        src_url = None
        if _clone_repo:
            src_url = _clone_repo.path
        repo = repo_class(self._repo_path, create=True, src_url=src_url, bare=bare)
        self._cleanup_repos.append(repo)

        # `xrange`: this module targets Python 2.
        commits = commits or [
            {'message': 'Commit %s of %s' % (x, repo_name)}
            for x in xrange(number_of_commits)]
        _add_commits_to_repo(repo, commits)
        return repo

    def clone_repo(self, repo):
        # Convenience wrapper: create a clone of the given repository.
        return self.create_repo(_clone_repo=repo)

    def cleanup(self):
        # Remove all repositories created through this instance from disk.
        for repo in self._cleanup_repos:
            shutil.rmtree(repo.path)

    def new_repo_path(self):
        # Reserve a fresh repository path and make it the "current" one.
        repo_name = self._next_repo_name()
        self._repo_path = get_new_dir(repo_name)
        return self._repo_path

    def _next_repo_name(self):
        # Unique name derived from the test name plus a running counter.
        return "%s_%s" % (
            self.invalid_repo_name.sub('_', self._test_name),
            len(self._cleanup_repos))

    def add_file(self, repo, filename, content='Test content\n'):
        # Commit a single file to `repo` via its in-memory commit API.
        imc = repo.in_memory_commit
        imc.add(FileNode(filename, content=content))
        imc.commit(
            message=u'Automatic commit from vcsbackend fixture',
            author=u'Automatic')

    def ensure_file(self, filename, content='Test content\n'):
        # Guard against writing into the shared read-only vcs_test repos.
        assert self._cleanup_repos, "Avoid writing into vcs_test repos"
        self.add_file(self.repo, filename, content)
842 842
843 843
def _add_commits_to_repo(vcs_repo, commits):
    """
    Add the given commit definitions to *vcs_repo*.

    Each entry of *commits* is a dict which may define 'message', 'added',
    'changed', 'removed' (lists of FileNode-like objects), 'parents'
    (messages of commits created earlier in this call), 'author', 'date'
    and 'branch'.

    Returns a mapping from commit message to the created raw_id; empty
    dict when *commits* is empty.
    """
    commit_ids = {}
    if not commits:
        return commit_ids

    imc = vcs_repo.in_memory_commit
    commit = None

    for idx, commit in enumerate(commits):
        # `unicode`: this module targets Python 2.
        message = unicode(commit.get('message', 'Commit %s' % idx))

        for node in commit.get('added', []):
            imc.add(FileNode(node.path, content=node.content))
        for node in commit.get('changed', []):
            imc.change(FileNode(node.path, content=node.content))
        for node in commit.get('removed', []):
            imc.remove(FileNode(node.path))

        # Parents are looked up by the message of a commit created earlier
        # within this same call.
        parents = [
            vcs_repo.get_commit(commit_id=commit_ids[p])
            for p in commit.get('parents', [])]

        # If the entry touches no file at all, add a synthetic one —
        # presumably so every entry produces a real, non-empty commit.
        operations = ('added', 'changed', 'removed')
        if not any((commit.get(o) for o in operations)):
            imc.add(FileNode('file_%s' % idx, content=message))

        commit = imc.commit(
            message=message,
            author=unicode(commit.get('author', 'Automatic')),
            date=commit.get('date'),
            branch=commit.get('branch'),
            parents=parents)

        commit_ids[commit.message] = commit.raw_id

    return commit_ids
880 880
881 881
@pytest.fixture
def reposerver(request):
    """
    Allows to serve a backend repository
    """
    server = RepoServer()
    request.addfinalizer(server.cleanup)
    return server
891 891
892 892
class RepoServer(object):
    """
    Utility to serve a local repository for the duration of a test case.

    Supports only Subversion so far.
    """

    # URL under which the last served repository is reachable.
    url = None

    def __init__(self):
        self._cleanup_servers = []

    def serve(self, vcsrepo):
        """Start ``svnserve`` for *vcsrepo*; raises TypeError otherwise."""
        if vcsrepo.alias != 'svn':
            raise TypeError("Backend %s not supported" % vcsrepo.alias)

        server_command = [
            'svnserve', '-d', '--foreground', '--listen-host', 'localhost',
            '--root', vcsrepo.path]
        server_process = subprocess32.Popen(server_command)
        self._cleanup_servers.append(server_process)
        self.url = 'svn://localhost'

    def cleanup(self):
        """Terminate every server process started through this instance."""
        for server_process in self._cleanup_servers:
            server_process.terminate()
918 918
919 919
@pytest.fixture
def pr_util(backend, request, config_stub):
    """
    Utility for tests of models and for functional tests around pull requests.

    Yields a :class:`PRTestUtility` instance which provides various utility
    methods around one pull request; any created objects are removed again by
    its ``cleanup`` finalizer.

    This fixture uses `backend` and inherits its parameterization.
    """
    pr_helper = PRTestUtility(backend)
    request.addfinalizer(pr_helper.cleanup)
    return pr_helper
935 935
936 936
class PRTestUtility(object):
    """
    Helper around a single pull request, used by the `pr_util` fixture.

    Repositories and the pull request itself are created lazily on the first
    `create_pull_request` call; `cleanup` removes the pull request and stops
    all active patchers.
    """

    # State shared between calls; populated by create_pull_request() and
    # set_mergeable().
    pull_request = None
    pull_request_id = None
    mergeable_patcher = None
    mergeable_mock = None
    notification_patcher = None

    def __init__(self, backend):
        self.backend = backend

    def create_pull_request(
            self, commits=None, target_head=None, source_head=None,
            revisions=None, approved=False, author=None, mergeable=False,
            enable_notifications=True, name_suffix=u'', reviewers=None,
            title=u"Test", description=u"Description"):
        """
        Create (only once) and return the pull request under test.

        Without ``commits`` a default three-commit history is used, with
        ``c1`` as target head and ``c2`` as source head/revision.
        """
        self.set_mergeable(mergeable)
        if not enable_notifications:
            # mock notification side effect
            self.notification_patcher = mock.patch(
                'rhodecode.model.notification.NotificationModel.create')
            self.notification_patcher.start()

        if not self.pull_request:
            if not commits:
                commits = [
                    {'message': 'c1'},
                    {'message': 'c2'},
                    {'message': 'c3'},
                ]
                target_head = 'c1'
                source_head = 'c2'
                revisions = ['c2']

            self.commit_ids = self.backend.create_master_repo(commits)
            self.target_repository = self.backend.create_repo(
                heads=[target_head], name_suffix=name_suffix)
            self.source_repository = self.backend.create_repo(
                heads=[source_head], name_suffix=name_suffix)
            self.author = author or UserModel().get_by_username(
                TEST_USER_ADMIN_LOGIN)

            model = PullRequestModel()
            self.create_parameters = {
                'created_by': self.author,
                'source_repo': self.source_repository.repo_name,
                'source_ref': self._default_branch_reference(source_head),
                'target_repo': self.target_repository.repo_name,
                'target_ref': self._default_branch_reference(target_head),
                'revisions': [self.commit_ids[r] for r in revisions],
                'reviewers': reviewers or self._get_reviewers(),
                'title': title,
                'description': description,
            }
            self.pull_request = model.create(**self.create_parameters)
            # A freshly created pull request must not have versions yet.
            assert model.get_versions(self.pull_request) == []

            self.pull_request_id = self.pull_request.pull_request_id

            if approved:
                self.approve()

            Session().add(self.pull_request)
            Session().commit()

        return self.pull_request

    def approve(self):
        """Cast an "approved" vote for every reviewer of the pull request."""
        self.create_status_votes(
            ChangesetStatus.STATUS_APPROVED,
            *self.pull_request.reviewers)

    def close(self):
        """Close the pull request as its author."""
        PullRequestModel().close_pull_request(self.pull_request, self.author)

    def _default_branch_reference(self, commit_message):
        """Build a ``branch:<name>:<commit_id>`` ref for *commit_message*."""
        reference = '%s:%s:%s' % (
            'branch',
            self.backend.default_branch_name,
            self.commit_ids[commit_message])
        return reference

    def _get_reviewers(self):
        # (username, reasons, mandatory, rules) tuples as expected by
        # PullRequestModel.create().
        return [
            (TEST_USER_REGULAR_LOGIN, ['default1'], False, []),
            (TEST_USER_REGULAR2_LOGIN, ['default2'], False, []),
        ]

    def update_source_repository(self, head=None):
        """Pull *head* (defaults to ``c3``) into the source repository."""
        heads = [head or 'c3']
        self.backend.pull_heads(self.source_repository, heads=heads)

    def add_one_commit(self, head=None):
        """
        Add one commit to the source repo and update the pull request.

        :return: commit id of the newly added commit.
        """
        self.update_source_repository(head=head)
        old_commit_ids = set(self.pull_request.revisions)
        PullRequestModel().update_commits(self.pull_request)
        commit_ids = set(self.pull_request.revisions)
        new_commit_ids = commit_ids - old_commit_ids
        assert len(new_commit_ids) == 1
        return new_commit_ids.pop()

    def remove_one_commit(self):
        """
        Strip the tip commit from the source repo and update the PR.

        :return: commit id of the removed commit.
        """
        assert len(self.pull_request.revisions) == 2
        source_vcs = self.source_repository.scm_instance()
        removed_commit_id = source_vcs.commit_ids[-1]

        # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
        # remove the if once that's sorted out.
        if self.backend.alias == "git":
            kwargs = {'branch_name': self.backend.default_branch_name}
        else:
            kwargs = {}
        source_vcs.strip(removed_commit_id, **kwargs)

        PullRequestModel().update_commits(self.pull_request)
        assert len(self.pull_request.revisions) == 1
        return removed_commit_id

    def create_comment(self, linked_to=None):
        """Create a general PR comment, optionally linked to a PR version."""
        comment = CommentsModel().create(
            text=u"Test comment",
            repo=self.target_repository.repo_name,
            user=self.author,
            pull_request=self.pull_request)
        assert comment.pull_request_version_id is None

        if linked_to:
            PullRequestModel()._link_comments_to_version(linked_to)

        return comment

    def create_inline_comment(
            self, linked_to=None, line_no=u'n1', file_path='file_1'):
        """Create an inline comment, optionally linked to a PR version."""
        comment = CommentsModel().create(
            text=u"Test comment",
            repo=self.target_repository.repo_name,
            user=self.author,
            line_no=line_no,
            f_path=file_path,
            pull_request=self.pull_request)
        assert comment.pull_request_version_id is None

        if linked_to:
            PullRequestModel()._link_comments_to_version(linked_to)

        return comment

    def create_version_of_pull_request(self):
        """Snapshot the current pull request state into a new version."""
        pull_request = self.create_pull_request()
        version = PullRequestModel()._create_version_from_snapshot(
            pull_request)
        return version

    def create_status_votes(self, status, *reviewers):
        """Set *status* on the pull request for every given reviewer."""
        for reviewer in reviewers:
            ChangesetStatusModel().set_status(
                repo=self.pull_request.target_repo,
                status=status,
                user=reviewer.user_id,
                pull_request=self.pull_request)

    def set_mergeable(self, value):
        """Patch the settings model so merging appears (un)available."""
        if not self.mergeable_patcher:
            self.mergeable_patcher = mock.patch.object(
                VcsSettingsModel, 'get_general_settings')
            self.mergeable_mock = self.mergeable_patcher.start()
        self.mergeable_mock.return_value = {
            'rhodecode_pr_merge_enabled': value}

    def cleanup(self):
        """Delete the pull request and stop all active patchers."""
        # In case the source repository is already cleaned up, the pull
        # request will already be deleted.
        pull_request = PullRequest().get(self.pull_request_id)
        if pull_request:
            PullRequestModel().delete(pull_request, pull_request.author)
        Session().commit()

        if self.notification_patcher:
            self.notification_patcher.stop()

        if self.mergeable_patcher:
            self.mergeable_patcher.stop()
1119 1119
1120 1120
@pytest.fixture
def user_admin(baseapp):
    """
    Provides the default admin test user as an instance of `db.User`.
    """
    return UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1128 1128
1129 1129
@pytest.fixture
def user_regular(baseapp):
    """
    Provides the default regular test user as an instance of `db.User`.
    """
    return UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1137 1137
1138 1138
@pytest.fixture
def user_util(request, db_connection):
    """
    Provides a wired instance of `UserUtility`; everything it creates is
    cleaned up again when the test finishes.
    """
    helper = UserUtility(test_name=request.node.name)
    request.addfinalizer(helper.cleanup)
    return helper
1147 1147
1148 1148
# TODO: johbo: Split this up into utilities per domain or something similar
class UserUtility(object):
    """
    Factory for users, repositories, repo groups, user groups and
    permissions. Everything created (unless ``auto_cleanup=False``) is
    recorded and removed again by `cleanup` in a dependency-safe order.
    """

    def __init__(self, test_name="test"):
        self._test_name = self._sanitize_name(test_name)
        self.fixture = Fixture()
        # Ids of created objects, consumed by cleanup().
        self.repo_group_ids = []
        self.repos_ids = []
        self.user_ids = []
        self.user_group_ids = []
        # (target_id, subject_id) tuples of granted permissions.
        self.user_repo_permission_ids = []
        self.user_group_repo_permission_ids = []
        self.user_repo_group_permission_ids = []
        self.user_group_repo_group_permission_ids = []
        self.user_user_group_permission_ids = []
        self.user_group_user_group_permission_ids = []
        self.user_permissions = []

    def _sanitize_name(self, name):
        """Replace characters that are not safe in object names."""
        for char in ['[', ']']:
            name = name.replace(char, '_')
        return name

    def create_repo_group(
            self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
        """Create a uniquely named repo group owned by *owner*."""
        group_name = "{prefix}_repogroup_{count}".format(
            prefix=self._test_name,
            count=len(self.repo_group_ids))
        repo_group = self.fixture.create_repo_group(
            group_name, cur_user=owner)
        if auto_cleanup:
            self.repo_group_ids.append(repo_group.group_id)
        return repo_group

    def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
                    auto_cleanup=True, repo_type='hg', bare=False):
        """Create a uniquely named repository of *repo_type*."""
        repo_name = "{prefix}_repository_{count}".format(
            prefix=self._test_name,
            count=len(self.repos_ids))

        repository = self.fixture.create_repo(
            repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type, bare=bare)
        if auto_cleanup:
            self.repos_ids.append(repository.repo_id)
        return repository

    def create_user(self, auto_cleanup=True, **kwargs):
        """Create a uniquely named user."""
        user_name = "{prefix}_user_{count}".format(
            prefix=self._test_name,
            count=len(self.user_ids))
        user = self.fixture.create_user(user_name, **kwargs)
        if auto_cleanup:
            self.user_ids.append(user.user_id)
        return user

    def create_additional_user_email(self, user, email):
        """Attach an additional *email* to *user*."""
        uem = self.fixture.create_additional_user_email(user=user, email=email)
        return uem

    def create_user_with_group(self):
        """Create a user plus a user group containing that user."""
        user = self.create_user()
        user_group = self.create_user_group(members=[user])
        return user, user_group

    def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
                          auto_cleanup=True, **kwargs):
        """Create a uniquely named user group, optionally with *members*."""
        group_name = "{prefix}_usergroup_{count}".format(
            prefix=self._test_name,
            count=len(self.user_group_ids))
        user_group = self.fixture.create_user_group(
            group_name, cur_user=owner, **kwargs)

        if auto_cleanup:
            self.user_group_ids.append(user_group.users_group_id)
        if members:
            for user in members:
                UserGroupModel().add_user_to_group(user_group, user)
        return user_group

    def grant_user_permission(self, user_name, permission_name):
        """Record a global permission grant; revoked again on cleanup."""
        self.inherit_default_user_permissions(user_name, False)
        self.user_permissions.append((user_name, permission_name))

    def grant_user_permission_to_repo_group(
            self, repo_group, user, permission_name):
        permission = RepoGroupModel().grant_user_permission(
            repo_group, user, permission_name)
        self.user_repo_group_permission_ids.append(
            (repo_group.group_id, user.user_id))
        return permission

    def grant_user_group_permission_to_repo_group(
            self, repo_group, user_group, permission_name):
        permission = RepoGroupModel().grant_user_group_permission(
            repo_group, user_group, permission_name)
        self.user_group_repo_group_permission_ids.append(
            (repo_group.group_id, user_group.users_group_id))
        return permission

    def grant_user_permission_to_repo(
            self, repo, user, permission_name):
        permission = RepoModel().grant_user_permission(
            repo, user, permission_name)
        self.user_repo_permission_ids.append(
            (repo.repo_id, user.user_id))
        return permission

    def grant_user_group_permission_to_repo(
            self, repo, user_group, permission_name):
        permission = RepoModel().grant_user_group_permission(
            repo, user_group, permission_name)
        self.user_group_repo_permission_ids.append(
            (repo.repo_id, user_group.users_group_id))
        return permission

    def grant_user_permission_to_user_group(
            self, target_user_group, user, permission_name):
        permission = UserGroupModel().grant_user_permission(
            target_user_group, user, permission_name)
        self.user_user_group_permission_ids.append(
            (target_user_group.users_group_id, user.user_id))
        return permission

    def grant_user_group_permission_to_user_group(
            self, target_user_group, user_group, permission_name):
        permission = UserGroupModel().grant_user_group_permission(
            target_user_group, user_group, permission_name)
        self.user_group_user_group_permission_ids.append(
            (target_user_group.users_group_id, user_group.users_group_id))
        return permission

    def revoke_user_permission(self, user_name, permission_name):
        """Revoke a global permission and restore permission inheritance."""
        self.inherit_default_user_permissions(user_name, True)
        UserModel().revoke_perm(user_name, permission_name)

    def inherit_default_user_permissions(self, user_name, value):
        """Toggle whether *user_name* inherits the default permissions."""
        user = UserModel().get_by_username(user_name)
        user.inherit_default_permissions = value
        Session().add(user)
        Session().commit()

    def cleanup(self):
        # Order matters: permissions first, then repos before their groups,
        # user groups before the users they contain.
        self._cleanup_permissions()
        self._cleanup_repos()
        self._cleanup_repo_groups()
        self._cleanup_user_groups()
        self._cleanup_users()

    def _cleanup_permissions(self):
        if self.user_permissions:
            for user_name, permission_name in self.user_permissions:
                self.revoke_user_permission(user_name, permission_name)

        for permission in self.user_repo_permission_ids:
            RepoModel().revoke_user_permission(*permission)

        for permission in self.user_group_repo_permission_ids:
            RepoModel().revoke_user_group_permission(*permission)

        for permission in self.user_repo_group_permission_ids:
            RepoGroupModel().revoke_user_permission(*permission)

        for permission in self.user_group_repo_group_permission_ids:
            RepoGroupModel().revoke_user_group_permission(*permission)

        for permission in self.user_user_group_permission_ids:
            UserGroupModel().revoke_user_permission(*permission)

        for permission in self.user_group_user_group_permission_ids:
            UserGroupModel().revoke_user_group_permission(*permission)

    def _cleanup_repo_groups(self):
        # Python 2 style comparator; destroys nested groups before parents.
        def _repo_group_compare(first_group_id, second_group_id):
            """
            Gives higher priority to the groups with the most complex paths
            """
            first_group = RepoGroup.get(first_group_id)
            second_group = RepoGroup.get(second_group_id)
            first_group_parts = (
                len(first_group.group_name.split('/')) if first_group else 0)
            second_group_parts = (
                len(second_group.group_name.split('/')) if second_group else 0)
            return cmp(second_group_parts, first_group_parts)

        sorted_repo_group_ids = sorted(
            self.repo_group_ids, cmp=_repo_group_compare)
        for repo_group_id in sorted_repo_group_ids:
            self.fixture.destroy_repo_group(repo_group_id)

    def _cleanup_repos(self):
        sorted_repos_ids = sorted(self.repos_ids)
        for repo_id in sorted_repos_ids:
            self.fixture.destroy_repo(repo_id)

    def _cleanup_user_groups(self):
        # Python 2 style comparator; destroys nested groups before parents.
        def _user_group_compare(first_group_id, second_group_id):
            """
            Gives higher priority to the groups with the most complex paths
            """
            first_group = UserGroup.get(first_group_id)
            second_group = UserGroup.get(second_group_id)
            first_group_parts = (
                len(first_group.users_group_name.split('/'))
                if first_group else 0)
            second_group_parts = (
                len(second_group.users_group_name.split('/'))
                if second_group else 0)
            return cmp(second_group_parts, first_group_parts)

        sorted_user_group_ids = sorted(
            self.user_group_ids, cmp=_user_group_compare)
        for user_group_id in sorted_user_group_ids:
            self.fixture.destroy_user_group(user_group_id)

    def _cleanup_users(self):
        for user_id in self.user_ids:
            self.fixture.destroy_user(user_id)
1366 1366
1367 1367
1368 1368 # TODO: Think about moving this into a pytest-pyro package and make it a
1369 1369 # pytest plugin
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Adding the remote traceback if the exception has this information.

    VCSServer attaches this information as the attribute
    `_vcs_server_traceback` to the exception instance.
    """
    report = (yield).get_result()
    if call.excinfo is not None:
        _add_vcsserver_remote_traceback(report, call.excinfo.value)
1382 1382
1383 1383
1384 1384 def _add_vcsserver_remote_traceback(report, exc):
1385 1385 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1386 1386
1387 1387 if vcsserver_traceback:
1388 1388 section = 'VCSServer remote traceback ' + report.when
1389 1389 report.sections.append((section, vcsserver_traceback))
1390 1390
1391 1391
@pytest.fixture(scope='session')
def testrun():
    """Session-wide metadata identifying this particular test run."""
    return {
        'uuid': uuid.uuid4(),
        'start': datetime.datetime.utcnow().isoformat(),
        'timestamp': int(time.time()),
    }
1399 1399
1400 1400
@pytest.fixture(autouse=True)
def collect_appenlight_stats(request, testrun):
    """
    This fixture reports memory consumption of single tests.

    It gathers data based on `psutil` and sends them to Appenlight. The
    option ``--appenlight`` has to be used to enable this fixture and the
    API key for your application has to be provided in
    ``--appenlight-api-key``.
    """
    try:
        # cygwin cannot have yet psutil support.
        import psutil
    except ImportError:
        return

    if not request.config.getoption('--appenlight'):
        return
    else:
        # Only request the baseapp fixture if appenlight tracking is
        # enabled. This will speed up a test run of unit tests by 2 to 3
        # seconds if appenlight is not enabled.
        baseapp = request.getfuncargvalue("baseapp")
    url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
    client = AppenlightClient(
        url=url,
        api_key=request.config.getoption('--appenlight-api-key'),
        namespace=request.node.nodeid,
        request=str(testrun['uuid']),
        testrun=testrun)

    client.collect({
        'message': "Starting",
    })

    # Snapshot VCSServer and test process memory before the test runs.
    server_and_port = baseapp.config.get_settings()['vcs.server']
    protocol = baseapp.config.get_settings()['vcs.server.protocol']
    server = create_vcsserver_proxy(server_and_port, protocol)
    with server:
        vcs_pid = server.get_pid()
        server.run_gc()
        vcs_process = psutil.Process(vcs_pid)
        mem = vcs_process.memory_info()
        client.tag_before('vcsserver.rss', mem.rss)
        client.tag_before('vcsserver.vms', mem.vms)

        test_process = psutil.Process()
        mem = test_process.memory_info()
        client.tag_before('test.rss', mem.rss)
        client.tag_before('test.vms', mem.vms)

        client.tag_before('time', time.time())

    # Snapshot the same values after the test and push everything out.
    @request.addfinalizer
    def send_stats():
        client.tag_after('time', time.time())
        with server:
            gc_stats = server.run_gc()
            for tag, value in gc_stats.items():
                client.tag_after(tag, value)
            mem = vcs_process.memory_info()
            client.tag_after('vcsserver.rss', mem.rss)
            client.tag_after('vcsserver.vms', mem.vms)

            mem = test_process.memory_info()
            client.tag_after('test.rss', mem.rss)
            client.tag_after('test.vms', mem.vms)

        client.collect({
            'message': "Finished",
        })
        client.send_stats()

    return client
1474 1474
1475 1475
class AppenlightClient(object):
    """
    Minimal client collecting log/stat entries and pushing them to an
    Appenlight server via its HTTP API.

    Use `tag_before`/`tag_after` to record paired measurements, `collect`
    to queue a log entry and `send_stats` to post everything.
    """
    # NOTE: changed from an old-style ``class AppenlightClient():`` to a
    # new-style class for consistency with the other classes in this module.

    url_template = '{url}?protocol_version=0.5'

    def __init__(
            self, url, api_key, add_server=True, add_timestamp=True,
            namespace=None, request=None, testrun=None):
        self.url = self.url_template.format(url=url)
        self.api_key = api_key
        self.add_server = add_server
        self.add_timestamp = add_timestamp
        self.namespace = namespace
        self.request = request
        self.server = socket.getfqdn(socket.gethostname())
        self.tags_before = {}
        self.tags_after = {}
        self.stats = []
        self.testrun = testrun or {}

    def tag_before(self, tag, value):
        """Record the "before" value of *tag*."""
        self.tags_before[tag] = value

    def tag_after(self, tag, value):
        """Record the "after" value of *tag*."""
        self.tags_after[tag] = value

    def collect(self, data):
        """Queue a log entry, filling in the configured default fields."""
        if self.add_server:
            data.setdefault('server', self.server)
        if self.add_timestamp:
            data.setdefault('date', datetime.datetime.utcnow().isoformat())
        if self.namespace:
            data.setdefault('namespace', self.namespace)
        if self.request:
            data.setdefault('request', self.request)
        self.stats.append(data)

    def send_stats(self):
        """Post all queued entries plus the collected tags to Appenlight.

        :raises Exception: if the server does not answer with status 200.
        """
        tags = [
            ('testrun', self.request),
            ('testrun.start', self.testrun['start']),
            ('testrun.timestamp', self.testrun['timestamp']),
            ('test', self.namespace),
        ]
        for key, value in self.tags_before.items():
            tags.append((key + '.before', value))
            try:
                # Delta only makes sense for numeric tags with both values.
                delta = self.tags_after[key] - value
                tags.append((key + '.delta', delta))
            except Exception:
                pass
        for key, value in self.tags_after.items():
            tags.append((key + '.after', value))
        self.collect({
            'message': "Collected tags",
            'tags': tags,
        })

        response = requests.post(
            self.url,
            headers={
                'X-appenlight-api-key': self.api_key},
            json=self.stats,
        )

        if response.status_code != 200:
            pprint.pprint(self.stats)
            print(response.headers)
            print(response.text)
            raise Exception('Sending to appenlight failed')
1545 1545
1546 1546
@pytest.fixture
def gist_util(request, db_connection):
    """
    Provides a wired instance of `GistUtility`; created gists are destroyed
    again when the test finishes.
    """
    helper = GistUtility()
    request.addfinalizer(helper.cleanup)
    return helper
1555 1555
1556 1556
class GistUtility(object):
    """Creates gists for tests and destroys them again on `cleanup`."""

    def __init__(self):
        self.fixture = Fixture()
        self.gist_ids = []

    def create_gist(self, **kwargs):
        """Create a gist and register it for later cleanup."""
        new_gist = self.fixture.create_gist(**kwargs)
        self.gist_ids.append(new_gist.gist_id)
        return new_gist

    def cleanup(self):
        """Destroy every gist created through this utility."""
        for gist_id in self.gist_ids:
            self.fixture.destroy_gists(str(gist_id))
1570 1570
1571 1571
@pytest.fixture
def enabled_backends(request):
    """Copy of the backend names enabled via the ``--backends`` option."""
    return request.config.option.backends[:]
1576 1576
1577 1577
@pytest.fixture
def settings_util(request, db_connection):
    """
    Provides a wired instance of `SettingsUtility`; created settings are
    deleted again when the test finishes.
    """
    helper = SettingsUtility()
    request.addfinalizer(helper.cleanup)
    return helper
1586 1586
1587 1587
class SettingsUtility(object):
    """
    Creates global and per-repository settings/ui rows and deletes them
    again on `cleanup`.
    """

    def __init__(self):
        # Ids of created rows, consumed by cleanup().
        self.rhodecode_ui_ids = []
        self.rhodecode_setting_ids = []
        self.repo_rhodecode_ui_ids = []
        self.repo_rhodecode_setting_ids = []

    def create_repo_rhodecode_ui(
            self, repo, section, value, key=None, active=True, cleanup=True):
        """Create a per-repository ui entry, key derived if not given."""
        key = key or hashlib.sha1(
            '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()

        new_ui = RepoRhodeCodeUi()
        new_ui.repository_id = repo.repo_id
        new_ui.ui_section = section
        new_ui.ui_value = value
        new_ui.ui_key = key
        new_ui.ui_active = active
        Session().add(new_ui)
        Session().commit()

        if cleanup:
            self.repo_rhodecode_ui_ids.append(new_ui.ui_id)
        return new_ui

    def create_rhodecode_ui(
            self, section, value, key=None, active=True, cleanup=True):
        """Create a global ui entry, key derived if not given."""
        key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()

        new_ui = RhodeCodeUi()
        new_ui.ui_section = section
        new_ui.ui_value = value
        new_ui.ui_key = key
        new_ui.ui_active = active
        Session().add(new_ui)
        Session().commit()

        if cleanup:
            self.rhodecode_ui_ids.append(new_ui.ui_id)
        return new_ui

    def create_repo_rhodecode_setting(
            self, repo, name, value, type_, cleanup=True):
        """Create a per-repository setting row."""
        new_setting = RepoRhodeCodeSetting(
            repo.repo_id, key=name, val=value, type=type_)
        Session().add(new_setting)
        Session().commit()

        if cleanup:
            self.repo_rhodecode_setting_ids.append(
                new_setting.app_settings_id)
        return new_setting

    def create_rhodecode_setting(self, name, value, type_, cleanup=True):
        """Create a global setting row."""
        new_setting = RhodeCodeSetting(key=name, val=value, type=type_)
        Session().add(new_setting)
        Session().commit()

        if cleanup:
            self.rhodecode_setting_ids.append(new_setting.app_settings_id)

        return new_setting

    def cleanup(self):
        """Delete every row that was created with cleanup enabled."""
        for ui_id in self.rhodecode_ui_ids:
            Session().delete(RhodeCodeUi.get(ui_id))

        for setting_id in self.rhodecode_setting_ids:
            Session().delete(RhodeCodeSetting.get(setting_id))

        for ui_id in self.repo_rhodecode_ui_ids:
            Session().delete(RepoRhodeCodeUi.get(ui_id))

        for setting_id in self.repo_rhodecode_setting_ids:
            Session().delete(RepoRhodeCodeSetting.get(setting_id))

        Session().commit()
1668 1668
1669 1669
@pytest.fixture
def no_notifications(request):
    """Disable notification creation for the duration of the test."""
    patcher = mock.patch(
        'rhodecode.model.notification.NotificationModel.create')
    patcher.start()
    request.addfinalizer(patcher.stop)
1676 1676
1677 1677
@pytest.fixture(scope='session')
def repeat(request):
    """
    Number of repetitions, configured via the ``--repeat`` option.

    Slower calls may divide it by 10 or 100. It is chosen in a way so that
    the tests are not too slow in our default test suite.
    """
    return request.config.getoption('--repeat')
1687 1687
1688 1688
@pytest.fixture
def rhodecode_fixtures():
    """Provides a fresh `Fixture` helper instance."""
    return Fixture()
1692 1692
1693 1693
@pytest.fixture
def context_stub():
    """
    Stub context object.
    """
    return pyramid.testing.DummyResource()
1701 1701
1702 1702
@pytest.fixture
def request_stub():
    """
    Stub request object.
    """
    from rhodecode.lib.base import bootstrap_request
    return bootstrap_request(scheme='https')
1711 1711
1712 1712
@pytest.fixture
def config_stub(request, request_stub):
    """
    Set up pyramid.testing and return the Configurator; tears pyramid.testing
    down again when the test finishes.
    """
    from rhodecode.lib.base import bootstrap_config
    config = bootstrap_config(request=request_stub)
    request.addfinalizer(pyramid.testing.tearDown)
    return config
1726 1726
1727 1727
@pytest.fixture
def StubIntegrationType():
    """
    Registers a stub integration type in the global registry and returns it.

    The stub records the events passed to ``send_event`` instead of
    delivering them anywhere, so tests can inspect ``sent_events``.
    """
    class _StubIntegrationType(IntegrationTypeBase):
        """ Test integration type class """

        key = 'test'
        display_name = 'Test integration type'
        description = 'A test integration type for testing'

        @classmethod
        def icon(cls):
            return 'test_icon_html_image'

        def __init__(self, settings):
            super(_StubIntegrationType, self).__init__(settings)
            self.sent_events = []  # for testing

        def send_event(self, event):
            # Events are only recorded, never delivered.
            self.sent_events.append(event)

        def settings_schema(self):
            class SettingsSchema(colander.Schema):
                test_string_field = colander.SchemaNode(
                    colander.String(),
                    missing=colander.required,
                    title='test string field',
                )
                test_int_field = colander.SchemaNode(
                    colander.Int(),
                    title='some integer setting',
                )
            return SettingsSchema()


    integration_type_registry.register_integration_type(_StubIntegrationType)
    return _StubIntegrationType
1764 1764
@pytest.fixture
def stub_integration_settings():
    """Valid settings payload matching `StubIntegrationType`'s schema."""
    return {
        'test_string_field': 'some data',
        'test_int_field': 100,
    }
1771 1771
1772 1772
@pytest.fixture
def repo_integration_stub(request, repo_stub, StubIntegrationType,
                          stub_integration_settings):
    """Repository-scoped stub integration, deleted again after the test."""
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        name='test repo integration',
        repo=repo_stub, repo_group=None, child_repos_only=None)

    request.addfinalizer(lambda: IntegrationModel().delete(integration))

    return integration
1786 1786
1787 1787
1788 1788 @pytest.fixture
1789 1789 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1790 1790 stub_integration_settings):
1791 1791 integration = IntegrationModel().create(
1792 1792 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1793 1793 name='test repogroup integration',
1794 1794 repo=None, repo_group=test_repo_group, child_repos_only=True)
1795 1795
1796 1796 @request.addfinalizer
1797 1797 def cleanup():
1798 1798 IntegrationModel().delete(integration)
1799 1799
1800 1800 return integration
1801 1801
1802 1802
1803 1803 @pytest.fixture
1804 1804 def repogroup_recursive_integration_stub(request, test_repo_group,
1805 1805 StubIntegrationType, stub_integration_settings):
1806 1806 integration = IntegrationModel().create(
1807 1807 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1808 1808 name='test recursive repogroup integration',
1809 1809 repo=None, repo_group=test_repo_group, child_repos_only=False)
1810 1810
1811 1811 @request.addfinalizer
1812 1812 def cleanup():
1813 1813 IntegrationModel().delete(integration)
1814 1814
1815 1815 return integration
1816 1816
1817 1817
1818 1818 @pytest.fixture
1819 1819 def global_integration_stub(request, StubIntegrationType,
1820 1820 stub_integration_settings):
1821 1821 integration = IntegrationModel().create(
1822 1822 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1823 1823 name='test global integration',
1824 1824 repo=None, repo_group=None, child_repos_only=None)
1825 1825
1826 1826 @request.addfinalizer
1827 1827 def cleanup():
1828 1828 IntegrationModel().delete(integration)
1829 1829
1830 1830 return integration
1831 1831
1832 1832
1833 1833 @pytest.fixture
1834 1834 def root_repos_integration_stub(request, StubIntegrationType,
1835 1835 stub_integration_settings):
1836 1836 integration = IntegrationModel().create(
1837 1837 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1838 1838 name='test global integration',
1839 1839 repo=None, repo_group=None, child_repos_only=True)
1840 1840
1841 1841 @request.addfinalizer
1842 1842 def cleanup():
1843 1843 IntegrationModel().delete(integration)
1844 1844
1845 1845 return integration
1846 1846
1847 1847
1848 1848 @pytest.fixture
1849 1849 def local_dt_to_utc():
1850 1850 def _factory(dt):
1851 1851 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1852 1852 dateutil.tz.tzutc()).replace(tzinfo=None)
1853 1853 return _factory
1854 1854
1855 1855
1856 1856 @pytest.fixture
1857 1857 def disable_anonymous_user(request, baseapp):
1858 1858 set_anonymous_access(False)
1859 1859
1860 1860 @request.addfinalizer
1861 1861 def cleanup():
1862 1862 set_anonymous_access(True)
1863 1863
1864 1864
1865 1865 @pytest.fixture(scope='module')
1866 1866 def rc_fixture(request):
1867 1867 return Fixture()
1868 1868
1869 1869
1870 1870 @pytest.fixture
1871 1871 def repo_groups(request):
1872 1872 fixture = Fixture()
1873 1873
1874 1874 session = Session()
1875 1875 zombie_group = fixture.create_repo_group('zombie')
1876 1876 parent_group = fixture.create_repo_group('parent')
1877 1877 child_group = fixture.create_repo_group('parent/child')
1878 1878 groups_in_db = session.query(RepoGroup).all()
1879 1879 assert len(groups_in_db) == 3
1880 1880 assert child_group.group_parent_id == parent_group.group_id
1881 1881
1882 1882 @request.addfinalizer
1883 1883 def cleanup():
1884 1884 fixture.destroy_repo_group(zombie_group)
1885 1885 fixture.destroy_repo_group(child_group)
1886 1886 fixture.destroy_repo_group(parent_group)
1887 1887
1888 1888 return zombie_group, parent_group, child_group
1889
1890
1891 @pytest.fixture(scope="session")
1892 def tmp_path_factory(request):
1893 """Return a :class:`_pytest.tmpdir.TempPathFactory` instance for the test session.
1894 """
1895
1896 class TempPathFactory:
1897
1898 def mktemp(self, basename):
1899 import tempfile
1900 return tempfile.mktemp(basename)
1901
1902 return TempPathFactory()
@@ -1,1293 +1,1275 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 import mock
23 23 import os
24 24 import sys
25 25 import shutil
26 26
27 27 import pytest
28 28
29 29 from rhodecode.lib.utils import make_db_config
30 30 from rhodecode.lib.vcs.backends.base import Reference
31 31 from rhodecode.lib.vcs.backends.git import (
32 32 GitRepository, GitCommit, discover_git_version)
33 33 from rhodecode.lib.vcs.exceptions import (
34 34 RepositoryError, VCSError, NodeDoesNotExistError)
35 35 from rhodecode.lib.vcs.nodes import (
36 36 NodeKind, FileNode, DirNode, NodeState, SubModuleNode)
37 37 from rhodecode.tests import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
38 38 from rhodecode.tests.vcs.conftest import BackendTestMixin
39 39
40 40
41 41 pytestmark = pytest.mark.backends("git")
42 42
43 43
44 def repo_path_generator():
45 """
46 Return a different path to be used for cloning repos.
47 """
48 i = 0
49 while True:
50 i += 1
51 yield '%s-%d' % (TEST_GIT_REPO_CLONE, i)
52
53
54 REPO_PATH_GENERATOR = repo_path_generator()
55
56
57 class TestGitRepository:
58
59 # pylint: disable=protected-access
60
61 def __check_for_existing_repo(self):
62 if os.path.exists(TEST_GIT_REPO_CLONE):
63 self.fail('Cannot test git clone repo as location %s already '
64 'exists. You should manually remove it first.'
65 % TEST_GIT_REPO_CLONE)
44 class TestGitRepository(object):
66 45
67 46 @pytest.fixture(autouse=True)
68 47 def prepare(self, request, baseapp):
69 48 self.repo = GitRepository(TEST_GIT_REPO, bare=True)
70 49
71 def get_clone_repo(self):
50 def get_clone_repo(self, tmp_path_factory):
72 51 """
73 52 Return a non bare clone of the base repo.
74 53 """
75 clone_path = next(REPO_PATH_GENERATOR)
54 clone_path = tmp_path_factory.mktemp('clone-url')
76 55 repo_clone = GitRepository(
77 56 clone_path, create=True, src_url=self.repo.path, bare=False)
78 57
79 58 return repo_clone
80 59
81 def get_empty_repo(self, bare=False):
60 def get_empty_repo(self, tmp_path_factory, bare=False):
82 61 """
83 62 Return a non bare empty repo.
84 63 """
85 return GitRepository(next(REPO_PATH_GENERATOR), create=True, bare=bare)
64 clone_path = tmp_path_factory.mktemp('empty-repo')
65 return GitRepository(clone_path, create=True, bare=bare)
86 66
87 67 def test_wrong_repo_path(self):
88 68 wrong_repo_path = '/tmp/errorrepo_git'
89 69 with pytest.raises(RepositoryError):
90 70 GitRepository(wrong_repo_path)
91 71
92 def test_repo_clone(self):
93 self.__check_for_existing_repo()
72 def test_repo_clone(self, tmp_path_factory):
94 73 repo = GitRepository(TEST_GIT_REPO)
74 clone_path = tmp_path_factory.mktemp('_') + '_' + TEST_GIT_REPO_CLONE
95 75 repo_clone = GitRepository(
96 TEST_GIT_REPO_CLONE,
76 clone_path,
97 77 src_url=TEST_GIT_REPO, create=True, do_workspace_checkout=True)
78
98 79 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
99 80 # Checking hashes of commits should be enough
100 81 for commit in repo.get_commits():
101 82 raw_id = commit.raw_id
102 83 assert raw_id == repo_clone.get_commit(raw_id).raw_id
103 84
104 85 def test_repo_clone_without_create(self):
105 86 with pytest.raises(RepositoryError):
106 87 GitRepository(
107 88 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
108 89
109 def test_repo_clone_with_update(self):
90 def test_repo_clone_with_update(self, tmp_path_factory):
110 91 repo = GitRepository(TEST_GIT_REPO)
111 clone_path = TEST_GIT_REPO_CLONE + '_with_update'
92 clone_path = tmp_path_factory.mktemp('_') + '_' + TEST_GIT_REPO_CLONE + '_update'
93
112 94 repo_clone = GitRepository(
113 95 clone_path,
114 96 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=True)
115 97 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
116 98
117 99 # check if current workdir was updated
118 100 fpath = os.path.join(clone_path, 'MANIFEST.in')
119 101 assert os.path.isfile(fpath)
120 102
121 def test_repo_clone_without_update(self):
103 def test_repo_clone_without_update(self, tmp_path_factory):
122 104 repo = GitRepository(TEST_GIT_REPO)
123 clone_path = TEST_GIT_REPO_CLONE + '_without_update'
105 clone_path = tmp_path_factory.mktemp('_') + '_' + TEST_GIT_REPO_CLONE + '_without_update'
124 106 repo_clone = GitRepository(
125 107 clone_path,
126 108 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=False)
127 109 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
128 110 # check if current workdir was *NOT* updated
129 111 fpath = os.path.join(clone_path, 'MANIFEST.in')
130 112 # Make sure it's not bare repo
131 113 assert not repo_clone.bare
132 114 assert not os.path.isfile(fpath)
133 115
134 def test_repo_clone_into_bare_repo(self):
116 def test_repo_clone_into_bare_repo(self, tmp_path_factory):
135 117 repo = GitRepository(TEST_GIT_REPO)
136 clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
118 clone_path = tmp_path_factory.mktemp('_') + '_' + TEST_GIT_REPO_CLONE + '_bare.git'
137 119 repo_clone = GitRepository(
138 120 clone_path, create=True, src_url=repo.path, bare=True)
139 121 assert repo_clone.bare
140 122
141 123 def test_create_repo_is_not_bare_by_default(self):
142 124 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
143 125 assert not repo.bare
144 126
145 127 def test_create_bare_repo(self):
146 128 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
147 129 assert repo.bare
148 130
149 131 def test_update_server_info(self):
150 132 self.repo._update_server_info()
151 133
152 134 def test_fetch(self, vcsbackend_git):
153 135 # Note: This is a git specific part of the API, it's only implemented
154 136 # by the git backend.
155 137 source_repo = vcsbackend_git.repo
156 138 target_repo = vcsbackend_git.create_repo(bare=True)
157 139 target_repo.fetch(source_repo.path)
158 140 # Note: Get a fresh instance, avoids caching trouble
159 141 target_repo = vcsbackend_git.backend(target_repo.path)
160 142 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
161 143
162 144 def test_commit_ids(self):
163 145 # there are 112 commits (by now)
164 146 # so we can assume they would be available from now on
165 147 subset = {'c1214f7e79e02fc37156ff215cd71275450cffc3',
166 148 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
167 149 'fa6600f6848800641328adbf7811fd2372c02ab2',
168 150 '102607b09cdd60e2793929c4f90478be29f85a17',
169 151 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
170 152 '2d1028c054665b962fa3d307adfc923ddd528038',
171 153 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
172 154 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
173 155 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
174 156 '8430a588b43b5d6da365400117c89400326e7992',
175 157 'd955cd312c17b02143c04fa1099a352b04368118',
176 158 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
177 159 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
178 160 'f298fe1189f1b69779a4423f40b48edf92a703fc',
179 161 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
180 162 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
181 163 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
182 164 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
183 165 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
184 166 '45223f8f114c64bf4d6f853e3c35a369a6305520',
185 167 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
186 168 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
187 169 '27d48942240f5b91dfda77accd2caac94708cc7d',
188 170 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
189 171 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'}
190 172 assert subset.issubset(set(self.repo.commit_ids))
191 173
192 174 def test_slicing(self):
193 175 # 4 1 5 10 95
194 176 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
195 177 (10, 20, 10), (5, 100, 95)]:
196 178 commit_ids = list(self.repo[sfrom:sto])
197 179 assert len(commit_ids) == size
198 180 assert commit_ids[0] == self.repo.get_commit(commit_idx=sfrom)
199 181 assert commit_ids[-1] == self.repo.get_commit(commit_idx=sto - 1)
200 182
201 183 def test_branches(self):
202 184 # TODO: Need more tests here
203 185 # Removed (those are 'remotes' branches for cloned repo)
204 186 # assert 'master' in self.repo.branches
205 187 # assert 'gittree' in self.repo.branches
206 188 # assert 'web-branch' in self.repo.branches
207 189 for __, commit_id in self.repo.branches.items():
208 190 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
209 191
210 192 def test_tags(self):
211 193 # TODO: Need more tests here
212 194 assert 'v0.1.1' in self.repo.tags
213 195 assert 'v0.1.2' in self.repo.tags
214 196 for __, commit_id in self.repo.tags.items():
215 197 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
216 198
217 199 def _test_single_commit_cache(self, commit_id):
218 200 commit = self.repo.get_commit(commit_id)
219 201 assert commit_id in self.repo.commits
220 202 assert commit is self.repo.commits[commit_id]
221 203
222 204 def test_initial_commit(self):
223 205 commit_id = self.repo.commit_ids[0]
224 206 init_commit = self.repo.get_commit(commit_id)
225 207 init_author = init_commit.author
226 208
227 209 assert init_commit.message == 'initial import\n'
228 210 assert init_author == 'Marcin Kuzminski <marcin@python-blog.com>'
229 211 assert init_author == init_commit.committer
230 212 for path in ('vcs/__init__.py',
231 213 'vcs/backends/BaseRepository.py',
232 214 'vcs/backends/__init__.py'):
233 215 assert isinstance(init_commit.get_node(path), FileNode)
234 216 for path in ('', 'vcs', 'vcs/backends'):
235 217 assert isinstance(init_commit.get_node(path), DirNode)
236 218
237 219 with pytest.raises(NodeDoesNotExistError):
238 220 init_commit.get_node(path='foobar')
239 221
240 222 node = init_commit.get_node('vcs/')
241 223 assert hasattr(node, 'kind')
242 224 assert node.kind == NodeKind.DIR
243 225
244 226 node = init_commit.get_node('vcs')
245 227 assert hasattr(node, 'kind')
246 228 assert node.kind == NodeKind.DIR
247 229
248 230 node = init_commit.get_node('vcs/__init__.py')
249 231 assert hasattr(node, 'kind')
250 232 assert node.kind == NodeKind.FILE
251 233
252 234 def test_not_existing_commit(self):
253 235 with pytest.raises(RepositoryError):
254 236 self.repo.get_commit('f' * 40)
255 237
256 238 def test_commit10(self):
257 239
258 240 commit10 = self.repo.get_commit(self.repo.commit_ids[9])
259 241 README = """===
260 242 VCS
261 243 ===
262 244
263 245 Various Version Control System management abstraction layer for Python.
264 246
265 247 Introduction
266 248 ------------
267 249
268 250 TODO: To be written...
269 251
270 252 """
271 253 node = commit10.get_node('README.rst')
272 254 assert node.kind == NodeKind.FILE
273 255 assert node.content == README
274 256
275 257 def test_head(self):
276 258 assert self.repo.head == self.repo.get_commit().raw_id
277 259
278 def test_checkout_with_create(self):
279 repo_clone = self.get_clone_repo()
260 def test_checkout_with_create(self, tmp_path_factory):
261 repo_clone = self.get_clone_repo(tmp_path_factory)
280 262
281 263 new_branch = 'new_branch'
282 264 assert repo_clone._current_branch() == 'master'
283 265 assert set(repo_clone.branches) == {'master'}
284 266 repo_clone._checkout(new_branch, create=True)
285 267
286 268 # Branches is a lazy property so we need to recrete the Repo object.
287 269 repo_clone = GitRepository(repo_clone.path)
288 270 assert set(repo_clone.branches) == {'master', new_branch}
289 271 assert repo_clone._current_branch() == new_branch
290 272
291 def test_checkout(self):
292 repo_clone = self.get_clone_repo()
273 def test_checkout(self, tmp_path_factory):
274 repo_clone = self.get_clone_repo(tmp_path_factory)
293 275
294 276 repo_clone._checkout('new_branch', create=True)
295 277 repo_clone._checkout('master')
296 278
297 279 assert repo_clone._current_branch() == 'master'
298 280
299 def test_checkout_same_branch(self):
300 repo_clone = self.get_clone_repo()
281 def test_checkout_same_branch(self, tmp_path_factory):
282 repo_clone = self.get_clone_repo(tmp_path_factory)
301 283
302 284 repo_clone._checkout('master')
303 285 assert repo_clone._current_branch() == 'master'
304 286
305 def test_checkout_branch_already_exists(self):
306 repo_clone = self.get_clone_repo()
287 def test_checkout_branch_already_exists(self, tmp_path_factory):
288 repo_clone = self.get_clone_repo(tmp_path_factory)
307 289
308 290 with pytest.raises(RepositoryError):
309 291 repo_clone._checkout('master', create=True)
310 292
311 293 def test_checkout_bare_repo(self):
312 294 with pytest.raises(RepositoryError):
313 295 self.repo._checkout('master')
314 296
315 297 def test_current_branch_bare_repo(self):
316 298 with pytest.raises(RepositoryError):
317 299 self.repo._current_branch()
318 300
319 def test_current_branch_empty_repo(self):
320 repo = self.get_empty_repo()
301 def test_current_branch_empty_repo(self, tmp_path_factory):
302 repo = self.get_empty_repo(tmp_path_factory)
321 303 assert repo._current_branch() is None
322 304
323 def test_local_clone(self):
324 clone_path = next(REPO_PATH_GENERATOR)
305 def test_local_clone(self, tmp_path_factory):
306 clone_path = tmp_path_factory.mktemp('test-local-clone')
325 307 self.repo._local_clone(clone_path, 'master')
326 308 repo_clone = GitRepository(clone_path)
327 309
328 310 assert self.repo.commit_ids == repo_clone.commit_ids
329 311
330 def test_local_clone_with_specific_branch(self):
331 source_repo = self.get_clone_repo()
312 def test_local_clone_with_specific_branch(self, tmp_path_factory):
313 source_repo = self.get_clone_repo(tmp_path_factory)
332 314
333 315 # Create a new branch in source repo
334 316 new_branch_commit = source_repo.commit_ids[-3]
335 317 source_repo._checkout(new_branch_commit)
336 318 source_repo._checkout('new_branch', create=True)
337 319
338 clone_path = next(REPO_PATH_GENERATOR)
320 clone_path = tmp_path_factory.mktemp('git-clone-path-1')
339 321 source_repo._local_clone(clone_path, 'new_branch')
340 322 repo_clone = GitRepository(clone_path)
341 323
342 324 assert source_repo.commit_ids[:-3 + 1] == repo_clone.commit_ids
343 325
344 clone_path = next(REPO_PATH_GENERATOR)
326 clone_path = tmp_path_factory.mktemp('git-clone-path-2')
345 327 source_repo._local_clone(clone_path, 'master')
346 328 repo_clone = GitRepository(clone_path)
347 329
348 330 assert source_repo.commit_ids == repo_clone.commit_ids
349 331
350 332 def test_local_clone_fails_if_target_exists(self):
351 333 with pytest.raises(RepositoryError):
352 334 self.repo._local_clone(self.repo.path, 'master')
353 335
354 def test_local_fetch(self):
355 target_repo = self.get_empty_repo()
356 source_repo = self.get_clone_repo()
336 def test_local_fetch(self, tmp_path_factory):
337 target_repo = self.get_empty_repo(tmp_path_factory)
338 source_repo = self.get_clone_repo(tmp_path_factory)
357 339
358 340 # Create a new branch in source repo
359 341 master_commit = source_repo.commit_ids[-1]
360 342 new_branch_commit = source_repo.commit_ids[-3]
361 343 source_repo._checkout(new_branch_commit)
362 344 source_repo._checkout('new_branch', create=True)
363 345
364 346 target_repo._local_fetch(source_repo.path, 'new_branch')
365 347 assert target_repo._last_fetch_heads() == [new_branch_commit]
366 348
367 349 target_repo._local_fetch(source_repo.path, 'master')
368 350 assert target_repo._last_fetch_heads() == [master_commit]
369 351
370 def test_local_fetch_from_bare_repo(self):
371 target_repo = self.get_empty_repo()
352 def test_local_fetch_from_bare_repo(self, tmp_path_factory):
353 target_repo = self.get_empty_repo(tmp_path_factory)
372 354 target_repo._local_fetch(self.repo.path, 'master')
373 355
374 356 master_commit = self.repo.commit_ids[-1]
375 357 assert target_repo._last_fetch_heads() == [master_commit]
376 358
377 359 def test_local_fetch_from_same_repo(self):
378 360 with pytest.raises(ValueError):
379 361 self.repo._local_fetch(self.repo.path, 'master')
380 362
381 def test_local_fetch_branch_does_not_exist(self):
382 target_repo = self.get_empty_repo()
363 def test_local_fetch_branch_does_not_exist(self, tmp_path_factory):
364 target_repo = self.get_empty_repo(tmp_path_factory)
383 365
384 366 with pytest.raises(RepositoryError):
385 367 target_repo._local_fetch(self.repo.path, 'new_branch')
386 368
387 def test_local_pull(self):
388 target_repo = self.get_empty_repo()
389 source_repo = self.get_clone_repo()
369 def test_local_pull(self, tmp_path_factory):
370 target_repo = self.get_empty_repo(tmp_path_factory)
371 source_repo = self.get_clone_repo(tmp_path_factory)
390 372
391 373 # Create a new branch in source repo
392 374 master_commit = source_repo.commit_ids[-1]
393 375 new_branch_commit = source_repo.commit_ids[-3]
394 376 source_repo._checkout(new_branch_commit)
395 377 source_repo._checkout('new_branch', create=True)
396 378
397 379 target_repo._local_pull(source_repo.path, 'new_branch')
398 380 target_repo = GitRepository(target_repo.path)
399 381 assert target_repo.head == new_branch_commit
400 382
401 383 target_repo._local_pull(source_repo.path, 'master')
402 384 target_repo = GitRepository(target_repo.path)
403 385 assert target_repo.head == master_commit
404 386
405 387 def test_local_pull_in_bare_repo(self):
406 388 with pytest.raises(RepositoryError):
407 389 self.repo._local_pull(self.repo.path, 'master')
408 390
409 def test_local_merge(self):
410 target_repo = self.get_empty_repo()
411 source_repo = self.get_clone_repo()
391 def test_local_merge(self, tmp_path_factory):
392 target_repo = self.get_empty_repo(tmp_path_factory)
393 source_repo = self.get_clone_repo(tmp_path_factory)
412 394
413 395 # Create a new branch in source repo
414 396 master_commit = source_repo.commit_ids[-1]
415 397 new_branch_commit = source_repo.commit_ids[-3]
416 398 source_repo._checkout(new_branch_commit)
417 399 source_repo._checkout('new_branch', create=True)
418 400
419 401 # This is required as one cannot do a -ff-only merge in an empty repo.
420 402 target_repo._local_pull(source_repo.path, 'new_branch')
421 403
422 404 target_repo._local_fetch(source_repo.path, 'master')
423 405 merge_message = 'Merge message\n\nDescription:...'
424 406 user_name = 'Albert Einstein'
425 407 user_email = 'albert@einstein.com'
426 408 target_repo._local_merge(merge_message, user_name, user_email,
427 409 target_repo._last_fetch_heads())
428 410
429 411 target_repo = GitRepository(target_repo.path)
430 412 assert target_repo.commit_ids[-2] == master_commit
431 413 last_commit = target_repo.get_commit(target_repo.head)
432 414 assert last_commit.message.strip() == merge_message
433 415 assert last_commit.author == '%s <%s>' % (user_name, user_email)
434 416
435 417 assert not os.path.exists(
436 418 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
437 419
438 420 def test_local_merge_raises_exception_on_conflict(self, vcsbackend_git):
439 421 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
440 422 vcsbackend_git.ensure_file('README', 'I will conflict with you!!!')
441 423
442 424 target_repo._local_fetch(self.repo.path, 'master')
443 425 with pytest.raises(RepositoryError):
444 426 target_repo._local_merge(
445 427 'merge_message', 'user name', 'user@name.com',
446 428 target_repo._last_fetch_heads())
447 429
448 430 # Check we are not left in an intermediate merge state
449 431 assert not os.path.exists(
450 432 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
451 433
452 def test_local_merge_into_empty_repo(self):
453 target_repo = self.get_empty_repo()
434 def test_local_merge_into_empty_repo(self, tmp_path_factory):
435 target_repo = self.get_empty_repo(tmp_path_factory)
454 436
455 437 # This is required as one cannot do a -ff-only merge in an empty repo.
456 438 target_repo._local_fetch(self.repo.path, 'master')
457 439 with pytest.raises(RepositoryError):
458 440 target_repo._local_merge(
459 441 'merge_message', 'user name', 'user@name.com',
460 442 target_repo._last_fetch_heads())
461 443
462 444 def test_local_merge_in_bare_repo(self):
463 445 with pytest.raises(RepositoryError):
464 446 self.repo._local_merge(
465 447 'merge_message', 'user name', 'user@name.com', None)
466 448
467 def test_local_push_non_bare(self):
468 target_repo = self.get_empty_repo()
449 def test_local_push_non_bare(self, tmp_path_factory):
450 target_repo = self.get_empty_repo(tmp_path_factory)
469 451
470 452 pushed_branch = 'pushed_branch'
471 453 self.repo._local_push('master', target_repo.path, pushed_branch)
472 454 # Fix the HEAD of the target repo, or otherwise GitRepository won't
473 455 # report any branches.
474 456 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
475 457 f.write('ref: refs/heads/%s' % pushed_branch)
476 458
477 459 target_repo = GitRepository(target_repo.path)
478 460
479 461 assert (target_repo.branches[pushed_branch] ==
480 462 self.repo.branches['master'])
481 463
482 def test_local_push_bare(self):
483 target_repo = self.get_empty_repo(bare=True)
464 def test_local_push_bare(self, tmp_path_factory):
465 target_repo = self.get_empty_repo(tmp_path_factory, bare=True)
484 466
485 467 pushed_branch = 'pushed_branch'
486 468 self.repo._local_push('master', target_repo.path, pushed_branch)
487 469 # Fix the HEAD of the target repo, or otherwise GitRepository won't
488 470 # report any branches.
489 471 with open(os.path.join(target_repo.path, 'HEAD'), 'w') as f:
490 472 f.write('ref: refs/heads/%s' % pushed_branch)
491 473
492 474 target_repo = GitRepository(target_repo.path)
493 475
494 476 assert (target_repo.branches[pushed_branch] ==
495 477 self.repo.branches['master'])
496 478
497 def test_local_push_non_bare_target_branch_is_checked_out(self):
498 target_repo = self.get_clone_repo()
479 def test_local_push_non_bare_target_branch_is_checked_out(self, tmp_path_factory):
480 target_repo = self.get_clone_repo(tmp_path_factory)
499 481
500 482 pushed_branch = 'pushed_branch'
501 483 # Create a new branch in source repo
502 484 new_branch_commit = target_repo.commit_ids[-3]
503 485 target_repo._checkout(new_branch_commit)
504 486 target_repo._checkout(pushed_branch, create=True)
505 487
506 488 self.repo._local_push('master', target_repo.path, pushed_branch)
507 489
508 490 target_repo = GitRepository(target_repo.path)
509 491
510 492 assert (target_repo.branches[pushed_branch] ==
511 493 self.repo.branches['master'])
512 494
513 495 def test_local_push_raises_exception_on_conflict(self, vcsbackend_git):
514 496 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
515 497 with pytest.raises(RepositoryError):
516 498 self.repo._local_push('master', target_repo.path, 'master')
517 499
518 def test_hooks_can_be_enabled_via_env_variable_for_local_push(self):
519 target_repo = self.get_empty_repo(bare=True)
500 def test_hooks_can_be_enabled_via_env_variable_for_local_push(self, tmp_path_factory):
501 target_repo = self.get_empty_repo(tmp_path_factory, bare=True)
520 502
521 503 with mock.patch.object(self.repo, 'run_git_command') as run_mock:
522 504 self.repo._local_push(
523 505 'master', target_repo.path, 'master', enable_hooks=True)
524 506 env = run_mock.call_args[1]['extra_env']
525 507 assert 'RC_SKIP_HOOKS' not in env
526 508
527 509 def _add_failing_hook(self, repo_path, hook_name, bare=False):
528 510 path_components = (
529 511 ['hooks', hook_name] if bare else ['.git', 'hooks', hook_name])
530 512 hook_path = os.path.join(repo_path, *path_components)
531 513 with open(hook_path, 'w') as f:
532 514 script_lines = [
533 515 '#!%s' % sys.executable,
534 516 'import os',
535 517 'import sys',
536 518 'if os.environ.get("RC_SKIP_HOOKS"):',
537 519 ' sys.exit(0)',
538 520 'sys.exit(1)',
539 521 ]
540 522 f.write('\n'.join(script_lines))
541 523 os.chmod(hook_path, 0o755)
542 524
543 def test_local_push_does_not_execute_hook(self):
544 target_repo = self.get_empty_repo()
525 def test_local_push_does_not_execute_hook(self, tmp_path_factory):
526 target_repo = self.get_empty_repo(tmp_path_factory)
545 527
546 528 pushed_branch = 'pushed_branch'
547 529 self._add_failing_hook(target_repo.path, 'pre-receive')
548 530 self.repo._local_push('master', target_repo.path, pushed_branch)
549 531 # Fix the HEAD of the target repo, or otherwise GitRepository won't
550 532 # report any branches.
551 533 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
552 534 f.write('ref: refs/heads/%s' % pushed_branch)
553 535
554 536 target_repo = GitRepository(target_repo.path)
555 537
556 538 assert (target_repo.branches[pushed_branch] ==
557 539 self.repo.branches['master'])
558 540
559 def test_local_push_executes_hook(self):
560 target_repo = self.get_empty_repo(bare=True)
541 def test_local_push_executes_hook(self, tmp_path_factory):
542 target_repo = self.get_empty_repo(tmp_path_factory, bare=True)
561 543 self._add_failing_hook(target_repo.path, 'pre-receive', bare=True)
562 544 with pytest.raises(RepositoryError):
563 545 self.repo._local_push(
564 546 'master', target_repo.path, 'master', enable_hooks=True)
565 547
566 548 def test_maybe_prepare_merge_workspace(self):
567 549 workspace = self.repo._maybe_prepare_merge_workspace(
568 550 2, 'pr2', Reference('branch', 'master', 'unused'),
569 551 Reference('branch', 'master', 'unused'))
570 552
571 553 assert os.path.isdir(workspace)
572 554 workspace_repo = GitRepository(workspace)
573 555 assert workspace_repo.branches == self.repo.branches
574 556
575 557 # Calling it a second time should also succeed
576 558 workspace = self.repo._maybe_prepare_merge_workspace(
577 559 2, 'pr2', Reference('branch', 'master', 'unused'),
578 560 Reference('branch', 'master', 'unused'))
579 561 assert os.path.isdir(workspace)
580 562
581 563 def test_maybe_prepare_merge_workspace_different_refs(self):
582 564 workspace = self.repo._maybe_prepare_merge_workspace(
583 565 2, 'pr2', Reference('branch', 'master', 'unused'),
584 566 Reference('branch', 'develop', 'unused'))
585 567
586 568 assert os.path.isdir(workspace)
587 569 workspace_repo = GitRepository(workspace)
588 570 assert workspace_repo.branches == self.repo.branches
589 571
590 572 # Calling it a second time should also succeed
591 573 workspace = self.repo._maybe_prepare_merge_workspace(
592 574 2, 'pr2', Reference('branch', 'master', 'unused'),
593 575 Reference('branch', 'develop', 'unused'))
594 576 assert os.path.isdir(workspace)
595 577
596 578 def test_cleanup_merge_workspace(self):
597 579 workspace = self.repo._maybe_prepare_merge_workspace(
598 580 2, 'pr3', Reference('branch', 'master', 'unused'),
599 581 Reference('branch', 'master', 'unused'))
600 582 self.repo.cleanup_merge_workspace(2, 'pr3')
601 583
602 584 assert not os.path.exists(workspace)
603 585
    def test_cleanup_merge_workspace_invalid_workspace_id(self):
        # No assert: because in case of an inexistent workspace this function
        # should still succeed - cleanup is expected to behave as a no-op
        # rather than raise for unknown workspace ids.
        self.repo.cleanup_merge_workspace(1, 'pr4')
608 590
609 591 def test_set_refs(self):
610 592 test_ref = 'refs/test-refs/abcde'
611 593 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
612 594
613 595 self.repo.set_refs(test_ref, test_commit_id)
614 596 stdout, _ = self.repo.run_git_command(['show-ref'])
615 597 assert test_ref in stdout
616 598 assert test_commit_id in stdout
617 599
618 600 def test_remove_ref(self):
619 601 test_ref = 'refs/test-refs/abcde'
620 602 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
621 603 self.repo.set_refs(test_ref, test_commit_id)
622 604 stdout, _ = self.repo.run_git_command(['show-ref'])
623 605 assert test_ref in stdout
624 606 assert test_commit_id in stdout
625 607
626 608 self.repo.remove_ref(test_ref)
627 609 stdout, _ = self.repo.run_git_command(['show-ref'])
628 610 assert test_ref not in stdout
629 611 assert test_commit_id not in stdout
630 612
631 613
632 614 class TestGitCommit(object):
633 615
    @pytest.fixture(autouse=True)
    def prepare(self):
        # Re-open the read-only test repository before every test method.
        self.repo = GitRepository(TEST_GIT_REPO)
637 619
638 620 def test_default_commit(self):
639 621 tip = self.repo.get_commit()
640 622 assert tip == self.repo.get_commit(None)
641 623 assert tip == self.repo.get_commit('tip')
642 624
643 625 def test_root_node(self):
644 626 tip = self.repo.get_commit()
645 627 assert tip.root is tip.get_node('')
646 628
    def test_lazy_fetch(self):
        """
        Test if commit's nodes expands and are cached as we walk through
        the commit. This test is somewhat hard to write as order of tests
        is a key here. Written by running command after command in a shell.
        """
        commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
        assert commit_id in self.repo.commit_ids
        commit = self.repo.get_commit(commit_id)
        # Nothing has been fetched yet - the node cache starts out empty.
        assert len(commit.nodes) == 0
        root = commit.root
        assert len(commit.nodes) == 1
        # The root directory of this commit contains 8 entries.
        assert len(root.nodes) == 8
        # accessing root.nodes updates commit.nodes
        assert len(commit.nodes) == 9

        docs = root.get_node('docs')
        # we haven't yet accessed anything new as docs dir was already cached
        assert len(commit.nodes) == 9
        assert len(docs.nodes) == 8
        # accessing docs.nodes updates commit.nodes
        assert len(commit.nodes) == 17

        # The identical cached object is returned for every access path.
        assert docs is commit.get_node('docs')
        assert docs is root.nodes[0]
        assert docs is root.dirs[0]
        assert docs is commit.get_node('docs')
674 656
675 657 def test_nodes_with_commit(self):
676 658 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
677 659 commit = self.repo.get_commit(commit_id)
678 660 root = commit.root
679 661 docs = root.get_node('docs')
680 662 assert docs is commit.get_node('docs')
681 663 api = docs.get_node('api')
682 664 assert api is commit.get_node('docs/api')
683 665 index = api.get_node('index.rst')
684 666 assert index is commit.get_node('docs/api/index.rst')
685 667 assert index is commit.get_node('docs')\
686 668 .get_node('api')\
687 669 .get_node('index.rst')
688 670
    def test_branch_and_tags(self):
        """
        rev0 = self.repo.commit_ids[0]
        commit0 = self.repo.get_commit(rev0)
        assert commit0.branch == 'master'
        assert commit0.tags == []

        rev10 = self.repo.commit_ids[10]
        commit10 = self.repo.get_commit(rev10)
        assert commit10.branch == 'master'
        assert commit10.tags == []

        rev44 = self.repo.commit_ids[44]
        commit44 = self.repo.get_commit(rev44)
        assert commit44.branch == 'web-branch'

        tip = self.repo.get_commit('tip')
        assert 'tip' in tip.tags
        """
        # Those tests would fail - branches are now going
        # to be changed at main API in order to support git backend.
        # The original body is kept above (as the docstring) for reference.
        pass
711 693
712 694 def test_file_size(self):
713 695 to_check = (
714 696 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
715 697 'vcs/backends/BaseRepository.py', 502),
716 698 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
717 699 'vcs/backends/hg.py', 854),
718 700 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
719 701 'setup.py', 1068),
720 702
721 703 ('d955cd312c17b02143c04fa1099a352b04368118',
722 704 'vcs/backends/base.py', 2921),
723 705 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
724 706 'vcs/backends/base.py', 3936),
725 707 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
726 708 'vcs/backends/base.py', 6189),
727 709 )
728 710 for commit_id, path, size in to_check:
729 711 node = self.repo.get_commit(commit_id).get_node(path)
730 712 assert node.is_file()
731 713 assert node.size == size
732 714
733 715 def test_file_history_from_commits(self):
734 716 node = self.repo[10].get_node('setup.py')
735 717 commit_ids = [commit.raw_id for commit in node.history]
736 718 assert ['ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == commit_ids
737 719
738 720 node = self.repo[20].get_node('setup.py')
739 721 node_ids = [commit.raw_id for commit in node.history]
740 722 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
741 723 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
742 724
743 725 # special case we check history from commit that has this particular
744 726 # file changed this means we check if it's included as well
745 727 node = self.repo.get_commit('191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e') \
746 728 .get_node('setup.py')
747 729 node_ids = [commit.raw_id for commit in node.history]
748 730 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
749 731 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
750 732
    def test_file_history(self):
        """Known commits must appear in each file's history (subset check)."""
        # we can only check if those commits are present in the history
        # as we cannot update this test every time file is changed
        files = {
            'setup.py': [
                '54386793436c938cff89326944d4c2702340037d',
                '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
                '998ed409c795fec2012b1c0ca054d99888b22090',
                '5e0eb4c47f56564395f76333f319d26c79e2fb09',
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
                '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
                '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
            ],
            'vcs/nodes.py': [
                '33fa3223355104431402a888fa77a4e9956feb3e',
                'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
                'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
                'ab5721ca0a081f26bf43d9051e615af2cc99952f',
                'c877b68d18e792a66b7f4c529ea02c8f80801542',
                '4313566d2e417cb382948f8d9d7c765330356054',
                '6c2303a793671e807d1cfc70134c9ca0767d98c2',
                '54386793436c938cff89326944d4c2702340037d',
                '54000345d2e78b03a99d561399e8e548de3f3203',
                '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
                '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
                '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
                '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
                'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
                '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
                '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
                '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
                '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
                'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
                'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
                'f15c21f97864b4f071cddfbf2750ec2e23859414',
                'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
                'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
                '84dec09632a4458f79f50ddbbd155506c460b4f9',
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
                '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
                'b8d04012574729d2c29886e53b1a43ef16dd00a1',
                '6970b057cffe4aab0a792aa634c89f4bebf01441',
                'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
            ],
            'vcs/backends/git.py': [
                '4cf116ad5a457530381135e2f4c453e68a1b0105',
                '9a751d84d8e9408e736329767387f41b36935153',
                'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
                '428f81bb652bcba8d631bce926e8834ff49bdcc6',
                '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
                '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
                '50e08c506174d8645a4bb517dd122ac946a0f3bf',
                '54000345d2e78b03a99d561399e8e548de3f3203',
            ],
        }
        for path, commit_ids in files.items():
            # History is read from the first (most recent) commit in the list.
            node = self.repo.get_commit(commit_ids[0]).get_node(path)
            node_ids = [commit.raw_id for commit in node.history]
            assert set(commit_ids).issubset(set(node_ids)), (
                "We assumed that %s is subset of commit_ids for which file %s "
                "has been changed, and history of that node returned: %s"
                % (commit_ids, path, node_ids))
817 799
818 800 def test_file_annotate(self):
819 801 files = {
820 802 'vcs/backends/__init__.py': {
821 803 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
822 804 'lines_no': 1,
823 805 'commits': [
824 806 'c1214f7e79e02fc37156ff215cd71275450cffc3',
825 807 ],
826 808 },
827 809 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
828 810 'lines_no': 21,
829 811 'commits': [
830 812 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
831 813 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
832 814 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
833 815 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
834 816 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
835 817 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
836 818 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
837 819 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
838 820 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
839 821 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
840 822 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
841 823 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
842 824 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
843 825 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
844 826 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
845 827 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
846 828 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
847 829 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
848 830 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
849 831 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
850 832 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
851 833 ],
852 834 },
853 835 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
854 836 'lines_no': 32,
855 837 'commits': [
856 838 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
857 839 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
858 840 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
859 841 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
860 842 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
861 843 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
862 844 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
863 845 '54000345d2e78b03a99d561399e8e548de3f3203',
864 846 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
865 847 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
866 848 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
867 849 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
868 850 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
869 851 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
870 852 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
871 853 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
872 854 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
873 855 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
874 856 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
875 857 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
876 858 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
877 859 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
878 860 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
879 861 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
880 862 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
881 863 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
882 864 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
883 865 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
884 866 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
885 867 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
886 868 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
887 869 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
888 870 ],
889 871 },
890 872 },
891 873 }
892 874
893 875 for fname, commit_dict in files.items():
894 876 for commit_id, __ in commit_dict.items():
895 877 commit = self.repo.get_commit(commit_id)
896 878
897 879 l1_1 = [x[1] for x in commit.get_file_annotate(fname)]
898 880 l1_2 = [x[2]().raw_id for x in commit.get_file_annotate(fname)]
899 881 assert l1_1 == l1_2
900 882 l1 = l1_1
901 883 l2 = files[fname][commit_id]['commits']
902 884 assert l1 == l2, (
903 885 "The lists of commit_ids for %s@commit_id %s"
904 886 "from annotation list should match each other, "
905 887 "got \n%s \nvs \n%s " % (fname, commit_id, l1, l2))
906 888
907 889 def test_files_state(self):
908 890 """
909 891 Tests state of FileNodes.
910 892 """
911 893 node = self.repo\
912 894 .get_commit('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
913 895 .get_node('vcs/utils/diffs.py')
914 896 assert node.state, NodeState.ADDED
915 897 assert node.added
916 898 assert not node.changed
917 899 assert not node.not_changed
918 900 assert not node.removed
919 901
920 902 node = self.repo\
921 903 .get_commit('33fa3223355104431402a888fa77a4e9956feb3e')\
922 904 .get_node('.hgignore')
923 905 assert node.state, NodeState.CHANGED
924 906 assert not node.added
925 907 assert node.changed
926 908 assert not node.not_changed
927 909 assert not node.removed
928 910
929 911 node = self.repo\
930 912 .get_commit('e29b67bd158580fc90fc5e9111240b90e6e86064')\
931 913 .get_node('setup.py')
932 914 assert node.state, NodeState.NOT_CHANGED
933 915 assert not node.added
934 916 assert not node.changed
935 917 assert node.not_changed
936 918 assert not node.removed
937 919
938 920 # If node has REMOVED state then trying to fetch it would raise
939 921 # CommitError exception
940 922 commit = self.repo.get_commit(
941 923 'fa6600f6848800641328adbf7811fd2372c02ab2')
942 924 path = 'vcs/backends/BaseRepository.py'
943 925 with pytest.raises(NodeDoesNotExistError):
944 926 commit.get_node(path)
945 927 # but it would be one of ``removed`` (commit's attribute)
946 928 assert path in [rf.path for rf in commit.removed]
947 929
948 930 commit = self.repo.get_commit(
949 931 '54386793436c938cff89326944d4c2702340037d')
950 932 changed = [
951 933 'setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
952 934 'vcs/nodes.py']
953 935 assert set(changed) == set([f.path for f in commit.changed])
954 936
955 937 def test_unicode_branch_refs(self):
956 938 unicode_branches = {
957 939 'refs/heads/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
958 940 u'refs/heads/uniΓ§ΓΆβˆ‚e': 'ΓΌrl',
959 941 }
960 942 with mock.patch(
961 943 ("rhodecode.lib.vcs.backends.git.repository"
962 944 ".GitRepository._refs"),
963 945 unicode_branches):
964 946 branches = self.repo.branches
965 947
966 948 assert 'unicode' in branches
967 949 assert u'uniΓ§ΓΆβˆ‚e' in branches
968 950
969 951 def test_unicode_tag_refs(self):
970 952 unicode_tags = {
971 953 'refs/tags/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
972 954 u'refs/tags/uniΓ§ΓΆβˆ‚e': '6c0ce52b229aa978889e91b38777f800e85f330b',
973 955 }
974 956 with mock.patch(
975 957 ("rhodecode.lib.vcs.backends.git.repository"
976 958 ".GitRepository._refs"),
977 959 unicode_tags):
978 960 tags = self.repo.tags
979 961
980 962 assert 'unicode' in tags
981 963 assert u'uniΓ§ΓΆβˆ‚e' in tags
982 964
983 965 def test_commit_message_is_unicode(self):
984 966 for commit in self.repo:
985 967 assert type(commit.message) == unicode
986 968
987 969 def test_commit_author_is_unicode(self):
988 970 for commit in self.repo:
989 971 assert type(commit.author) == unicode
990 972
991 973 def test_repo_files_content_is_unicode(self):
992 974 commit = self.repo.get_commit()
993 975 for node in commit.get_node('/'):
994 976 if node.is_file():
995 977 assert type(node.content) == unicode
996 978
997 979 def test_wrong_path(self):
998 980 # There is 'setup.py' in the root dir but not there:
999 981 path = 'foo/bar/setup.py'
1000 982 tip = self.repo.get_commit()
1001 983 with pytest.raises(VCSError):
1002 984 tip.get_node(path)
1003 985
1004 986 @pytest.mark.parametrize("author_email, commit_id", [
1005 987 ('marcin@python-blog.com', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1006 988 ('lukasz.balcerzak@python-center.pl',
1007 989 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1008 990 ('none@none', '8430a588b43b5d6da365400117c89400326e7992'),
1009 991 ])
1010 992 def test_author_email(self, author_email, commit_id):
1011 993 commit = self.repo.get_commit(commit_id)
1012 994 assert author_email == commit.author_email
1013 995
1014 996 @pytest.mark.parametrize("author, commit_id", [
1015 997 ('Marcin Kuzminski', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1016 998 ('Lukasz Balcerzak', 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1017 999 ('marcink', '8430a588b43b5d6da365400117c89400326e7992'),
1018 1000 ])
1019 1001 def test_author_username(self, author, commit_id):
1020 1002 commit = self.repo.get_commit(commit_id)
1021 1003 assert author == commit.author_name
1022 1004
1023 1005
class TestLargeFileRepo(object):
    """git-lfs: a pointer node must resolve to the real large-file blob."""

    def test_large_file(self, backend_git):
        conf = make_db_config()
        repo = backend_git.create_test_repo('largefiles', conf)

        tip = repo.scm_instance().get_commit()

        # extract stored LF node into the origin cache
        lfs_store = os.path.join(repo.repo_path, repo.repo_name, 'lfs_store')

        oid = '7b331c02e313c7599d5a90212e17e6d3cb729bd2e1c9b873c302a63c95a2f9bf'
        oid_path = os.path.join(lfs_store, oid)
        oid_destination = os.path.join(
            conf.get('vcs_git_lfs', 'store_location'), oid)
        shutil.copy(oid_path, oid_destination)

        # The node in the tree is only the lfs pointer file ...
        node = tip.get_node('1MB.zip')

        # ... resolving it yields the actual large-file node.
        lf_node = node.get_largefile_node()

        assert lf_node.is_largefile() is True
        assert lf_node.size == 1024000
        assert lf_node.name == '1MB.zip'
1048 1030
1049 1031
@pytest.mark.usefixtures("vcs_repository_support")
class TestGitSpecificWithRepo(BackendTestMixin):
    """Git-specific behaviour exercised on a generated two-commit repository."""

    @classmethod
    def _get_commits(cls):
        # Fixture data consumed by BackendTestMixin to build the test repo.
        return [
            {
                'message': 'Initial',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 20),
                'added': [
                    FileNode('foobar/static/js/admin/base.js', content='base'),
                    FileNode(
                        'foobar/static/admin', content='admin',
                        mode=0o120000),  # this is a link
                    FileNode('foo', content='foo'),
                ],
            },
            {
                'message': 'Second',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 22),
                'added': [
                    FileNode('foo2', content='foo2'),
                ],
            },
        ]

    def test_paths_slow_traversing(self):
        # Walk the tree one directory level at a time.
        commit = self.repo.get_commit()
        assert commit.get_node('foobar').get_node('static').get_node('js')\
            .get_node('admin').get_node('base.js').content == 'base'

    def test_paths_fast_traversing(self):
        # Resolve the full path in a single get_node call.
        commit = self.repo.get_commit()
        assert (
            commit.get_node('foobar/static/js/admin/base.js').content ==
            'base')

    def test_get_diff_runs_git_command_with_hashes(self):
        comm1 = self.repo[0]
        comm2 = self.repo[1]
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
        self.repo.get_diff(comm1, comm2)

        # The exact git invocation is part of the contract being tested.
        self.repo.run_git_command.assert_called_once_with(
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M',
             '--abbrev=40', comm1.raw_id, comm2.raw_id])

    def test_get_diff_runs_git_command_with_str_hashes(self):
        # Diffing against the empty commit uses `git show` instead of `diff`.
        comm2 = self.repo[1]
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
        self.repo.get_diff(self.repo.EMPTY_COMMIT, comm2)
        self.repo.run_git_command.assert_called_once_with(
            ['show', '-U3', '--full-index', '--binary', '-p', '-M',
             '--abbrev=40', comm2.raw_id])

    def test_get_diff_runs_git_command_with_path_if_its_given(self):
        # A path argument is appended after the `--` separator.
        comm1 = self.repo[0]
        comm2 = self.repo[1]
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
        self.repo.get_diff(comm1, comm2, 'foo')
        self.repo.run_git_command.assert_called_once_with(
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M',
             '--abbrev=40', self.repo._lookup_commit(0),
             comm2.raw_id, '--', 'foo'])
1116 1098
1117 1099
@pytest.mark.usefixtures("vcs_repository_support")
class TestGitRegression(BackendTestMixin):
    """Regression: get_nodes must not match paths that merely share a prefix
    (e.g. 'bot/build' vs 'bot/build/static')."""

    @classmethod
    def _get_commits(cls):
        # Fixture data consumed by BackendTestMixin to build the test repo.
        return [
            {
                'message': 'Initial',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 20),
                'added': [
                    FileNode('bot/__init__.py', content='base'),
                    FileNode('bot/templates/404.html', content='base'),
                    FileNode('bot/templates/500.html', content='base'),
                ],
            },
            {
                'message': 'Second',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 22),
                'added': [
                    FileNode('bot/build/migrations/1.py', content='foo2'),
                    FileNode('bot/build/migrations/2.py', content='foo2'),
                    FileNode(
                        'bot/build/static/templates/f.html', content='foo2'),
                    FileNode(
                        'bot/build/static/templates/f1.html', content='foo2'),
                    FileNode('bot/build/templates/err.html', content='foo2'),
                    FileNode('bot/build/templates/err2.html', content='foo2'),
                ],
            },
        ]

    @pytest.mark.parametrize("path, expected_paths", [
        ('bot', [
            'bot/build',
            'bot/templates',
            'bot/__init__.py']),
        ('bot/build', [
            'bot/build/migrations',
            'bot/build/static',
            'bot/build/templates']),
        ('bot/build/static', [
            'bot/build/static/templates']),
        ('bot/build/static/templates', [
            'bot/build/static/templates/f.html',
            'bot/build/static/templates/f1.html']),
        ('bot/build/templates', [
            'bot/build/templates/err.html',
            'bot/build/templates/err2.html']),
        ('bot/templates/', [
            'bot/templates/404.html',
            'bot/templates/500.html']),
    ])
    def test_similar_paths(self, path, expected_paths):
        # Only direct children of `path` may be returned - not entries of
        # sibling directories whose names share the same prefix.
        commit = self.repo.get_commit()
        paths = [n.path for n in commit.get_nodes(path)]
        assert paths == expected_paths
1176 1158
1177 1159
class TestDiscoverGitVersion(object):
    """discover_git_version: happy path and graceful vcsserver failure."""

    def test_returns_git_version(self, baseapp):
        # With a running vcsserver a non-empty version string comes back.
        assert discover_git_version()

    def test_returns_empty_string_without_vcsserver(self):
        # Errors while talking to the vcsserver degrade to an empty string.
        broken_connection = mock.Mock()
        broken_connection.discover_git_version = mock.Mock(
            side_effect=Exception)
        with mock.patch(
                'rhodecode.lib.vcs.connection.Git', broken_connection):
            assert discover_git_version() == ''
1191 1173
1192 1174
class TestGetSubmoduleUrl(object):
    """_get_submodule_url parses .gitmodules; link tree entries become
    SubModuleNode objects carrying that url."""

    def test_submodules_file_found(self):
        commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
        node = mock.Mock()
        with mock.patch.object(
                commit, 'get_node', return_value=node) as get_node_mock:
            node.content = (
                '[submodule "subrepo1"]\n'
                '\tpath = subrepo1\n'
                '\turl = https://code.rhodecode.com/dulwich\n'
            )
            result = commit._get_submodule_url('subrepo1')
        # The url must come from the commit's .gitmodules file.
        get_node_mock.assert_called_once_with('.gitmodules')
        assert result == 'https://code.rhodecode.com/dulwich'

    def test_complex_submodule_path(self):
        # Submodule paths with several segments must resolve as well.
        commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
        node = mock.Mock()
        with mock.patch.object(
                commit, 'get_node', return_value=node) as get_node_mock:
            node.content = (
                '[submodule "complex/subrepo/path"]\n'
                '\tpath = complex/subrepo/path\n'
                '\turl = https://code.rhodecode.com/dulwich\n'
            )
            result = commit._get_submodule_url('complex/subrepo/path')
        get_node_mock.assert_called_once_with('.gitmodules')
        assert result == 'https://code.rhodecode.com/dulwich'

    def test_submodules_file_not_found(self):
        # A missing .gitmodules file yields None instead of raising.
        commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
        with mock.patch.object(
                commit, 'get_node', side_effect=NodeDoesNotExistError):
            result = commit._get_submodule_url('complex/subrepo/path')
        assert result is None

    def test_path_not_found(self):
        # A path not listed in .gitmodules yields None.
        commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
        node = mock.Mock()
        with mock.patch.object(
                commit, 'get_node', return_value=node) as get_node_mock:
            node.content = (
                '[submodule "subrepo1"]\n'
                '\tpath = subrepo1\n'
                '\turl = https://code.rhodecode.com/dulwich\n'
            )
            result = commit._get_submodule_url('subrepo2')
        get_node_mock.assert_called_once_with('.gitmodules')
        assert result is None

    def test_returns_cached_values(self):
        # Repeated lookups must parse .gitmodules only once.
        commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
        node = mock.Mock()
        with mock.patch.object(
                commit, 'get_node', return_value=node) as get_node_mock:
            node.content = (
                '[submodule "subrepo1"]\n'
                '\tpath = subrepo1\n'
                '\turl = https://code.rhodecode.com/dulwich\n'
            )
            for _ in range(3):
                commit._get_submodule_url('subrepo1')
        get_node_mock.assert_called_once_with('.gitmodules')

    def test_get_node_returns_a_link(self):
        # A 'link' tree entry is surfaced as a SubModuleNode with the url.
        repository = mock.Mock()
        repository.alias = 'git'
        commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
        submodule_url = 'https://code.rhodecode.com/dulwich'
        get_id_patch = mock.patch.object(
            commit, '_get_id_for_path', return_value=(1, 'link'))
        get_submodule_patch = mock.patch.object(
            commit, '_get_submodule_url', return_value=submodule_url)

        with get_id_patch, get_submodule_patch as submodule_mock:
            node = commit.get_node('/abcde')

        submodule_mock.assert_called_once_with('/abcde')
        assert type(node) == SubModuleNode
        assert node.url == submodule_url

    def test_get_nodes_returns_links(self):
        # Link entries inside a tree listing also become SubModuleNodes.
        repository = mock.MagicMock()
        repository.alias = 'git'
        repository._remote.tree_items.return_value = [
            ('subrepo', 'stat', 1, 'link')
        ]
        commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
        submodule_url = 'https://code.rhodecode.com/dulwich'
        get_id_patch = mock.patch.object(
            commit, '_get_id_for_path', return_value=(1, 'tree'))
        get_submodule_patch = mock.patch.object(
            commit, '_get_submodule_url', return_value=submodule_url)

        with get_id_patch, get_submodule_patch as submodule_mock:
            nodes = commit.get_nodes('/abcde')

        submodule_mock.assert_called_once_with('/abcde/subrepo')
        assert len(nodes) == 1
        assert type(nodes[0]) == SubModuleNode
        assert nodes[0].url == submodule_url
@@ -1,281 +1,282 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import pytest
24 24
25 25 from rhodecode.lib.vcs.backends.git.repository import GitRepository
26 26 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
27 27 from rhodecode.lib.vcs.nodes import FileNode
28 28 from rhodecode.model.db import Repository
29 29 from rhodecode.model.meta import Session
30 30 from rhodecode.tests import GIT_REPO, HG_REPO
31 31
32 32 from rhodecode.tests.vcs_operations import (
33 33 Command, _check_proper_clone, _check_proper_git_push, _check_proper_hg_push,
34 34 _add_files_and_push)
35 35
36 36
37 37 @pytest.mark.usefixtures("disable_locking")
# NOTE(review): end-to-end VCS-operation tests. Each test drives a real
# git/hg client (via Command) against a live test web server
# (rc_web_server fixture) and asserts on the client's stdout/stderr.
38 38 class TestVCSOperationsSpecial(object):
39 39
# Pushing the first branch of an empty repo -- even when it is not named
# 'master' -- must make it the server-side default branch (checked via
# should_set_default_branch=True) and leave the repo cloneable afterwards.
40 40 def test_git_sets_default_branch_if_not_master(
41 41 self, backend_git, tmpdir, rc_web_server):
42 42 empty_repo = backend_git.create_repo()
43 43 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
44 44
45 45 cmd = Command(tmpdir.strpath)
46 46 cmd.execute('git clone', clone_url)
47 47
48 48 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
# Create and check out the 'test' branch before committing to it.
49 repo._checkout('test', create=True)
49 50 repo.in_memory_commit.add(FileNode('file', content=''))
50 51 repo.in_memory_commit.commit(
51 52 message='Commit on branch test',
52 53 author='Automatic test',
53 54 branch='test')
54 55
55 56 repo_cmd = Command(repo.path)
56 57 stdout, stderr = repo_cmd.execute('git push --verbose origin test')
57 58 _check_proper_git_push(
58 59 stdout, stderr, branch='test', should_set_default_branch=True)
59 60
60 61 stdout, stderr = cmd.execute(
61 62 'git clone', clone_url, empty_repo.repo_name + '-clone')
62 63 _check_proper_clone(stdout, stderr, 'git')
63 64
64 65 # Doing an explicit commit in order to get latest user logs on MySQL
65 66 Session().commit()
66 67
67 68 def test_git_fetches_from_remote_repository_with_annotated_tags(
68 69 self, backend_git, rc_web_server):
69 70 # Note: This is a test specific to the git backend. It checks the
70 71 # integration of fetching from a remote repository which contains
71 72 # annotated tags.
72 73
73 74 # Dulwich shows this specific behavior only when
74 75 # operating against a remote repository.
75 76 source_repo = backend_git['annotated-tag']
76 77 target_vcs_repo = backend_git.create_repo().scm_instance()
77 78 target_vcs_repo.fetch(rc_web_server.repo_clone_url(source_repo.repo_name))
78 79
# Every git push must print an 'open pull request' link for the pushed
# branch in the remote output (stderr for git).
79 80 def test_git_push_shows_pull_request_refs(self, backend_git, rc_web_server, tmpdir):
80 81 """
81 82 test if remote info about refs is visible
82 83 """
83 84 empty_repo = backend_git.create_repo()
84 85
85 86 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
86 87
87 88 cmd = Command(tmpdir.strpath)
88 89 cmd.execute('git clone', clone_url)
89 90
90 91 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
91 92 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
92 93 repo.in_memory_commit.commit(
93 94 message='Commit on branch Master',
94 95 author='Automatic test',
95 96 branch='master')
96 97
97 98 repo_cmd = Command(repo.path)
98 99 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
99 100 _check_proper_git_push(stdout, stderr, branch='master')
100 101
101 102 ref = '{}/{}/pull-request/new?branch=master'.format(
102 103 rc_web_server.host_url(), empty_repo.repo_name)
103 104 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
104 105 assert 'remote: RhodeCode: push completed' in stderr
105 106
106 107 # push on the same branch
107 108 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
108 109 repo.in_memory_commit.add(FileNode('setup.py', content='print\n'))
109 110 repo.in_memory_commit.commit(
110 111 message='Commit2 on branch Master',
111 112 author='Automatic test2',
112 113 branch='master')
113 114
114 115 repo_cmd = Command(repo.path)
115 116 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
116 117 _check_proper_git_push(stdout, stderr, branch='master')
117 118
118 119 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
119 120 assert 'remote: RhodeCode: push completed' in stderr
120 121
121 122 # new Branch
122 123 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
123 124 repo.in_memory_commit.add(FileNode('feature1.py', content='## Hello world'))
124 125 repo.in_memory_commit.commit(
125 126 message='Commit on branch feature',
126 127 author='Automatic test',
127 128 branch='feature')
128 129
129 130 repo_cmd = Command(repo.path)
130 131 stdout, stderr = repo_cmd.execute('git push --verbose origin feature')
131 132 _check_proper_git_push(stdout, stderr, branch='feature')
132 133
133 134 ref = '{}/{}/pull-request/new?branch=feature'.format(
134 135 rc_web_server.host_url(), empty_repo.repo_name)
135 136 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
136 137 assert 'remote: RhodeCode: push completed' in stderr
137 138
# hg counterpart of the test above; note hg reports the remote hook
# output on stdout, not stderr.
138 139 def test_hg_push_shows_pull_request_refs(self, backend_hg, rc_web_server, tmpdir):
139 140 empty_repo = backend_hg.create_repo()
140 141
141 142 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
142 143
143 144 cmd = Command(tmpdir.strpath)
144 145 cmd.execute('hg clone', clone_url)
145 146
146 147 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
147 148 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
148 149 repo.in_memory_commit.commit(
149 150 message=u'Commit on branch default',
150 151 author=u'Automatic test',
151 152 branch='default')
152 153
153 154 repo_cmd = Command(repo.path)
154 155 repo_cmd.execute('hg checkout default')
155 156
156 157 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
157 158 _check_proper_hg_push(stdout, stderr, branch='default')
158 159
159 160 ref = '{}/{}/pull-request/new?branch=default'.format(
160 161 rc_web_server.host_url(), empty_repo.repo_name)
161 162 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
162 163 assert 'remote: RhodeCode: push completed' in stdout
163 164
164 165 # push on the same branch
165 166 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
166 167 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
167 168 repo.in_memory_commit.commit(
168 169 message=u'Commit2 on branch default',
169 170 author=u'Automatic test2',
170 171 branch=u'default')
171 172
172 173 repo_cmd = Command(repo.path)
173 174 repo_cmd.execute('hg checkout default')
174 175
175 176 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
176 177 _check_proper_hg_push(stdout, stderr, branch='default')
177 178
178 179 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
179 180 assert 'remote: RhodeCode: push completed' in stdout
180 181
181 182 # new Branch
182 183 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
183 184 repo.in_memory_commit.add(FileNode(u'feature1.py', content=u'## Hello world'))
184 185 repo.in_memory_commit.commit(
185 186 message=u'Commit on branch feature',
186 187 author=u'Automatic test',
187 188 branch=u'feature')
188 189
189 190 repo_cmd = Command(repo.path)
190 191 repo_cmd.execute('hg checkout feature')
191 192
192 193 stdout, stderr = repo_cmd.execute('hg push --new-branch --verbose', clone_url)
193 194 _check_proper_hg_push(stdout, stderr, branch='feature')
194 195
195 196 ref = '{}/{}/pull-request/new?branch=feature'.format(
196 197 rc_web_server.host_url(), empty_repo.repo_name)
197 198 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
198 199 assert 'remote: RhodeCode: push completed' in stdout
199 200
# Pushing a bookmark must additionally print a bookmark-based
# pull-request link alongside the branch-based one.
200 201 def test_hg_push_shows_pull_request_refs_book(self, backend_hg, rc_web_server, tmpdir):
201 202 empty_repo = backend_hg.create_repo()
202 203
203 204 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
204 205
205 206 cmd = Command(tmpdir.strpath)
206 207 cmd.execute('hg clone', clone_url)
207 208
208 209 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
209 210 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
210 211 repo.in_memory_commit.commit(
211 212 message=u'Commit on branch default',
212 213 author=u'Automatic test',
213 214 branch='default')
214 215
215 216 repo_cmd = Command(repo.path)
216 217 repo_cmd.execute('hg checkout default')
217 218
218 219 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
219 220 _check_proper_hg_push(stdout, stderr, branch='default')
220 221
221 222 ref = '{}/{}/pull-request/new?branch=default'.format(
222 223 rc_web_server.host_url(), empty_repo.repo_name)
223 224 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
224 225 assert 'remote: RhodeCode: push completed' in stdout
225 226
226 227 # add bookmark
227 228 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
228 229 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
229 230 repo.in_memory_commit.commit(
230 231 message=u'Commit2 on branch default',
231 232 author=u'Automatic test2',
232 233 branch=u'default')
233 234
234 235 repo_cmd = Command(repo.path)
235 236 repo_cmd.execute('hg checkout default')
236 237 repo_cmd.execute('hg bookmark feature2')
237 238 stdout, stderr = repo_cmd.execute('hg push -B feature2 --verbose', clone_url)
238 239 _check_proper_hg_push(stdout, stderr, branch='default')
239 240
240 241 ref = '{}/{}/pull-request/new?branch=default'.format(
241 242 rc_web_server.host_url(), empty_repo.repo_name)
242 243 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
243 244 ref = '{}/{}/pull-request/new?bookmark=feature2'.format(
244 245 rc_web_server.host_url(), empty_repo.repo_name)
245 246 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
246 247 assert 'remote: RhodeCode: push completed' in stdout
247 248 assert 'exporting bookmark feature2' in stdout
248 249
# Archived repositories must reject pushes with HTTP 403 (hg variant).
249 250 def test_push_is_forbidden_on_archived_repo_hg(self, backend_hg, rc_web_server, tmpdir):
250 251 empty_repo = backend_hg.create_repo()
251 252 repo_name = empty_repo.repo_name
252 253
253 254 repo = Repository.get_by_repo_name(repo_name)
254 255 repo.archived = True
255 256 Session().commit()
256 257
257 258 clone_url = rc_web_server.repo_clone_url(repo_name)
258 259 stdout, stderr = Command('/tmp').execute(
259 260 'hg clone', clone_url, tmpdir.strpath)
260 261
261 262 stdout, stderr = _add_files_and_push(
262 263 'hg', tmpdir.strpath, clone_url=clone_url)
263 264
264 265 assert 'abort: HTTP Error 403: Forbidden' in stderr
265 266
# Archived repositories must reject pushes with HTTP 403 (git variant).
266 267 def test_push_is_forbidden_on_archived_repo_git(self, backend_git, rc_web_server, tmpdir):
267 268 empty_repo = backend_git.create_repo()
268 269 repo_name = empty_repo.repo_name
269 270
270 271 repo = Repository.get_by_repo_name(repo_name)
271 272 repo.archived = True
272 273 Session().commit()
273 274
274 275 clone_url = rc_web_server.repo_clone_url(repo_name)
275 276 stdout, stderr = Command('/tmp').execute(
276 277 'git clone', clone_url, tmpdir.strpath)
277 278
278 279 stdout, stderr = _add_files_and_push(
279 280 'git', tmpdir.strpath, clone_url=clone_url)
280 281
281 282 assert "The requested URL returned error: 403" in stderr
General Comments 0
You need to be logged in to leave comments. Login now