##// END OF EJS Templates
pytest: rc_fixture should be module level.
marcink -
r2370:b349bab9 default
parent child Browse files
Show More
@@ -1,1836 +1,1836 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
73 73 def pytest_addoption(parser):
74 74 parser.addoption(
75 75 '--keep-tmp-path', action='store_true',
76 76 help="Keep the test temporary directories")
77 77 parser.addoption(
78 78 '--backends', action='store', type=_split_comma,
79 79 default=['git', 'hg', 'svn'],
80 80 help="Select which backends to test for backend specific tests.")
81 81 parser.addoption(
82 82 '--dbs', action='store', type=_split_comma,
83 83 default=['sqlite'],
84 84 help="Select which database to test for database specific tests. "
85 85 "Possible options are sqlite,postgres,mysql")
86 86 parser.addoption(
87 87 '--appenlight', '--ae', action='store_true',
88 88 help="Track statistics in appenlight.")
89 89 parser.addoption(
90 90 '--appenlight-api-key', '--ae-key',
91 91 help="API key for Appenlight.")
92 92 parser.addoption(
93 93 '--appenlight-url', '--ae-url',
94 94 default="https://ae.rhodecode.com",
95 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
96 96 parser.addoption(
97 97 '--sqlite-connection-string', action='store',
98 98 default='', help="Connection string for the dbs tests with SQLite")
99 99 parser.addoption(
100 100 '--postgres-connection-string', action='store',
101 101 default='', help="Connection string for the dbs tests with Postgres")
102 102 parser.addoption(
103 103 '--mysql-connection-string', action='store',
104 104 default='', help="Connection string for the dbs tests with MySQL")
105 105 parser.addoption(
106 106 '--repeat', type=int, default=100,
107 107 help="Number of repetitions in performance tests.")
108 108
109 109
110 110 def pytest_configure(config):
111 111 from rhodecode.config import patches
112 112
113 113
114 114 def pytest_collection_modifyitems(session, config, items):
115 115 # nottest marked, compare nose, used for transition from nose to pytest
116 116 remaining = [
117 117 i for i in items if getattr(i.obj, '__test__', True)]
118 118 items[:] = remaining
119 119
120 120
121 121 def pytest_generate_tests(metafunc):
122 122 # Support test generation based on --backend parameter
123 123 if 'backend_alias' in metafunc.fixturenames:
124 124 backends = get_backends_from_metafunc(metafunc)
125 125 scope = None
126 126 if not backends:
127 127 pytest.skip("Not enabled for any of selected backends")
128 128 metafunc.parametrize('backend_alias', backends, scope=scope)
129 129 elif hasattr(metafunc.function, 'backends'):
130 130 backends = get_backends_from_metafunc(metafunc)
131 131 if not backends:
132 132 pytest.skip("Not enabled for any of selected backends")
133 133
134 134
135 135 def get_backends_from_metafunc(metafunc):
136 136 requested_backends = set(metafunc.config.getoption('--backends'))
137 137 if hasattr(metafunc.function, 'backends'):
138 138 # Supported backends by this test function, created from
139 139 # pytest.mark.backends
140 140 backends = metafunc.function.backends.args
141 141 elif hasattr(metafunc.cls, 'backend_alias'):
142 142 # Support class attribute "backend_alias", this is mainly
143 143 # for legacy reasons for tests not yet using pytest.mark.backends
144 144 backends = [metafunc.cls.backend_alias]
145 145 else:
146 146 backends = metafunc.config.getoption('--backends')
147 147 return requested_backends.intersection(backends)
148 148
149 149
150 150 @pytest.fixture(scope='session', autouse=True)
151 151 def activate_example_rcextensions(request):
152 152 """
153 153 Patch in an example rcextensions module which verifies passed in kwargs.
154 154 """
155 155 from rhodecode.tests.other import example_rcextensions
156 156
157 157 old_extensions = rhodecode.EXTENSIONS
158 158 rhodecode.EXTENSIONS = example_rcextensions
159 159
160 160 @request.addfinalizer
161 161 def cleanup():
162 162 rhodecode.EXTENSIONS = old_extensions
163 163
164 164
165 165 @pytest.fixture
166 166 def capture_rcextensions():
167 167 """
168 168 Returns the recorded calls to entry points in rcextensions.
169 169 """
170 170 calls = rhodecode.EXTENSIONS.calls
171 171 calls.clear()
172 172 # Note: At this moment, it is still the empty dict, but that will
173 173 # be filled during the test run and since it is a reference this
174 174 # is enough to make it work.
175 175 return calls
176 176
177 177
178 178 @pytest.fixture(scope='session')
179 179 def http_environ_session():
180 180 """
181 181 Allow to use "http_environ" in session scope.
182 182 """
183 183 return http_environ(
184 184 http_host_stub=http_host_stub())
185 185
186 186
187 187 @pytest.fixture
188 188 def http_host_stub():
189 189 """
190 190 Value of HTTP_HOST in the test run.
191 191 """
192 192 return 'example.com:80'
193 193
194 194
195 195 @pytest.fixture
196 196 def http_host_only_stub():
197 197 """
198 198 Value of HTTP_HOST in the test run.
199 199 """
200 200 return http_host_stub().split(':')[0]
201 201
202 202
203 203 @pytest.fixture
204 204 def http_environ(http_host_stub):
205 205 """
206 206 HTTP extra environ keys.
207 207
208 208 User by the test application and as well for setting up the pylons
209 209 environment. In the case of the fixture "app" it should be possible
210 210 to override this for a specific test case.
211 211 """
212 212 return {
213 213 'SERVER_NAME': http_host_only_stub(),
214 214 'SERVER_PORT': http_host_stub.split(':')[1],
215 215 'HTTP_HOST': http_host_stub,
216 216 'HTTP_USER_AGENT': 'rc-test-agent',
217 217 'REQUEST_METHOD': 'GET'
218 218 }
219 219
220 220
221 221 @pytest.fixture(scope='function')
222 222 def app(request, config_stub, baseapp, http_environ):
223 223 app = CustomTestApp(
224 224 baseapp,
225 225 extra_environ=http_environ)
226 226 if request.cls:
227 227 request.cls.app = app
228 228 return app
229 229
230 230
231 231 @pytest.fixture(scope='session')
232 232 def app_settings(baseapp, ini_config):
233 233 """
234 234 Settings dictionary used to create the app.
235 235
236 236 Parses the ini file and passes the result through the sanitize and apply
237 237 defaults mechanism in `rhodecode.config.middleware`.
238 238 """
239 239 return baseapp.config.get_settings()
240 240
241 241
242 242 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
243 243
244 244
245 245 def _autologin_user(app, *args):
246 246 session = login_user_session(app, *args)
247 247 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
248 248 return LoginData(csrf_token, session['rhodecode_user'])
249 249
250 250
251 251 @pytest.fixture
252 252 def autologin_user(app):
253 253 """
254 254 Utility fixture which makes sure that the admin user is logged in
255 255 """
256 256 return _autologin_user(app)
257 257
258 258
259 259 @pytest.fixture
260 260 def autologin_regular_user(app):
261 261 """
262 262 Utility fixture which makes sure that the regular user is logged in
263 263 """
264 264 return _autologin_user(
265 265 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
266 266
267 267
268 268 @pytest.fixture(scope='function')
269 269 def csrf_token(request, autologin_user):
270 270 return autologin_user.csrf_token
271 271
272 272
273 273 @pytest.fixture(scope='function')
274 274 def xhr_header(request):
275 275 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
276 276
277 277
278 278 @pytest.fixture
279 279 def real_crypto_backend(monkeypatch):
280 280 """
281 281 Switch the production crypto backend on for this test.
282 282
283 283 During the test run the crypto backend is replaced with a faster
284 284 implementation based on the MD5 algorithm.
285 285 """
286 286 monkeypatch.setattr(rhodecode, 'is_test', False)
287 287
288 288
289 289 @pytest.fixture(scope='class')
290 290 def index_location(request, baseapp):
291 291 index_location = baseapp.config.get_settings()['search.location']
292 292 if request.cls:
293 293 request.cls.index_location = index_location
294 294 return index_location
295 295
296 296
297 297 @pytest.fixture(scope='session', autouse=True)
298 298 def tests_tmp_path(request):
299 299 """
300 300 Create temporary directory to be used during the test session.
301 301 """
302 302 if not os.path.exists(TESTS_TMP_PATH):
303 303 os.makedirs(TESTS_TMP_PATH)
304 304
305 305 if not request.config.getoption('--keep-tmp-path'):
306 306 @request.addfinalizer
307 307 def remove_tmp_path():
308 308 shutil.rmtree(TESTS_TMP_PATH)
309 309
310 310 return TESTS_TMP_PATH
311 311
312 312
313 313 @pytest.fixture
314 314 def test_repo_group(request):
315 315 """
316 316 Create a temporary repository group, and destroy it after
317 317 usage automatically
318 318 """
319 319 fixture = Fixture()
320 320 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
321 321 repo_group = fixture.create_repo_group(repogroupid)
322 322
323 323 def _cleanup():
324 324 fixture.destroy_repo_group(repogroupid)
325 325
326 326 request.addfinalizer(_cleanup)
327 327 return repo_group
328 328
329 329
330 330 @pytest.fixture
331 331 def test_user_group(request):
332 332 """
333 333 Create a temporary user group, and destroy it after
334 334 usage automatically
335 335 """
336 336 fixture = Fixture()
337 337 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
338 338 user_group = fixture.create_user_group(usergroupid)
339 339
340 340 def _cleanup():
341 341 fixture.destroy_user_group(user_group)
342 342
343 343 request.addfinalizer(_cleanup)
344 344 return user_group
345 345
346 346
347 347 @pytest.fixture(scope='session')
348 348 def test_repo(request):
349 349 container = TestRepoContainer()
350 350 request.addfinalizer(container._cleanup)
351 351 return container
352 352
353 353
354 354 class TestRepoContainer(object):
355 355 """
356 356 Container for test repositories which are used read only.
357 357
358 358 Repositories will be created on demand and re-used during the lifetime
359 359 of this object.
360 360
361 361 Usage to get the svn test repository "minimal"::
362 362
363 363 test_repo = TestContainer()
364 364 repo = test_repo('minimal', 'svn')
365 365
366 366 """
367 367
368 368 dump_extractors = {
369 369 'git': utils.extract_git_repo_from_dump,
370 370 'hg': utils.extract_hg_repo_from_dump,
371 371 'svn': utils.extract_svn_repo_from_dump,
372 372 }
373 373
374 374 def __init__(self):
375 375 self._cleanup_repos = []
376 376 self._fixture = Fixture()
377 377 self._repos = {}
378 378
379 379 def __call__(self, dump_name, backend_alias, config=None):
380 380 key = (dump_name, backend_alias)
381 381 if key not in self._repos:
382 382 repo = self._create_repo(dump_name, backend_alias, config)
383 383 self._repos[key] = repo.repo_id
384 384 return Repository.get(self._repos[key])
385 385
386 386 def _create_repo(self, dump_name, backend_alias, config):
387 387 repo_name = '%s-%s' % (backend_alias, dump_name)
388 388 backend_class = get_backend(backend_alias)
389 389 dump_extractor = self.dump_extractors[backend_alias]
390 390 repo_path = dump_extractor(dump_name, repo_name)
391 391
392 392 vcs_repo = backend_class(repo_path, config=config)
393 393 repo2db_mapper({repo_name: vcs_repo})
394 394
395 395 repo = RepoModel().get_by_repo_name(repo_name)
396 396 self._cleanup_repos.append(repo_name)
397 397 return repo
398 398
399 399 def _cleanup(self):
400 400 for repo_name in reversed(self._cleanup_repos):
401 401 self._fixture.destroy_repo(repo_name)
402 402
403 403
404 404 @pytest.fixture
405 405 def backend(request, backend_alias, baseapp, test_repo):
406 406 """
407 407 Parametrized fixture which represents a single backend implementation.
408 408
409 409 It respects the option `--backends` to focus the test run on specific
410 410 backend implementations.
411 411
412 412 It also supports `pytest.mark.xfail_backends` to mark tests as failing
413 413 for specific backends. This is intended as a utility for incremental
414 414 development of a new backend implementation.
415 415 """
416 416 if backend_alias not in request.config.getoption('--backends'):
417 417 pytest.skip("Backend %s not selected." % (backend_alias, ))
418 418
419 419 utils.check_xfail_backends(request.node, backend_alias)
420 420 utils.check_skip_backends(request.node, backend_alias)
421 421
422 422 repo_name = 'vcs_test_%s' % (backend_alias, )
423 423 backend = Backend(
424 424 alias=backend_alias,
425 425 repo_name=repo_name,
426 426 test_name=request.node.name,
427 427 test_repo_container=test_repo)
428 428 request.addfinalizer(backend.cleanup)
429 429 return backend
430 430
431 431
432 432 @pytest.fixture
433 433 def backend_git(request, baseapp, test_repo):
434 434 return backend(request, 'git', baseapp, test_repo)
435 435
436 436
437 437 @pytest.fixture
438 438 def backend_hg(request, baseapp, test_repo):
439 439 return backend(request, 'hg', baseapp, test_repo)
440 440
441 441
442 442 @pytest.fixture
443 443 def backend_svn(request, baseapp, test_repo):
444 444 return backend(request, 'svn', baseapp, test_repo)
445 445
446 446
447 447 @pytest.fixture
448 448 def backend_random(backend_git):
449 449 """
450 450 Use this to express that your tests need "a backend.
451 451
452 452 A few of our tests need a backend, so that we can run the code. This
453 453 fixture is intended to be used for such cases. It will pick one of the
454 454 backends and run the tests.
455 455
456 456 The fixture `backend` would run the test multiple times for each
457 457 available backend which is a pure waste of time if the test is
458 458 independent of the backend type.
459 459 """
460 460 # TODO: johbo: Change this to pick a random backend
461 461 return backend_git
462 462
463 463
464 464 @pytest.fixture
465 465 def backend_stub(backend_git):
466 466 """
467 467 Use this to express that your tests need a backend stub
468 468
469 469 TODO: mikhail: Implement a real stub logic instead of returning
470 470 a git backend
471 471 """
472 472 return backend_git
473 473
474 474
475 475 @pytest.fixture
476 476 def repo_stub(backend_stub):
477 477 """
478 478 Use this to express that your tests need a repository stub
479 479 """
480 480 return backend_stub.create_repo()
481 481
482 482
483 483 class Backend(object):
484 484 """
485 485 Represents the test configuration for one supported backend
486 486
487 487 Provides easy access to different test repositories based on
488 488 `__getitem__`. Such repositories will only be created once per test
489 489 session.
490 490 """
491 491
492 492 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
493 493 _master_repo = None
494 494 _commit_ids = {}
495 495
496 496 def __init__(self, alias, repo_name, test_name, test_repo_container):
497 497 self.alias = alias
498 498 self.repo_name = repo_name
499 499 self._cleanup_repos = []
500 500 self._test_name = test_name
501 501 self._test_repo_container = test_repo_container
502 502 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
503 503 # Fixture will survive in the end.
504 504 self._fixture = Fixture()
505 505
506 506 def __getitem__(self, key):
507 507 return self._test_repo_container(key, self.alias)
508 508
509 509 def create_test_repo(self, key, config=None):
510 510 return self._test_repo_container(key, self.alias, config)
511 511
512 512 @property
513 513 def repo(self):
514 514 """
515 515 Returns the "current" repository. This is the vcs_test repo or the
516 516 last repo which has been created with `create_repo`.
517 517 """
518 518 from rhodecode.model.db import Repository
519 519 return Repository.get_by_repo_name(self.repo_name)
520 520
521 521 @property
522 522 def default_branch_name(self):
523 523 VcsRepository = get_backend(self.alias)
524 524 return VcsRepository.DEFAULT_BRANCH_NAME
525 525
526 526 @property
527 527 def default_head_id(self):
528 528 """
529 529 Returns the default head id of the underlying backend.
530 530
531 531 This will be the default branch name in case the backend does have a
532 532 default branch. In the other cases it will point to a valid head
533 533 which can serve as the base to create a new commit on top of it.
534 534 """
535 535 vcsrepo = self.repo.scm_instance()
536 536 head_id = (
537 537 vcsrepo.DEFAULT_BRANCH_NAME or
538 538 vcsrepo.commit_ids[-1])
539 539 return head_id
540 540
541 541 @property
542 542 def commit_ids(self):
543 543 """
544 544 Returns the list of commits for the last created repository
545 545 """
546 546 return self._commit_ids
547 547
548 548 def create_master_repo(self, commits):
549 549 """
550 550 Create a repository and remember it as a template.
551 551
552 552 This allows to easily create derived repositories to construct
553 553 more complex scenarios for diff, compare and pull requests.
554 554
555 555 Returns a commit map which maps from commit message to raw_id.
556 556 """
557 557 self._master_repo = self.create_repo(commits=commits)
558 558 return self._commit_ids
559 559
560 560 def create_repo(
561 561 self, commits=None, number_of_commits=0, heads=None,
562 562 name_suffix=u'', **kwargs):
563 563 """
564 564 Create a repository and record it for later cleanup.
565 565
566 566 :param commits: Optional. A sequence of dict instances.
567 567 Will add a commit per entry to the new repository.
568 568 :param number_of_commits: Optional. If set to a number, this number of
569 569 commits will be added to the new repository.
570 570 :param heads: Optional. Can be set to a sequence of of commit
571 571 names which shall be pulled in from the master repository.
572 572
573 573 """
574 574 self.repo_name = self._next_repo_name() + name_suffix
575 575 repo = self._fixture.create_repo(
576 576 self.repo_name, repo_type=self.alias, **kwargs)
577 577 self._cleanup_repos.append(repo.repo_name)
578 578
579 579 commits = commits or [
580 580 {'message': 'Commit %s of %s' % (x, self.repo_name)}
581 581 for x in xrange(number_of_commits)]
582 582 self._add_commits_to_repo(repo.scm_instance(), commits)
583 583 if heads:
584 584 self.pull_heads(repo, heads)
585 585
586 586 return repo
587 587
588 588 def pull_heads(self, repo, heads):
589 589 """
590 590 Make sure that repo contains all commits mentioned in `heads`
591 591 """
592 592 vcsmaster = self._master_repo.scm_instance()
593 593 vcsrepo = repo.scm_instance()
594 594 vcsrepo.config.clear_section('hooks')
595 595 commit_ids = [self._commit_ids[h] for h in heads]
596 596 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
597 597
598 598 def create_fork(self):
599 599 repo_to_fork = self.repo_name
600 600 self.repo_name = self._next_repo_name()
601 601 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
602 602 self._cleanup_repos.append(self.repo_name)
603 603 return repo
604 604
605 605 def new_repo_name(self, suffix=u''):
606 606 self.repo_name = self._next_repo_name() + suffix
607 607 self._cleanup_repos.append(self.repo_name)
608 608 return self.repo_name
609 609
610 610 def _next_repo_name(self):
611 611 return u"%s_%s" % (
612 612 self.invalid_repo_name.sub(u'_', self._test_name),
613 613 len(self._cleanup_repos))
614 614
615 615 def ensure_file(self, filename, content='Test content\n'):
616 616 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
617 617 commits = [
618 618 {'added': [
619 619 FileNode(filename, content=content),
620 620 ]},
621 621 ]
622 622 self._add_commits_to_repo(self.repo.scm_instance(), commits)
623 623
624 624 def enable_downloads(self):
625 625 repo = self.repo
626 626 repo.enable_downloads = True
627 627 Session().add(repo)
628 628 Session().commit()
629 629
630 630 def cleanup(self):
631 631 for repo_name in reversed(self._cleanup_repos):
632 632 self._fixture.destroy_repo(repo_name)
633 633
634 634 def _add_commits_to_repo(self, repo, commits):
635 635 commit_ids = _add_commits_to_repo(repo, commits)
636 636 if not commit_ids:
637 637 return
638 638 self._commit_ids = commit_ids
639 639
640 640 # Creating refs for Git to allow fetching them from remote repository
641 641 if self.alias == 'git':
642 642 refs = {}
643 643 for message in self._commit_ids:
644 644 # TODO: mikhail: do more special chars replacements
645 645 ref_name = 'refs/test-refs/{}'.format(
646 646 message.replace(' ', ''))
647 647 refs[ref_name] = self._commit_ids[message]
648 648 self._create_refs(repo, refs)
649 649
650 650 def _create_refs(self, repo, refs):
651 651 for ref_name in refs:
652 652 repo.set_refs(ref_name, refs[ref_name])
653 653
654 654
655 655 @pytest.fixture
656 656 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
657 657 """
658 658 Parametrized fixture which represents a single vcs backend implementation.
659 659
660 660 See the fixture `backend` for more details. This one implements the same
661 661 concept, but on vcs level. So it does not provide model instances etc.
662 662
663 663 Parameters are generated dynamically, see :func:`pytest_generate_tests`
664 664 for how this works.
665 665 """
666 666 if backend_alias not in request.config.getoption('--backends'):
667 667 pytest.skip("Backend %s not selected." % (backend_alias, ))
668 668
669 669 utils.check_xfail_backends(request.node, backend_alias)
670 670 utils.check_skip_backends(request.node, backend_alias)
671 671
672 672 repo_name = 'vcs_test_%s' % (backend_alias, )
673 673 repo_path = os.path.join(tests_tmp_path, repo_name)
674 674 backend = VcsBackend(
675 675 alias=backend_alias,
676 676 repo_path=repo_path,
677 677 test_name=request.node.name,
678 678 test_repo_container=test_repo)
679 679 request.addfinalizer(backend.cleanup)
680 680 return backend
681 681
682 682
683 683 @pytest.fixture
684 684 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
685 685 return vcsbackend(request, 'git', tests_tmp_path, baseapp, test_repo)
686 686
687 687
688 688 @pytest.fixture
689 689 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
690 690 return vcsbackend(request, 'hg', tests_tmp_path, baseapp, test_repo)
691 691
692 692
693 693 @pytest.fixture
694 694 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
695 695 return vcsbackend(request, 'svn', tests_tmp_path, baseapp, test_repo)
696 696
697 697
698 698 @pytest.fixture
699 699 def vcsbackend_random(vcsbackend_git):
700 700 """
701 701 Use this to express that your tests need "a vcsbackend".
702 702
703 703 The fixture `vcsbackend` would run the test multiple times for each
704 704 available vcs backend which is a pure waste of time if the test is
705 705 independent of the vcs backend type.
706 706 """
707 707 # TODO: johbo: Change this to pick a random backend
708 708 return vcsbackend_git
709 709
710 710
711 711 @pytest.fixture
712 712 def vcsbackend_stub(vcsbackend_git):
713 713 """
714 714 Use this to express that your test just needs a stub of a vcsbackend.
715 715
716 716 Plan is to eventually implement an in-memory stub to speed tests up.
717 717 """
718 718 return vcsbackend_git
719 719
720 720
721 721 class VcsBackend(object):
722 722 """
723 723 Represents the test configuration for one supported vcs backend.
724 724 """
725 725
726 726 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
727 727
728 728 def __init__(self, alias, repo_path, test_name, test_repo_container):
729 729 self.alias = alias
730 730 self._repo_path = repo_path
731 731 self._cleanup_repos = []
732 732 self._test_name = test_name
733 733 self._test_repo_container = test_repo_container
734 734
735 735 def __getitem__(self, key):
736 736 return self._test_repo_container(key, self.alias).scm_instance()
737 737
738 738 @property
739 739 def repo(self):
740 740 """
741 741 Returns the "current" repository. This is the vcs_test repo of the last
742 742 repo which has been created.
743 743 """
744 744 Repository = get_backend(self.alias)
745 745 return Repository(self._repo_path)
746 746
747 747 @property
748 748 def backend(self):
749 749 """
750 750 Returns the backend implementation class.
751 751 """
752 752 return get_backend(self.alias)
753 753
754 754 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
755 755 repo_name = self._next_repo_name()
756 756 self._repo_path = get_new_dir(repo_name)
757 757 repo_class = get_backend(self.alias)
758 758 src_url = None
759 759 if _clone_repo:
760 760 src_url = _clone_repo.path
761 761 repo = repo_class(self._repo_path, create=True, src_url=src_url)
762 762 self._cleanup_repos.append(repo)
763 763
764 764 commits = commits or [
765 765 {'message': 'Commit %s of %s' % (x, repo_name)}
766 766 for x in xrange(number_of_commits)]
767 767 _add_commits_to_repo(repo, commits)
768 768 return repo
769 769
770 770 def clone_repo(self, repo):
771 771 return self.create_repo(_clone_repo=repo)
772 772
773 773 def cleanup(self):
774 774 for repo in self._cleanup_repos:
775 775 shutil.rmtree(repo.path)
776 776
777 777 def new_repo_path(self):
778 778 repo_name = self._next_repo_name()
779 779 self._repo_path = get_new_dir(repo_name)
780 780 return self._repo_path
781 781
782 782 def _next_repo_name(self):
783 783 return "%s_%s" % (
784 784 self.invalid_repo_name.sub('_', self._test_name),
785 785 len(self._cleanup_repos))
786 786
787 787 def add_file(self, repo, filename, content='Test content\n'):
788 788 imc = repo.in_memory_commit
789 789 imc.add(FileNode(filename, content=content))
790 790 imc.commit(
791 791 message=u'Automatic commit from vcsbackend fixture',
792 792 author=u'Automatic')
793 793
794 794 def ensure_file(self, filename, content='Test content\n'):
795 795 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
796 796 self.add_file(self.repo, filename, content)
797 797
798 798
799 799 def _add_commits_to_repo(vcs_repo, commits):
800 800 commit_ids = {}
801 801 if not commits:
802 802 return commit_ids
803 803
804 804 imc = vcs_repo.in_memory_commit
805 805 commit = None
806 806
807 807 for idx, commit in enumerate(commits):
808 808 message = unicode(commit.get('message', 'Commit %s' % idx))
809 809
810 810 for node in commit.get('added', []):
811 811 imc.add(FileNode(node.path, content=node.content))
812 812 for node in commit.get('changed', []):
813 813 imc.change(FileNode(node.path, content=node.content))
814 814 for node in commit.get('removed', []):
815 815 imc.remove(FileNode(node.path))
816 816
817 817 parents = [
818 818 vcs_repo.get_commit(commit_id=commit_ids[p])
819 819 for p in commit.get('parents', [])]
820 820
821 821 operations = ('added', 'changed', 'removed')
822 822 if not any((commit.get(o) for o in operations)):
823 823 imc.add(FileNode('file_%s' % idx, content=message))
824 824
825 825 commit = imc.commit(
826 826 message=message,
827 827 author=unicode(commit.get('author', 'Automatic')),
828 828 date=commit.get('date'),
829 829 branch=commit.get('branch'),
830 830 parents=parents)
831 831
832 832 commit_ids[commit.message] = commit.raw_id
833 833
834 834 return commit_ids
835 835
836 836
837 837 @pytest.fixture
838 838 def reposerver(request):
839 839 """
840 840 Allows to serve a backend repository
841 841 """
842 842
843 843 repo_server = RepoServer()
844 844 request.addfinalizer(repo_server.cleanup)
845 845 return repo_server
846 846
847 847
848 848 class RepoServer(object):
849 849 """
850 850 Utility to serve a local repository for the duration of a test case.
851 851
852 852 Supports only Subversion so far.
853 853 """
854 854
855 855 url = None
856 856
857 857 def __init__(self):
858 858 self._cleanup_servers = []
859 859
860 860 def serve(self, vcsrepo):
861 861 if vcsrepo.alias != 'svn':
862 862 raise TypeError("Backend %s not supported" % vcsrepo.alias)
863 863
864 864 proc = subprocess32.Popen(
865 865 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
866 866 '--root', vcsrepo.path])
867 867 self._cleanup_servers.append(proc)
868 868 self.url = 'svn://localhost'
869 869
870 870 def cleanup(self):
871 871 for proc in self._cleanup_servers:
872 872 proc.terminate()
873 873
874 874
875 875 @pytest.fixture
876 876 def pr_util(backend, request, config_stub):
877 877 """
878 878 Utility for tests of models and for functional tests around pull requests.
879 879
880 880 It gives an instance of :class:`PRTestUtility` which provides various
881 881 utility methods around one pull request.
882 882
883 883 This fixture uses `backend` and inherits its parameterization.
884 884 """
885 885
886 886 util = PRTestUtility(backend)
887 887 request.addfinalizer(util.cleanup)
888 888
889 889 return util
890 890
891 891
892 892 class PRTestUtility(object):
893 893
894 894 pull_request = None
895 895 pull_request_id = None
896 896 mergeable_patcher = None
897 897 mergeable_mock = None
898 898 notification_patcher = None
899 899
900 900 def __init__(self, backend):
901 901 self.backend = backend
902 902
903 903 def create_pull_request(
904 904 self, commits=None, target_head=None, source_head=None,
905 905 revisions=None, approved=False, author=None, mergeable=False,
906 906 enable_notifications=True, name_suffix=u'', reviewers=None,
907 907 title=u"Test", description=u"Description"):
908 908 self.set_mergeable(mergeable)
909 909 if not enable_notifications:
910 910 # mock notification side effect
911 911 self.notification_patcher = mock.patch(
912 912 'rhodecode.model.notification.NotificationModel.create')
913 913 self.notification_patcher.start()
914 914
915 915 if not self.pull_request:
916 916 if not commits:
917 917 commits = [
918 918 {'message': 'c1'},
919 919 {'message': 'c2'},
920 920 {'message': 'c3'},
921 921 ]
922 922 target_head = 'c1'
923 923 source_head = 'c2'
924 924 revisions = ['c2']
925 925
926 926 self.commit_ids = self.backend.create_master_repo(commits)
927 927 self.target_repository = self.backend.create_repo(
928 928 heads=[target_head], name_suffix=name_suffix)
929 929 self.source_repository = self.backend.create_repo(
930 930 heads=[source_head], name_suffix=name_suffix)
931 931 self.author = author or UserModel().get_by_username(
932 932 TEST_USER_ADMIN_LOGIN)
933 933
934 934 model = PullRequestModel()
935 935 self.create_parameters = {
936 936 'created_by': self.author,
937 937 'source_repo': self.source_repository.repo_name,
938 938 'source_ref': self._default_branch_reference(source_head),
939 939 'target_repo': self.target_repository.repo_name,
940 940 'target_ref': self._default_branch_reference(target_head),
941 941 'revisions': [self.commit_ids[r] for r in revisions],
942 942 'reviewers': reviewers or self._get_reviewers(),
943 943 'title': title,
944 944 'description': description,
945 945 }
946 946 self.pull_request = model.create(**self.create_parameters)
947 947 assert model.get_versions(self.pull_request) == []
948 948
949 949 self.pull_request_id = self.pull_request.pull_request_id
950 950
951 951 if approved:
952 952 self.approve()
953 953
954 954 Session().add(self.pull_request)
955 955 Session().commit()
956 956
957 957 return self.pull_request
958 958
959 959 def approve(self):
960 960 self.create_status_votes(
961 961 ChangesetStatus.STATUS_APPROVED,
962 962 *self.pull_request.reviewers)
963 963
964 964 def close(self):
965 965 PullRequestModel().close_pull_request(self.pull_request, self.author)
966 966
967 967 def _default_branch_reference(self, commit_message):
968 968 reference = '%s:%s:%s' % (
969 969 'branch',
970 970 self.backend.default_branch_name,
971 971 self.commit_ids[commit_message])
972 972 return reference
973 973
974 974 def _get_reviewers(self):
975 975 return [
976 976 (TEST_USER_REGULAR_LOGIN, ['default1'], False),
977 977 (TEST_USER_REGULAR2_LOGIN, ['default2'], False),
978 978 ]
979 979
980 980 def update_source_repository(self, head=None):
981 981 heads = [head or 'c3']
982 982 self.backend.pull_heads(self.source_repository, heads=heads)
983 983
984 984 def add_one_commit(self, head=None):
985 985 self.update_source_repository(head=head)
986 986 old_commit_ids = set(self.pull_request.revisions)
987 987 PullRequestModel().update_commits(self.pull_request)
988 988 commit_ids = set(self.pull_request.revisions)
989 989 new_commit_ids = commit_ids - old_commit_ids
990 990 assert len(new_commit_ids) == 1
991 991 return new_commit_ids.pop()
992 992
993 993 def remove_one_commit(self):
994 994 assert len(self.pull_request.revisions) == 2
995 995 source_vcs = self.source_repository.scm_instance()
996 996 removed_commit_id = source_vcs.commit_ids[-1]
997 997
998 998 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
999 999 # remove the if once that's sorted out.
1000 1000 if self.backend.alias == "git":
1001 1001 kwargs = {'branch_name': self.backend.default_branch_name}
1002 1002 else:
1003 1003 kwargs = {}
1004 1004 source_vcs.strip(removed_commit_id, **kwargs)
1005 1005
1006 1006 PullRequestModel().update_commits(self.pull_request)
1007 1007 assert len(self.pull_request.revisions) == 1
1008 1008 return removed_commit_id
1009 1009
1010 1010 def create_comment(self, linked_to=None):
1011 1011 comment = CommentsModel().create(
1012 1012 text=u"Test comment",
1013 1013 repo=self.target_repository.repo_name,
1014 1014 user=self.author,
1015 1015 pull_request=self.pull_request)
1016 1016 assert comment.pull_request_version_id is None
1017 1017
1018 1018 if linked_to:
1019 1019 PullRequestModel()._link_comments_to_version(linked_to)
1020 1020
1021 1021 return comment
1022 1022
1023 1023 def create_inline_comment(
1024 1024 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1025 1025 comment = CommentsModel().create(
1026 1026 text=u"Test comment",
1027 1027 repo=self.target_repository.repo_name,
1028 1028 user=self.author,
1029 1029 line_no=line_no,
1030 1030 f_path=file_path,
1031 1031 pull_request=self.pull_request)
1032 1032 assert comment.pull_request_version_id is None
1033 1033
1034 1034 if linked_to:
1035 1035 PullRequestModel()._link_comments_to_version(linked_to)
1036 1036
1037 1037 return comment
1038 1038
1039 1039 def create_version_of_pull_request(self):
1040 1040 pull_request = self.create_pull_request()
1041 1041 version = PullRequestModel()._create_version_from_snapshot(
1042 1042 pull_request)
1043 1043 return version
1044 1044
1045 1045 def create_status_votes(self, status, *reviewers):
1046 1046 for reviewer in reviewers:
1047 1047 ChangesetStatusModel().set_status(
1048 1048 repo=self.pull_request.target_repo,
1049 1049 status=status,
1050 1050 user=reviewer.user_id,
1051 1051 pull_request=self.pull_request)
1052 1052
1053 1053 def set_mergeable(self, value):
1054 1054 if not self.mergeable_patcher:
1055 1055 self.mergeable_patcher = mock.patch.object(
1056 1056 VcsSettingsModel, 'get_general_settings')
1057 1057 self.mergeable_mock = self.mergeable_patcher.start()
1058 1058 self.mergeable_mock.return_value = {
1059 1059 'rhodecode_pr_merge_enabled': value}
1060 1060
1061 1061 def cleanup(self):
1062 1062 # In case the source repository is already cleaned up, the pull
1063 1063 # request will already be deleted.
1064 1064 pull_request = PullRequest().get(self.pull_request_id)
1065 1065 if pull_request:
1066 1066 PullRequestModel().delete(pull_request, pull_request.author)
1067 1067 Session().commit()
1068 1068
1069 1069 if self.notification_patcher:
1070 1070 self.notification_patcher.stop()
1071 1071
1072 1072 if self.mergeable_patcher:
1073 1073 self.mergeable_patcher.stop()
1074 1074
1075 1075
1076 1076 @pytest.fixture
1077 1077 def user_admin(baseapp):
1078 1078 """
1079 1079 Provides the default admin test user as an instance of `db.User`.
1080 1080 """
1081 1081 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1082 1082 return user
1083 1083
1084 1084
1085 1085 @pytest.fixture
1086 1086 def user_regular(baseapp):
1087 1087 """
1088 1088 Provides the default regular test user as an instance of `db.User`.
1089 1089 """
1090 1090 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1091 1091 return user
1092 1092
1093 1093
1094 1094 @pytest.fixture
1095 1095 def user_util(request, baseapp):
1096 1096 """
1097 1097 Provides a wired instance of `UserUtility` with integrated cleanup.
1098 1098 """
1099 1099 utility = UserUtility(test_name=request.node.name)
1100 1100 request.addfinalizer(utility.cleanup)
1101 1101 return utility
1102 1102
1103 1103
1104 1104 # TODO: johbo: Split this up into utilities per domain or something similar
1105 1105 class UserUtility(object):
1106 1106
1107 1107 def __init__(self, test_name="test"):
1108 1108 self._test_name = self._sanitize_name(test_name)
1109 1109 self.fixture = Fixture()
1110 1110 self.repo_group_ids = []
1111 1111 self.repos_ids = []
1112 1112 self.user_ids = []
1113 1113 self.user_group_ids = []
1114 1114 self.user_repo_permission_ids = []
1115 1115 self.user_group_repo_permission_ids = []
1116 1116 self.user_repo_group_permission_ids = []
1117 1117 self.user_group_repo_group_permission_ids = []
1118 1118 self.user_user_group_permission_ids = []
1119 1119 self.user_group_user_group_permission_ids = []
1120 1120 self.user_permissions = []
1121 1121
1122 1122 def _sanitize_name(self, name):
1123 1123 for char in ['[', ']']:
1124 1124 name = name.replace(char, '_')
1125 1125 return name
1126 1126
1127 1127 def create_repo_group(
1128 1128 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1129 1129 group_name = "{prefix}_repogroup_{count}".format(
1130 1130 prefix=self._test_name,
1131 1131 count=len(self.repo_group_ids))
1132 1132 repo_group = self.fixture.create_repo_group(
1133 1133 group_name, cur_user=owner)
1134 1134 if auto_cleanup:
1135 1135 self.repo_group_ids.append(repo_group.group_id)
1136 1136 return repo_group
1137 1137
1138 1138 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1139 1139 auto_cleanup=True, repo_type='hg'):
1140 1140 repo_name = "{prefix}_repository_{count}".format(
1141 1141 prefix=self._test_name,
1142 1142 count=len(self.repos_ids))
1143 1143
1144 1144 repository = self.fixture.create_repo(
1145 1145 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type)
1146 1146 if auto_cleanup:
1147 1147 self.repos_ids.append(repository.repo_id)
1148 1148 return repository
1149 1149
1150 1150 def create_user(self, auto_cleanup=True, **kwargs):
1151 1151 user_name = "{prefix}_user_{count}".format(
1152 1152 prefix=self._test_name,
1153 1153 count=len(self.user_ids))
1154 1154 user = self.fixture.create_user(user_name, **kwargs)
1155 1155 if auto_cleanup:
1156 1156 self.user_ids.append(user.user_id)
1157 1157 return user
1158 1158
1159 1159 def create_user_with_group(self):
1160 1160 user = self.create_user()
1161 1161 user_group = self.create_user_group(members=[user])
1162 1162 return user, user_group
1163 1163
1164 1164 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1165 1165 auto_cleanup=True, **kwargs):
1166 1166 group_name = "{prefix}_usergroup_{count}".format(
1167 1167 prefix=self._test_name,
1168 1168 count=len(self.user_group_ids))
1169 1169 user_group = self.fixture.create_user_group(
1170 1170 group_name, cur_user=owner, **kwargs)
1171 1171
1172 1172 if auto_cleanup:
1173 1173 self.user_group_ids.append(user_group.users_group_id)
1174 1174 if members:
1175 1175 for user in members:
1176 1176 UserGroupModel().add_user_to_group(user_group, user)
1177 1177 return user_group
1178 1178
1179 1179 def grant_user_permission(self, user_name, permission_name):
1180 1180 self._inherit_default_user_permissions(user_name, False)
1181 1181 self.user_permissions.append((user_name, permission_name))
1182 1182
1183 1183 def grant_user_permission_to_repo_group(
1184 1184 self, repo_group, user, permission_name):
1185 1185 permission = RepoGroupModel().grant_user_permission(
1186 1186 repo_group, user, permission_name)
1187 1187 self.user_repo_group_permission_ids.append(
1188 1188 (repo_group.group_id, user.user_id))
1189 1189 return permission
1190 1190
1191 1191 def grant_user_group_permission_to_repo_group(
1192 1192 self, repo_group, user_group, permission_name):
1193 1193 permission = RepoGroupModel().grant_user_group_permission(
1194 1194 repo_group, user_group, permission_name)
1195 1195 self.user_group_repo_group_permission_ids.append(
1196 1196 (repo_group.group_id, user_group.users_group_id))
1197 1197 return permission
1198 1198
1199 1199 def grant_user_permission_to_repo(
1200 1200 self, repo, user, permission_name):
1201 1201 permission = RepoModel().grant_user_permission(
1202 1202 repo, user, permission_name)
1203 1203 self.user_repo_permission_ids.append(
1204 1204 (repo.repo_id, user.user_id))
1205 1205 return permission
1206 1206
1207 1207 def grant_user_group_permission_to_repo(
1208 1208 self, repo, user_group, permission_name):
1209 1209 permission = RepoModel().grant_user_group_permission(
1210 1210 repo, user_group, permission_name)
1211 1211 self.user_group_repo_permission_ids.append(
1212 1212 (repo.repo_id, user_group.users_group_id))
1213 1213 return permission
1214 1214
1215 1215 def grant_user_permission_to_user_group(
1216 1216 self, target_user_group, user, permission_name):
1217 1217 permission = UserGroupModel().grant_user_permission(
1218 1218 target_user_group, user, permission_name)
1219 1219 self.user_user_group_permission_ids.append(
1220 1220 (target_user_group.users_group_id, user.user_id))
1221 1221 return permission
1222 1222
1223 1223 def grant_user_group_permission_to_user_group(
1224 1224 self, target_user_group, user_group, permission_name):
1225 1225 permission = UserGroupModel().grant_user_group_permission(
1226 1226 target_user_group, user_group, permission_name)
1227 1227 self.user_group_user_group_permission_ids.append(
1228 1228 (target_user_group.users_group_id, user_group.users_group_id))
1229 1229 return permission
1230 1230
1231 1231 def revoke_user_permission(self, user_name, permission_name):
1232 1232 self._inherit_default_user_permissions(user_name, True)
1233 1233 UserModel().revoke_perm(user_name, permission_name)
1234 1234
1235 1235 def _inherit_default_user_permissions(self, user_name, value):
1236 1236 user = UserModel().get_by_username(user_name)
1237 1237 user.inherit_default_permissions = value
1238 1238 Session().add(user)
1239 1239 Session().commit()
1240 1240
1241 1241 def cleanup(self):
1242 1242 self._cleanup_permissions()
1243 1243 self._cleanup_repos()
1244 1244 self._cleanup_repo_groups()
1245 1245 self._cleanup_user_groups()
1246 1246 self._cleanup_users()
1247 1247
1248 1248 def _cleanup_permissions(self):
1249 1249 if self.user_permissions:
1250 1250 for user_name, permission_name in self.user_permissions:
1251 1251 self.revoke_user_permission(user_name, permission_name)
1252 1252
1253 1253 for permission in self.user_repo_permission_ids:
1254 1254 RepoModel().revoke_user_permission(*permission)
1255 1255
1256 1256 for permission in self.user_group_repo_permission_ids:
1257 1257 RepoModel().revoke_user_group_permission(*permission)
1258 1258
1259 1259 for permission in self.user_repo_group_permission_ids:
1260 1260 RepoGroupModel().revoke_user_permission(*permission)
1261 1261
1262 1262 for permission in self.user_group_repo_group_permission_ids:
1263 1263 RepoGroupModel().revoke_user_group_permission(*permission)
1264 1264
1265 1265 for permission in self.user_user_group_permission_ids:
1266 1266 UserGroupModel().revoke_user_permission(*permission)
1267 1267
1268 1268 for permission in self.user_group_user_group_permission_ids:
1269 1269 UserGroupModel().revoke_user_group_permission(*permission)
1270 1270
1271 1271 def _cleanup_repo_groups(self):
1272 1272 def _repo_group_compare(first_group_id, second_group_id):
1273 1273 """
1274 1274 Gives higher priority to the groups with the most complex paths
1275 1275 """
1276 1276 first_group = RepoGroup.get(first_group_id)
1277 1277 second_group = RepoGroup.get(second_group_id)
1278 1278 first_group_parts = (
1279 1279 len(first_group.group_name.split('/')) if first_group else 0)
1280 1280 second_group_parts = (
1281 1281 len(second_group.group_name.split('/')) if second_group else 0)
1282 1282 return cmp(second_group_parts, first_group_parts)
1283 1283
1284 1284 sorted_repo_group_ids = sorted(
1285 1285 self.repo_group_ids, cmp=_repo_group_compare)
1286 1286 for repo_group_id in sorted_repo_group_ids:
1287 1287 self.fixture.destroy_repo_group(repo_group_id)
1288 1288
1289 1289 def _cleanup_repos(self):
1290 1290 sorted_repos_ids = sorted(self.repos_ids)
1291 1291 for repo_id in sorted_repos_ids:
1292 1292 self.fixture.destroy_repo(repo_id)
1293 1293
1294 1294 def _cleanup_user_groups(self):
1295 1295 def _user_group_compare(first_group_id, second_group_id):
1296 1296 """
1297 1297 Gives higher priority to the groups with the most complex paths
1298 1298 """
1299 1299 first_group = UserGroup.get(first_group_id)
1300 1300 second_group = UserGroup.get(second_group_id)
1301 1301 first_group_parts = (
1302 1302 len(first_group.users_group_name.split('/'))
1303 1303 if first_group else 0)
1304 1304 second_group_parts = (
1305 1305 len(second_group.users_group_name.split('/'))
1306 1306 if second_group else 0)
1307 1307 return cmp(second_group_parts, first_group_parts)
1308 1308
1309 1309 sorted_user_group_ids = sorted(
1310 1310 self.user_group_ids, cmp=_user_group_compare)
1311 1311 for user_group_id in sorted_user_group_ids:
1312 1312 self.fixture.destroy_user_group(user_group_id)
1313 1313
1314 1314 def _cleanup_users(self):
1315 1315 for user_id in self.user_ids:
1316 1316 self.fixture.destroy_user(user_id)
1317 1317
1318 1318
1319 1319 # TODO: Think about moving this into a pytest-pyro package and make it a
1320 1320 # pytest plugin
1321 1321 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1322 1322 def pytest_runtest_makereport(item, call):
1323 1323 """
1324 1324 Adding the remote traceback if the exception has this information.
1325 1325
1326 1326 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1327 1327 to the exception instance.
1328 1328 """
1329 1329 outcome = yield
1330 1330 report = outcome.get_result()
1331 1331 if call.excinfo:
1332 1332 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1333 1333
1334 1334
1335 1335 def _add_vcsserver_remote_traceback(report, exc):
1336 1336 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1337 1337
1338 1338 if vcsserver_traceback:
1339 1339 section = 'VCSServer remote traceback ' + report.when
1340 1340 report.sections.append((section, vcsserver_traceback))
1341 1341
1342 1342
1343 1343 @pytest.fixture(scope='session')
1344 1344 def testrun():
1345 1345 return {
1346 1346 'uuid': uuid.uuid4(),
1347 1347 'start': datetime.datetime.utcnow().isoformat(),
1348 1348 'timestamp': int(time.time()),
1349 1349 }
1350 1350
1351 1351
1352 1352 @pytest.fixture(autouse=True)
1353 1353 def collect_appenlight_stats(request, testrun):
1354 1354 """
1355 1355 This fixture reports memory consumtion of single tests.
1356 1356
1357 1357 It gathers data based on `psutil` and sends them to Appenlight. The option
1358 1358 ``--ae`` has te be used to enable this fixture and the API key for your
1359 1359 application has to be provided in ``--ae-key``.
1360 1360 """
1361 1361 try:
1362 1362 # cygwin cannot have yet psutil support.
1363 1363 import psutil
1364 1364 except ImportError:
1365 1365 return
1366 1366
1367 1367 if not request.config.getoption('--appenlight'):
1368 1368 return
1369 1369 else:
1370 1370 # Only request the baseapp fixture if appenlight tracking is
1371 1371 # enabled. This will speed up a test run of unit tests by 2 to 3
1372 1372 # seconds if appenlight is not enabled.
1373 1373 baseapp = request.getfuncargvalue("baseapp")
1374 1374 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1375 1375 client = AppenlightClient(
1376 1376 url=url,
1377 1377 api_key=request.config.getoption('--appenlight-api-key'),
1378 1378 namespace=request.node.nodeid,
1379 1379 request=str(testrun['uuid']),
1380 1380 testrun=testrun)
1381 1381
1382 1382 client.collect({
1383 1383 'message': "Starting",
1384 1384 })
1385 1385
1386 1386 server_and_port = baseapp.config.get_settings()['vcs.server']
1387 1387 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1388 1388 server = create_vcsserver_proxy(server_and_port, protocol)
1389 1389 with server:
1390 1390 vcs_pid = server.get_pid()
1391 1391 server.run_gc()
1392 1392 vcs_process = psutil.Process(vcs_pid)
1393 1393 mem = vcs_process.memory_info()
1394 1394 client.tag_before('vcsserver.rss', mem.rss)
1395 1395 client.tag_before('vcsserver.vms', mem.vms)
1396 1396
1397 1397 test_process = psutil.Process()
1398 1398 mem = test_process.memory_info()
1399 1399 client.tag_before('test.rss', mem.rss)
1400 1400 client.tag_before('test.vms', mem.vms)
1401 1401
1402 1402 client.tag_before('time', time.time())
1403 1403
1404 1404 @request.addfinalizer
1405 1405 def send_stats():
1406 1406 client.tag_after('time', time.time())
1407 1407 with server:
1408 1408 gc_stats = server.run_gc()
1409 1409 for tag, value in gc_stats.items():
1410 1410 client.tag_after(tag, value)
1411 1411 mem = vcs_process.memory_info()
1412 1412 client.tag_after('vcsserver.rss', mem.rss)
1413 1413 client.tag_after('vcsserver.vms', mem.vms)
1414 1414
1415 1415 mem = test_process.memory_info()
1416 1416 client.tag_after('test.rss', mem.rss)
1417 1417 client.tag_after('test.vms', mem.vms)
1418 1418
1419 1419 client.collect({
1420 1420 'message': "Finished",
1421 1421 })
1422 1422 client.send_stats()
1423 1423
1424 1424 return client
1425 1425
1426 1426
1427 1427 class AppenlightClient():
1428 1428
1429 1429 url_template = '{url}?protocol_version=0.5'
1430 1430
1431 1431 def __init__(
1432 1432 self, url, api_key, add_server=True, add_timestamp=True,
1433 1433 namespace=None, request=None, testrun=None):
1434 1434 self.url = self.url_template.format(url=url)
1435 1435 self.api_key = api_key
1436 1436 self.add_server = add_server
1437 1437 self.add_timestamp = add_timestamp
1438 1438 self.namespace = namespace
1439 1439 self.request = request
1440 1440 self.server = socket.getfqdn(socket.gethostname())
1441 1441 self.tags_before = {}
1442 1442 self.tags_after = {}
1443 1443 self.stats = []
1444 1444 self.testrun = testrun or {}
1445 1445
1446 1446 def tag_before(self, tag, value):
1447 1447 self.tags_before[tag] = value
1448 1448
1449 1449 def tag_after(self, tag, value):
1450 1450 self.tags_after[tag] = value
1451 1451
1452 1452 def collect(self, data):
1453 1453 if self.add_server:
1454 1454 data.setdefault('server', self.server)
1455 1455 if self.add_timestamp:
1456 1456 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1457 1457 if self.namespace:
1458 1458 data.setdefault('namespace', self.namespace)
1459 1459 if self.request:
1460 1460 data.setdefault('request', self.request)
1461 1461 self.stats.append(data)
1462 1462
1463 1463 def send_stats(self):
1464 1464 tags = [
1465 1465 ('testrun', self.request),
1466 1466 ('testrun.start', self.testrun['start']),
1467 1467 ('testrun.timestamp', self.testrun['timestamp']),
1468 1468 ('test', self.namespace),
1469 1469 ]
1470 1470 for key, value in self.tags_before.items():
1471 1471 tags.append((key + '.before', value))
1472 1472 try:
1473 1473 delta = self.tags_after[key] - value
1474 1474 tags.append((key + '.delta', delta))
1475 1475 except Exception:
1476 1476 pass
1477 1477 for key, value in self.tags_after.items():
1478 1478 tags.append((key + '.after', value))
1479 1479 self.collect({
1480 1480 'message': "Collected tags",
1481 1481 'tags': tags,
1482 1482 })
1483 1483
1484 1484 response = requests.post(
1485 1485 self.url,
1486 1486 headers={
1487 1487 'X-appenlight-api-key': self.api_key},
1488 1488 json=self.stats,
1489 1489 )
1490 1490
1491 1491 if not response.status_code == 200:
1492 1492 pprint.pprint(self.stats)
1493 1493 print(response.headers)
1494 1494 print(response.text)
1495 1495 raise Exception('Sending to appenlight failed')
1496 1496
1497 1497
1498 1498 @pytest.fixture
1499 1499 def gist_util(request, baseapp):
1500 1500 """
1501 1501 Provides a wired instance of `GistUtility` with integrated cleanup.
1502 1502 """
1503 1503 utility = GistUtility()
1504 1504 request.addfinalizer(utility.cleanup)
1505 1505 return utility
1506 1506
1507 1507
1508 1508 class GistUtility(object):
1509 1509 def __init__(self):
1510 1510 self.fixture = Fixture()
1511 1511 self.gist_ids = []
1512 1512
1513 1513 def create_gist(self, **kwargs):
1514 1514 gist = self.fixture.create_gist(**kwargs)
1515 1515 self.gist_ids.append(gist.gist_id)
1516 1516 return gist
1517 1517
1518 1518 def cleanup(self):
1519 1519 for id_ in self.gist_ids:
1520 1520 self.fixture.destroy_gists(str(id_))
1521 1521
1522 1522
1523 1523 @pytest.fixture
1524 1524 def enabled_backends(request):
1525 1525 backends = request.config.option.backends
1526 1526 return backends[:]
1527 1527
1528 1528
1529 1529 @pytest.fixture
1530 1530 def settings_util(request):
1531 1531 """
1532 1532 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1533 1533 """
1534 1534 utility = SettingsUtility()
1535 1535 request.addfinalizer(utility.cleanup)
1536 1536 return utility
1537 1537
1538 1538
1539 1539 class SettingsUtility(object):
1540 1540 def __init__(self):
1541 1541 self.rhodecode_ui_ids = []
1542 1542 self.rhodecode_setting_ids = []
1543 1543 self.repo_rhodecode_ui_ids = []
1544 1544 self.repo_rhodecode_setting_ids = []
1545 1545
1546 1546 def create_repo_rhodecode_ui(
1547 1547 self, repo, section, value, key=None, active=True, cleanup=True):
1548 1548 key = key or hashlib.sha1(
1549 1549 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1550 1550
1551 1551 setting = RepoRhodeCodeUi()
1552 1552 setting.repository_id = repo.repo_id
1553 1553 setting.ui_section = section
1554 1554 setting.ui_value = value
1555 1555 setting.ui_key = key
1556 1556 setting.ui_active = active
1557 1557 Session().add(setting)
1558 1558 Session().commit()
1559 1559
1560 1560 if cleanup:
1561 1561 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1562 1562 return setting
1563 1563
1564 1564 def create_rhodecode_ui(
1565 1565 self, section, value, key=None, active=True, cleanup=True):
1566 1566 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1567 1567
1568 1568 setting = RhodeCodeUi()
1569 1569 setting.ui_section = section
1570 1570 setting.ui_value = value
1571 1571 setting.ui_key = key
1572 1572 setting.ui_active = active
1573 1573 Session().add(setting)
1574 1574 Session().commit()
1575 1575
1576 1576 if cleanup:
1577 1577 self.rhodecode_ui_ids.append(setting.ui_id)
1578 1578 return setting
1579 1579
1580 1580 def create_repo_rhodecode_setting(
1581 1581 self, repo, name, value, type_, cleanup=True):
1582 1582 setting = RepoRhodeCodeSetting(
1583 1583 repo.repo_id, key=name, val=value, type=type_)
1584 1584 Session().add(setting)
1585 1585 Session().commit()
1586 1586
1587 1587 if cleanup:
1588 1588 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1589 1589 return setting
1590 1590
1591 1591 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1592 1592 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1593 1593 Session().add(setting)
1594 1594 Session().commit()
1595 1595
1596 1596 if cleanup:
1597 1597 self.rhodecode_setting_ids.append(setting.app_settings_id)
1598 1598
1599 1599 return setting
1600 1600
1601 1601 def cleanup(self):
1602 1602 for id_ in self.rhodecode_ui_ids:
1603 1603 setting = RhodeCodeUi.get(id_)
1604 1604 Session().delete(setting)
1605 1605
1606 1606 for id_ in self.rhodecode_setting_ids:
1607 1607 setting = RhodeCodeSetting.get(id_)
1608 1608 Session().delete(setting)
1609 1609
1610 1610 for id_ in self.repo_rhodecode_ui_ids:
1611 1611 setting = RepoRhodeCodeUi.get(id_)
1612 1612 Session().delete(setting)
1613 1613
1614 1614 for id_ in self.repo_rhodecode_setting_ids:
1615 1615 setting = RepoRhodeCodeSetting.get(id_)
1616 1616 Session().delete(setting)
1617 1617
1618 1618 Session().commit()
1619 1619
1620 1620
1621 1621 @pytest.fixture
1622 1622 def no_notifications(request):
1623 1623 notification_patcher = mock.patch(
1624 1624 'rhodecode.model.notification.NotificationModel.create')
1625 1625 notification_patcher.start()
1626 1626 request.addfinalizer(notification_patcher.stop)
1627 1627
1628 1628
1629 1629 @pytest.fixture(scope='session')
1630 1630 def repeat(request):
1631 1631 """
1632 1632 The number of repetitions is based on this fixture.
1633 1633
1634 1634 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1635 1635 tests are not too slow in our default test suite.
1636 1636 """
1637 1637 return request.config.getoption('--repeat')
1638 1638
1639 1639
1640 1640 @pytest.fixture
1641 1641 def rhodecode_fixtures():
1642 1642 return Fixture()
1643 1643
1644 1644
1645 1645 @pytest.fixture
1646 1646 def context_stub():
1647 1647 """
1648 1648 Stub context object.
1649 1649 """
1650 1650 context = pyramid.testing.DummyResource()
1651 1651 return context
1652 1652
1653 1653
1654 1654 @pytest.fixture
1655 1655 def request_stub():
1656 1656 """
1657 1657 Stub request object.
1658 1658 """
1659 1659 from rhodecode.lib.base import bootstrap_request
1660 1660 request = bootstrap_request(scheme='https')
1661 1661 return request
1662 1662
1663 1663
1664 1664 @pytest.fixture
1665 1665 def config_stub(request, request_stub):
1666 1666 """
1667 1667 Set up pyramid.testing and return the Configurator.
1668 1668 """
1669 1669 from rhodecode.lib.base import bootstrap_config
1670 1670 config = bootstrap_config(request=request_stub)
1671 1671
1672 1672 @request.addfinalizer
1673 1673 def cleanup():
1674 1674 pyramid.testing.tearDown()
1675 1675
1676 1676 return config
1677 1677
1678 1678
1679 1679 @pytest.fixture
1680 1680 def StubIntegrationType():
1681 1681 class _StubIntegrationType(IntegrationTypeBase):
1682 1682 """ Test integration type class """
1683 1683
1684 1684 key = 'test'
1685 1685 display_name = 'Test integration type'
1686 1686 description = 'A test integration type for testing'
1687 1687 icon = 'test_icon_html_image'
1688 1688
1689 1689 def __init__(self, settings):
1690 1690 super(_StubIntegrationType, self).__init__(settings)
1691 1691 self.sent_events = [] # for testing
1692 1692
1693 1693 def send_event(self, event):
1694 1694 self.sent_events.append(event)
1695 1695
1696 1696 def settings_schema(self):
1697 1697 class SettingsSchema(colander.Schema):
1698 1698 test_string_field = colander.SchemaNode(
1699 1699 colander.String(),
1700 1700 missing=colander.required,
1701 1701 title='test string field',
1702 1702 )
1703 1703 test_int_field = colander.SchemaNode(
1704 1704 colander.Int(),
1705 1705 title='some integer setting',
1706 1706 )
1707 1707 return SettingsSchema()
1708 1708
1709 1709
1710 1710 integration_type_registry.register_integration_type(_StubIntegrationType)
1711 1711 return _StubIntegrationType
1712 1712
1713 1713 @pytest.fixture
1714 1714 def stub_integration_settings():
1715 1715 return {
1716 1716 'test_string_field': 'some data',
1717 1717 'test_int_field': 100,
1718 1718 }
1719 1719
1720 1720
1721 1721 @pytest.fixture
1722 1722 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1723 1723 stub_integration_settings):
1724 1724 integration = IntegrationModel().create(
1725 1725 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1726 1726 name='test repo integration',
1727 1727 repo=repo_stub, repo_group=None, child_repos_only=None)
1728 1728
1729 1729 @request.addfinalizer
1730 1730 def cleanup():
1731 1731 IntegrationModel().delete(integration)
1732 1732
1733 1733 return integration
1734 1734
1735 1735
1736 1736 @pytest.fixture
1737 1737 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1738 1738 stub_integration_settings):
1739 1739 integration = IntegrationModel().create(
1740 1740 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1741 1741 name='test repogroup integration',
1742 1742 repo=None, repo_group=test_repo_group, child_repos_only=True)
1743 1743
1744 1744 @request.addfinalizer
1745 1745 def cleanup():
1746 1746 IntegrationModel().delete(integration)
1747 1747
1748 1748 return integration
1749 1749
1750 1750
1751 1751 @pytest.fixture
1752 1752 def repogroup_recursive_integration_stub(request, test_repo_group,
1753 1753 StubIntegrationType, stub_integration_settings):
1754 1754 integration = IntegrationModel().create(
1755 1755 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1756 1756 name='test recursive repogroup integration',
1757 1757 repo=None, repo_group=test_repo_group, child_repos_only=False)
1758 1758
1759 1759 @request.addfinalizer
1760 1760 def cleanup():
1761 1761 IntegrationModel().delete(integration)
1762 1762
1763 1763 return integration
1764 1764
1765 1765
1766 1766 @pytest.fixture
1767 1767 def global_integration_stub(request, StubIntegrationType,
1768 1768 stub_integration_settings):
1769 1769 integration = IntegrationModel().create(
1770 1770 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1771 1771 name='test global integration',
1772 1772 repo=None, repo_group=None, child_repos_only=None)
1773 1773
1774 1774 @request.addfinalizer
1775 1775 def cleanup():
1776 1776 IntegrationModel().delete(integration)
1777 1777
1778 1778 return integration
1779 1779
1780 1780
1781 1781 @pytest.fixture
1782 1782 def root_repos_integration_stub(request, StubIntegrationType,
1783 1783 stub_integration_settings):
1784 1784 integration = IntegrationModel().create(
1785 1785 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1786 1786 name='test global integration',
1787 1787 repo=None, repo_group=None, child_repos_only=True)
1788 1788
1789 1789 @request.addfinalizer
1790 1790 def cleanup():
1791 1791 IntegrationModel().delete(integration)
1792 1792
1793 1793 return integration
1794 1794
1795 1795
1796 1796 @pytest.fixture
1797 1797 def local_dt_to_utc():
1798 1798 def _factory(dt):
1799 1799 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1800 1800 dateutil.tz.tzutc()).replace(tzinfo=None)
1801 1801 return _factory
1802 1802
1803 1803
1804 1804 @pytest.fixture
1805 1805 def disable_anonymous_user(request, baseapp):
1806 1806 set_anonymous_access(False)
1807 1807
1808 1808 @request.addfinalizer
1809 1809 def cleanup():
1810 1810 set_anonymous_access(True)
1811 1811
1812 1812
1813 @pytest.fixture
1813 @pytest.fixture(scope='module')
1814 1814 def rc_fixture(request):
1815 1815 return Fixture()
1816 1816
1817 1817
1818 1818 @pytest.fixture
1819 1819 def repo_groups(request):
1820 1820 fixture = Fixture()
1821 1821
1822 1822 session = Session()
1823 1823 zombie_group = fixture.create_repo_group('zombie')
1824 1824 parent_group = fixture.create_repo_group('parent')
1825 1825 child_group = fixture.create_repo_group('parent/child')
1826 1826 groups_in_db = session.query(RepoGroup).all()
1827 1827 assert len(groups_in_db) == 3
1828 1828 assert child_group.group_parent_id == parent_group.group_id
1829 1829
1830 1830 @request.addfinalizer
1831 1831 def cleanup():
1832 1832 fixture.destroy_repo_group(zombie_group)
1833 1833 fixture.destroy_repo_group(child_group)
1834 1834 fixture.destroy_repo_group(parent_group)
1835 1835
1836 1836 return zombie_group, parent_group, child_group
General Comments 0
You need to be logged in to leave comments. Login now