tests: added creation of repos for user-util fixture.
marcink
r1266:55bee39a default
@@ -1,1798 +1,1816 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32
33 33 import mock
34 34 import pyramid.testing
35 35 import pytest
36 36 import colander
37 37 import requests
38 38
39 39 import rhodecode
40 40 from rhodecode.lib.utils2 import AttributeDict
41 41 from rhodecode.model.changeset_status import ChangesetStatusModel
42 42 from rhodecode.model.comment import ChangesetCommentsModel
43 43 from rhodecode.model.db import (
44 44 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
45 45 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
46 46 from rhodecode.model.meta import Session
47 47 from rhodecode.model.pull_request import PullRequestModel
48 48 from rhodecode.model.repo import RepoModel
49 49 from rhodecode.model.repo_group import RepoGroupModel
50 50 from rhodecode.model.user import UserModel
51 51 from rhodecode.model.settings import VcsSettingsModel
52 52 from rhodecode.model.user_group import UserGroupModel
53 53 from rhodecode.model.integration import IntegrationModel
54 54 from rhodecode.integrations import integration_type_registry
55 55 from rhodecode.integrations.types.base import IntegrationTypeBase
56 56 from rhodecode.lib.utils import repo2db_mapper
57 57 from rhodecode.lib.vcs import create_vcsserver_proxy
58 58 from rhodecode.lib.vcs.backends import get_backend
59 59 from rhodecode.lib.vcs.nodes import FileNode
60 60 from rhodecode.tests import (
61 61 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
62 62 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
63 63 TEST_USER_REGULAR_PASS)
64 64 from rhodecode.tests.utils import CustomTestApp
65 65 from rhodecode.tests.fixture import Fixture
66 66
67 67
68 68 def _split_comma(value):
69 69 return value.split(',')
70 70
71 71
72 72 def pytest_addoption(parser):
73 73 parser.addoption(
74 74 '--keep-tmp-path', action='store_true',
75 75 help="Keep the test temporary directories")
76 76 parser.addoption(
77 77 '--backends', action='store', type=_split_comma,
78 78 default=['git', 'hg', 'svn'],
79 79 help="Select which backends to test for backend specific tests.")
80 80 parser.addoption(
81 81 '--dbs', action='store', type=_split_comma,
82 82 default=['sqlite'],
83 83 help="Select which database to test for database specific tests. "
84 84 "Possible options are sqlite,postgres,mysql")
85 85 parser.addoption(
86 86 '--appenlight', '--ae', action='store_true',
87 87 help="Track statistics in appenlight.")
88 88 parser.addoption(
89 89 '--appenlight-api-key', '--ae-key',
90 90 help="API key for Appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-url', '--ae-url',
93 93 default="https://ae.rhodecode.com",
94 94 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
95 95 parser.addoption(
96 96 '--sqlite-connection-string', action='store',
97 97 default='', help="Connection string for the dbs tests with SQLite")
98 98 parser.addoption(
99 99 '--postgres-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with Postgres")
101 101 parser.addoption(
102 102 '--mysql-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with MySQL")
104 104 parser.addoption(
105 105 '--repeat', type=int, default=100,
106 106 help="Number of repetitions in performance tests.")
107 107
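# A minimal sketch (not part of the original module) of how the options
# registered above can be consumed: any fixture or hook can read them from the
# pytest config object, e.g.
#
#     selected_backends = request.config.getoption('--backends')
#     sqlite_url = request.config.getoption('--sqlite-connection-string')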
108 108
109 109 def pytest_configure(config):
 110 110 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
111 111 from rhodecode.config import patches
112 112 patches.kombu_1_5_1_python_2_7_11()
113 113
114 114
115 115 def pytest_collection_modifyitems(session, config, items):
 116 116 # Filter out items marked as "not a test" (nose's __test__ = False convention), used during the transition from nose to pytest
117 117 remaining = [
118 118 i for i in items if getattr(i.obj, '__test__', True)]
119 119 items[:] = remaining
120 120
121 121
122 122 def pytest_generate_tests(metafunc):
123 123 # Support test generation based on --backend parameter
124 124 if 'backend_alias' in metafunc.fixturenames:
125 125 backends = get_backends_from_metafunc(metafunc)
126 126 scope = None
127 127 if not backends:
128 128 pytest.skip("Not enabled for any of selected backends")
129 129 metafunc.parametrize('backend_alias', backends, scope=scope)
130 130 elif hasattr(metafunc.function, 'backends'):
131 131 backends = get_backends_from_metafunc(metafunc)
132 132 if not backends:
133 133 pytest.skip("Not enabled for any of selected backends")
134 134
135 135
136 136 def get_backends_from_metafunc(metafunc):
137 137 requested_backends = set(metafunc.config.getoption('--backends'))
138 138 if hasattr(metafunc.function, 'backends'):
139 139 # Supported backends by this test function, created from
140 140 # pytest.mark.backends
141 141 backends = metafunc.function.backends.args
142 142 elif hasattr(metafunc.cls, 'backend_alias'):
143 143 # Support class attribute "backend_alias", this is mainly
144 144 # for legacy reasons for tests not yet using pytest.mark.backends
145 145 backends = [metafunc.cls.backend_alias]
146 146 else:
147 147 backends = metafunc.config.getoption('--backends')
148 148 return requested_backends.intersection(backends)
149 149
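# Hypothetical usage sketch: a test module can restrict itself to a subset of
# backends via the `backends` marker, which pytest_generate_tests and
# get_backends_from_metafunc above intersect with the `--backends` selection.
#
#     @pytest.mark.backends("git", "hg")
#     def test_only_for_dvcs(backend_alias):
#         assert backend_alias in ("git", "hg")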
150 150
151 151 @pytest.fixture(scope='session', autouse=True)
152 152 def activate_example_rcextensions(request):
153 153 """
 154 154 Patch in an example rcextensions module which verifies passed-in kwargs.
155 155 """
156 156 from rhodecode.tests.other import example_rcextensions
157 157
158 158 old_extensions = rhodecode.EXTENSIONS
159 159 rhodecode.EXTENSIONS = example_rcextensions
160 160
161 161 @request.addfinalizer
162 162 def cleanup():
163 163 rhodecode.EXTENSIONS = old_extensions
164 164
165 165
166 166 @pytest.fixture
167 167 def capture_rcextensions():
168 168 """
169 169 Returns the recorded calls to entry points in rcextensions.
170 170 """
171 171 calls = rhodecode.EXTENSIONS.calls
172 172 calls.clear()
 173 173 # Note: at this moment `calls` is still an empty dict, but it will be
 174 174 # filled during the test run; since we return a reference, that is
 175 175 # enough for callers to observe the recorded calls.
176 176 return calls
177 177
178 178
179 179 @pytest.fixture(scope='session')
180 180 def http_environ_session():
181 181 """
182 182 Allow to use "http_environ" in session scope.
183 183 """
184 184 return http_environ(
185 185 http_host_stub=http_host_stub())
186 186
187 187
188 188 @pytest.fixture
189 189 def http_host_stub():
190 190 """
191 191 Value of HTTP_HOST in the test run.
192 192 """
193 193 return 'test.example.com:80'
194 194
195 195
196 196 @pytest.fixture
197 197 def http_environ(http_host_stub):
198 198 """
199 199 HTTP extra environ keys.
200 200
 201 201 Used by the test application as well as for setting up the pylons
202 202 environment. In the case of the fixture "app" it should be possible
203 203 to override this for a specific test case.
204 204 """
205 205 return {
206 206 'SERVER_NAME': http_host_stub.split(':')[0],
207 207 'SERVER_PORT': http_host_stub.split(':')[1],
208 208 'HTTP_HOST': http_host_stub,
209 209 }
210 210
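# Hypothetical override sketch: as mentioned in the docstring above, a test
# module can shadow `http_environ` so that the `app` fixture below sees a
# different host, e.g.
#
#     @pytest.fixture
#     def http_environ():
#         return {'SERVER_NAME': 'custom.example.com', 'SERVER_PORT': '80',
#                 'HTTP_HOST': 'custom.example.com:80'}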
211 211
212 212 @pytest.fixture(scope='function')
213 213 def app(request, pylonsapp, http_environ):
214 214
215 215
216 216 app = CustomTestApp(
217 217 pylonsapp,
218 218 extra_environ=http_environ)
219 219 if request.cls:
220 220 request.cls.app = app
221 221 return app
222 222
223 223
224 224 @pytest.fixture(scope='session')
225 225 def app_settings(pylonsapp, pylons_config):
226 226 """
227 227 Settings dictionary used to create the app.
228 228
229 229 Parses the ini file and passes the result through the sanitize and apply
230 230 defaults mechanism in `rhodecode.config.middleware`.
231 231 """
232 232 from paste.deploy.loadwsgi import loadcontext, APP
233 233 from rhodecode.config.middleware import (
234 234 sanitize_settings_and_apply_defaults)
235 235 context = loadcontext(APP, 'config:' + pylons_config)
236 236 settings = sanitize_settings_and_apply_defaults(context.config())
237 237 return settings
238 238
239 239
240 240 @pytest.fixture(scope='session')
241 241 def db(app_settings):
242 242 """
243 243 Initializes the database connection.
244 244
245 245 It uses the same settings which are used to create the ``pylonsapp`` or
246 246 ``app`` fixtures.
247 247 """
248 248 from rhodecode.config.utils import initialize_database
249 249 initialize_database(app_settings)
250 250
251 251
252 252 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
253 253
254 254
255 255 def _autologin_user(app, *args):
256 256 session = login_user_session(app, *args)
257 257 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
258 258 return LoginData(csrf_token, session['rhodecode_user'])
259 259
260 260
261 261 @pytest.fixture
262 262 def autologin_user(app):
263 263 """
264 264 Utility fixture which makes sure that the admin user is logged in
265 265 """
266 266 return _autologin_user(app)
267 267
268 268
269 269 @pytest.fixture
270 270 def autologin_regular_user(app):
271 271 """
272 272 Utility fixture which makes sure that the regular user is logged in
273 273 """
274 274 return _autologin_user(
275 275 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
276 276
277 277
278 278 @pytest.fixture(scope='function')
279 279 def csrf_token(request, autologin_user):
280 280 return autologin_user.csrf_token
281 281
282 282
283 283 @pytest.fixture(scope='function')
284 284 def xhr_header(request):
285 285 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
286 286
287 287
288 288 @pytest.fixture
289 289 def real_crypto_backend(monkeypatch):
290 290 """
291 291 Switch the production crypto backend on for this test.
292 292
293 293 During the test run the crypto backend is replaced with a faster
294 294 implementation based on the MD5 algorithm.
295 295 """
296 296 monkeypatch.setattr(rhodecode, 'is_test', False)
297 297
298 298
299 299 @pytest.fixture(scope='class')
300 300 def index_location(request, pylonsapp):
301 301 index_location = pylonsapp.config['app_conf']['search.location']
302 302 if request.cls:
303 303 request.cls.index_location = index_location
304 304 return index_location
305 305
306 306
307 307 @pytest.fixture(scope='session', autouse=True)
308 308 def tests_tmp_path(request):
309 309 """
310 310 Create temporary directory to be used during the test session.
311 311 """
312 312 if not os.path.exists(TESTS_TMP_PATH):
313 313 os.makedirs(TESTS_TMP_PATH)
314 314
315 315 if not request.config.getoption('--keep-tmp-path'):
316 316 @request.addfinalizer
317 317 def remove_tmp_path():
318 318 shutil.rmtree(TESTS_TMP_PATH)
319 319
320 320 return TESTS_TMP_PATH
321 321
322 322
323 323 @pytest.fixture(scope='session', autouse=True)
324 324 def patch_pyro_request_scope_proxy_factory(request):
325 325 """
326 326 Patch the pyro proxy factory to always use the same dummy request object
327 327 when under test. This will return the same pyro proxy on every call.
328 328 """
329 329 dummy_request = pyramid.testing.DummyRequest()
330 330
331 331 def mocked_call(self, request=None):
332 332 return self.getProxy(request=dummy_request)
333 333
334 334 patcher = mock.patch(
335 335 'rhodecode.lib.vcs.client.RequestScopeProxyFactory.__call__',
336 336 new=mocked_call)
337 337 patcher.start()
338 338
339 339 @request.addfinalizer
340 340 def undo_patching():
341 341 patcher.stop()
342 342
343 343
344 344 @pytest.fixture
345 345 def test_repo_group(request):
346 346 """
 347 347 Create a temporary repository group, and destroy it automatically
 348 348 after usage.
349 349 """
350 350 fixture = Fixture()
351 351 repogroupid = 'test_repo_group_%s' % int(time.time())
352 352 repo_group = fixture.create_repo_group(repogroupid)
353 353
354 354 def _cleanup():
355 355 fixture.destroy_repo_group(repogroupid)
356 356
357 357 request.addfinalizer(_cleanup)
358 358 return repo_group
359 359
360 360
361 361 @pytest.fixture
362 362 def test_user_group(request):
363 363 """
 364 364 Create a temporary user group, and destroy it automatically
 365 365 after usage.
366 366 """
367 367 fixture = Fixture()
368 368 usergroupid = 'test_user_group_%s' % int(time.time())
369 369 user_group = fixture.create_user_group(usergroupid)
370 370
371 371 def _cleanup():
372 372 fixture.destroy_user_group(user_group)
373 373
374 374 request.addfinalizer(_cleanup)
375 375 return user_group
376 376
377 377
378 378 @pytest.fixture(scope='session')
379 379 def test_repo(request):
380 380 container = TestRepoContainer()
381 381 request.addfinalizer(container._cleanup)
382 382 return container
383 383
384 384
385 385 class TestRepoContainer(object):
386 386 """
 387 387 Container for test repositories which are used read-only.
388 388
389 389 Repositories will be created on demand and re-used during the lifetime
390 390 of this object.
391 391
392 392 Usage to get the svn test repository "minimal"::
393 393
 394 394 test_repo = TestRepoContainer()
395 395 repo = test_repo('minimal', 'svn')
396 396
397 397 """
398 398
399 399 dump_extractors = {
400 400 'git': utils.extract_git_repo_from_dump,
401 401 'hg': utils.extract_hg_repo_from_dump,
402 402 'svn': utils.extract_svn_repo_from_dump,
403 403 }
404 404
405 405 def __init__(self):
406 406 self._cleanup_repos = []
407 407 self._fixture = Fixture()
408 408 self._repos = {}
409 409
410 410 def __call__(self, dump_name, backend_alias):
411 411 key = (dump_name, backend_alias)
412 412 if key not in self._repos:
413 413 repo = self._create_repo(dump_name, backend_alias)
414 414 self._repos[key] = repo.repo_id
415 415 return Repository.get(self._repos[key])
416 416
417 417 def _create_repo(self, dump_name, backend_alias):
418 418 repo_name = '%s-%s' % (backend_alias, dump_name)
419 419 backend_class = get_backend(backend_alias)
420 420 dump_extractor = self.dump_extractors[backend_alias]
421 421 repo_path = dump_extractor(dump_name, repo_name)
422 422 vcs_repo = backend_class(repo_path)
423 423 repo2db_mapper({repo_name: vcs_repo})
424 424 repo = RepoModel().get_by_repo_name(repo_name)
425 425 self._cleanup_repos.append(repo_name)
426 426 return repo
427 427
428 428 def _cleanup(self):
429 429 for repo_name in reversed(self._cleanup_repos):
430 430 self._fixture.destroy_repo(repo_name)
431 431
432 432
433 433 @pytest.fixture
434 434 def backend(request, backend_alias, pylonsapp, test_repo):
435 435 """
436 436 Parametrized fixture which represents a single backend implementation.
437 437
438 438 It respects the option `--backends` to focus the test run on specific
439 439 backend implementations.
440 440
441 441 It also supports `pytest.mark.xfail_backends` to mark tests as failing
442 442 for specific backends. This is intended as a utility for incremental
443 443 development of a new backend implementation.
444 444 """
445 445 if backend_alias not in request.config.getoption('--backends'):
446 446 pytest.skip("Backend %s not selected." % (backend_alias, ))
447 447
448 448 utils.check_xfail_backends(request.node, backend_alias)
449 449 utils.check_skip_backends(request.node, backend_alias)
450 450
451 451 repo_name = 'vcs_test_%s' % (backend_alias, )
452 452 backend = Backend(
453 453 alias=backend_alias,
454 454 repo_name=repo_name,
455 455 test_name=request.node.name,
456 456 test_repo_container=test_repo)
457 457 request.addfinalizer(backend.cleanup)
458 458 return backend
459 459
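# Hypothetical usage sketch for the parametrized `backend` fixture above; the
# marker name is the one checked by utils.check_xfail_backends.
#
#     @pytest.mark.xfail_backends("svn")
#     def test_create_repo_with_commits(backend):
#         repo = backend.create_repo(number_of_commits=2)
#         assert len(backend.commit_ids) == 2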
460 460
461 461 @pytest.fixture
462 462 def backend_git(request, pylonsapp, test_repo):
463 463 return backend(request, 'git', pylonsapp, test_repo)
464 464
465 465
466 466 @pytest.fixture
467 467 def backend_hg(request, pylonsapp, test_repo):
468 468 return backend(request, 'hg', pylonsapp, test_repo)
469 469
470 470
471 471 @pytest.fixture
472 472 def backend_svn(request, pylonsapp, test_repo):
473 473 return backend(request, 'svn', pylonsapp, test_repo)
474 474
475 475
476 476 @pytest.fixture
477 477 def backend_random(backend_git):
478 478 """
 479 479 Use this to express that your tests need "a backend".
480 480
481 481 A few of our tests need a backend, so that we can run the code. This
482 482 fixture is intended to be used for such cases. It will pick one of the
483 483 backends and run the tests.
484 484
485 485 The fixture `backend` would run the test multiple times for each
 486 486 available backend, which is a pure waste of time if the test is
487 487 independent of the backend type.
488 488 """
489 489 # TODO: johbo: Change this to pick a random backend
490 490 return backend_git
491 491
492 492
493 493 @pytest.fixture
494 494 def backend_stub(backend_git):
495 495 """
496 496 Use this to express that your tests need a backend stub
497 497
 498 498 TODO: mikhail: Implement real stub logic instead of returning
499 499 a git backend
500 500 """
501 501 return backend_git
502 502
503 503
504 504 @pytest.fixture
505 505 def repo_stub(backend_stub):
506 506 """
507 507 Use this to express that your tests need a repository stub
508 508 """
509 509 return backend_stub.create_repo()
510 510
511 511
512 512 class Backend(object):
513 513 """
514 514 Represents the test configuration for one supported backend
515 515
516 516 Provides easy access to different test repositories based on
517 517 `__getitem__`. Such repositories will only be created once per test
518 518 session.
519 519 """
520 520
521 521 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
522 522 _master_repo = None
523 523 _commit_ids = {}
524 524
525 525 def __init__(self, alias, repo_name, test_name, test_repo_container):
526 526 self.alias = alias
527 527 self.repo_name = repo_name
528 528 self._cleanup_repos = []
529 529 self._test_name = test_name
530 530 self._test_repo_container = test_repo_container
531 531 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
532 532 # Fixture will survive in the end.
533 533 self._fixture = Fixture()
534 534
535 535 def __getitem__(self, key):
536 536 return self._test_repo_container(key, self.alias)
537 537
538 538 @property
539 539 def repo(self):
540 540 """
541 541 Returns the "current" repository. This is the vcs_test repo or the
542 542 last repo which has been created with `create_repo`.
543 543 """
544 544 from rhodecode.model.db import Repository
545 545 return Repository.get_by_repo_name(self.repo_name)
546 546
547 547 @property
548 548 def default_branch_name(self):
549 549 VcsRepository = get_backend(self.alias)
550 550 return VcsRepository.DEFAULT_BRANCH_NAME
551 551
552 552 @property
553 553 def default_head_id(self):
554 554 """
555 555 Returns the default head id of the underlying backend.
556 556
 557 557 This will be the default branch name in case the backend has a
 558 558 default branch. Otherwise it will point to a valid head which can
 559 559 serve as the base to create a new commit on top of it.
560 560 """
561 561 vcsrepo = self.repo.scm_instance()
562 562 head_id = (
563 563 vcsrepo.DEFAULT_BRANCH_NAME or
564 564 vcsrepo.commit_ids[-1])
565 565 return head_id
566 566
567 567 @property
568 568 def commit_ids(self):
569 569 """
 570 570 Returns the commits of the last created repository as a dict, mapping commit message to raw commit id.
571 571 """
572 572 return self._commit_ids
573 573
574 574 def create_master_repo(self, commits):
575 575 """
576 576 Create a repository and remember it as a template.
577 577
 578 578 This allows easily creating derived repositories to construct
579 579 more complex scenarios for diff, compare and pull requests.
580 580
581 581 Returns a commit map which maps from commit message to raw_id.
582 582 """
583 583 self._master_repo = self.create_repo(commits=commits)
584 584 return self._commit_ids
585 585
586 586 def create_repo(
587 587 self, commits=None, number_of_commits=0, heads=None,
588 588 name_suffix=u'', **kwargs):
589 589 """
590 590 Create a repository and record it for later cleanup.
591 591
592 592 :param commits: Optional. A sequence of dict instances.
593 593 Will add a commit per entry to the new repository.
594 594 :param number_of_commits: Optional. If set to a number, this number of
595 595 commits will be added to the new repository.
 596 596 :param heads: Optional. Can be set to a sequence of commit
597 597 names which shall be pulled in from the master repository.
598 598
599 599 """
600 600 self.repo_name = self._next_repo_name() + name_suffix
601 601 repo = self._fixture.create_repo(
602 602 self.repo_name, repo_type=self.alias, **kwargs)
603 603 self._cleanup_repos.append(repo.repo_name)
604 604
605 605 commits = commits or [
606 606 {'message': 'Commit %s of %s' % (x, self.repo_name)}
607 607 for x in xrange(number_of_commits)]
608 608 self._add_commits_to_repo(repo.scm_instance(), commits)
609 609 if heads:
610 610 self.pull_heads(repo, heads)
611 611
612 612 return repo
613 613
614 614 def pull_heads(self, repo, heads):
615 615 """
616 616 Make sure that repo contains all commits mentioned in `heads`
617 617 """
618 618 vcsmaster = self._master_repo.scm_instance()
619 619 vcsrepo = repo.scm_instance()
620 620 vcsrepo.config.clear_section('hooks')
621 621 commit_ids = [self._commit_ids[h] for h in heads]
622 622 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
623 623
624 624 def create_fork(self):
625 625 repo_to_fork = self.repo_name
626 626 self.repo_name = self._next_repo_name()
627 627 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
628 628 self._cleanup_repos.append(self.repo_name)
629 629 return repo
630 630
631 631 def new_repo_name(self, suffix=u''):
632 632 self.repo_name = self._next_repo_name() + suffix
633 633 self._cleanup_repos.append(self.repo_name)
634 634 return self.repo_name
635 635
636 636 def _next_repo_name(self):
637 637 return u"%s_%s" % (
638 638 self.invalid_repo_name.sub(u'_', self._test_name),
639 639 len(self._cleanup_repos))
640 640
641 641 def ensure_file(self, filename, content='Test content\n'):
642 642 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
643 643 commits = [
644 644 {'added': [
645 645 FileNode(filename, content=content),
646 646 ]},
647 647 ]
648 648 self._add_commits_to_repo(self.repo.scm_instance(), commits)
649 649
650 650 def enable_downloads(self):
651 651 repo = self.repo
652 652 repo.enable_downloads = True
653 653 Session().add(repo)
654 654 Session().commit()
655 655
656 656 def cleanup(self):
657 657 for repo_name in reversed(self._cleanup_repos):
658 658 self._fixture.destroy_repo(repo_name)
659 659
660 660 def _add_commits_to_repo(self, repo, commits):
661 661 commit_ids = _add_commits_to_repo(repo, commits)
662 662 if not commit_ids:
663 663 return
664 664 self._commit_ids = commit_ids
665 665
 666 666 # Create refs for Git to allow fetching them from the remote repository
667 667 if self.alias == 'git':
668 668 refs = {}
669 669 for message in self._commit_ids:
670 670 # TODO: mikhail: do more special chars replacements
671 671 ref_name = 'refs/test-refs/{}'.format(
672 672 message.replace(' ', ''))
673 673 refs[ref_name] = self._commit_ids[message]
674 674 self._create_refs(repo, refs)
675 675
676 676 def _create_refs(self, repo, refs):
677 677 for ref_name in refs:
678 678 repo.set_refs(ref_name, refs[ref_name])
679 679
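# Illustrative sketch of how the Backend helper above is typically combined to
# build derived repositories from a master template (names are made up):
#
#     commits = [{'message': 'c1'}, {'message': 'c2'}]
#     commit_map = backend.create_master_repo(commits)  # message -> raw_id
#     target = backend.create_repo(heads=['c1'], name_suffix=u'-target')
#     source = backend.create_repo(heads=['c2'], name_suffix=u'-source')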
680 680
681 681 @pytest.fixture
682 682 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
683 683 """
684 684 Parametrized fixture which represents a single vcs backend implementation.
685 685
686 686 See the fixture `backend` for more details. This one implements the same
687 687 concept, but on vcs level. So it does not provide model instances etc.
688 688
689 689 Parameters are generated dynamically, see :func:`pytest_generate_tests`
690 690 for how this works.
691 691 """
692 692 if backend_alias not in request.config.getoption('--backends'):
693 693 pytest.skip("Backend %s not selected." % (backend_alias, ))
694 694
695 695 utils.check_xfail_backends(request.node, backend_alias)
696 696 utils.check_skip_backends(request.node, backend_alias)
697 697
698 698 repo_name = 'vcs_test_%s' % (backend_alias, )
699 699 repo_path = os.path.join(tests_tmp_path, repo_name)
700 700 backend = VcsBackend(
701 701 alias=backend_alias,
702 702 repo_path=repo_path,
703 703 test_name=request.node.name,
704 704 test_repo_container=test_repo)
705 705 request.addfinalizer(backend.cleanup)
706 706 return backend
707 707
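# Hypothetical usage sketch for the vcs-level fixture above:
#
#     def test_in_memory_commit(vcsbackend):
#         repo = vcsbackend.create_repo(number_of_commits=1)
#         vcsbackend.add_file(repo, 'README.rst', content='docs\n')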
708 708
709 709 @pytest.fixture
710 710 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
711 711 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
712 712
713 713
714 714 @pytest.fixture
715 715 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
716 716 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
717 717
718 718
719 719 @pytest.fixture
720 720 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
721 721 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
722 722
723 723
724 724 @pytest.fixture
725 725 def vcsbackend_random(vcsbackend_git):
726 726 """
727 727 Use this to express that your tests need "a vcsbackend".
728 728
729 729 The fixture `vcsbackend` would run the test multiple times for each
 730 730 available vcs backend, which is a pure waste of time if the test is
731 731 independent of the vcs backend type.
732 732 """
733 733 # TODO: johbo: Change this to pick a random backend
734 734 return vcsbackend_git
735 735
736 736
737 737 @pytest.fixture
738 738 def vcsbackend_stub(vcsbackend_git):
739 739 """
740 740 Use this to express that your test just needs a stub of a vcsbackend.
741 741
 742 742 The plan is to eventually implement an in-memory stub to speed tests up.
743 743 """
744 744 return vcsbackend_git
745 745
746 746
747 747 class VcsBackend(object):
748 748 """
749 749 Represents the test configuration for one supported vcs backend.
750 750 """
751 751
752 752 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
753 753
754 754 def __init__(self, alias, repo_path, test_name, test_repo_container):
755 755 self.alias = alias
756 756 self._repo_path = repo_path
757 757 self._cleanup_repos = []
758 758 self._test_name = test_name
759 759 self._test_repo_container = test_repo_container
760 760
761 761 def __getitem__(self, key):
762 762 return self._test_repo_container(key, self.alias).scm_instance()
763 763
764 764 @property
765 765 def repo(self):
766 766 """
767 767 Returns the "current" repository. This is the vcs_test repo of the last
768 768 repo which has been created.
769 769 """
770 770 Repository = get_backend(self.alias)
771 771 return Repository(self._repo_path)
772 772
773 773 @property
774 774 def backend(self):
775 775 """
776 776 Returns the backend implementation class.
777 777 """
778 778 return get_backend(self.alias)
779 779
780 780 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
781 781 repo_name = self._next_repo_name()
782 782 self._repo_path = get_new_dir(repo_name)
783 783 repo_class = get_backend(self.alias)
784 784 src_url = None
785 785 if _clone_repo:
786 786 src_url = _clone_repo.path
787 787 repo = repo_class(self._repo_path, create=True, src_url=src_url)
788 788 self._cleanup_repos.append(repo)
789 789
790 790 commits = commits or [
791 791 {'message': 'Commit %s of %s' % (x, repo_name)}
792 792 for x in xrange(number_of_commits)]
793 793 _add_commits_to_repo(repo, commits)
794 794 return repo
795 795
796 796 def clone_repo(self, repo):
797 797 return self.create_repo(_clone_repo=repo)
798 798
799 799 def cleanup(self):
800 800 for repo in self._cleanup_repos:
801 801 shutil.rmtree(repo.path)
802 802
803 803 def new_repo_path(self):
804 804 repo_name = self._next_repo_name()
805 805 self._repo_path = get_new_dir(repo_name)
806 806 return self._repo_path
807 807
808 808 def _next_repo_name(self):
809 809 return "%s_%s" % (
810 810 self.invalid_repo_name.sub('_', self._test_name),
811 811 len(self._cleanup_repos))
812 812
813 813 def add_file(self, repo, filename, content='Test content\n'):
814 814 imc = repo.in_memory_commit
815 815 imc.add(FileNode(filename, content=content))
816 816 imc.commit(
817 817 message=u'Automatic commit from vcsbackend fixture',
818 818 author=u'Automatic')
819 819
820 820 def ensure_file(self, filename, content='Test content\n'):
821 821 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
822 822 self.add_file(self.repo, filename, content)
823 823
824 824
825 825 def _add_commits_to_repo(vcs_repo, commits):
826 826 commit_ids = {}
827 827 if not commits:
828 828 return commit_ids
829 829
830 830 imc = vcs_repo.in_memory_commit
831 831 commit = None
832 832
833 833 for idx, commit in enumerate(commits):
834 834 message = unicode(commit.get('message', 'Commit %s' % idx))
835 835
836 836 for node in commit.get('added', []):
837 837 imc.add(FileNode(node.path, content=node.content))
838 838 for node in commit.get('changed', []):
839 839 imc.change(FileNode(node.path, content=node.content))
840 840 for node in commit.get('removed', []):
841 841 imc.remove(FileNode(node.path))
842 842
843 843 parents = [
844 844 vcs_repo.get_commit(commit_id=commit_ids[p])
845 845 for p in commit.get('parents', [])]
846 846
847 847 operations = ('added', 'changed', 'removed')
848 848 if not any((commit.get(o) for o in operations)):
849 849 imc.add(FileNode('file_%s' % idx, content=message))
850 850
851 851 commit = imc.commit(
852 852 message=message,
853 853 author=unicode(commit.get('author', 'Automatic')),
854 854 date=commit.get('date'),
855 855 branch=commit.get('branch'),
856 856 parents=parents)
857 857
858 858 commit_ids[commit.message] = commit.raw_id
859 859
860 860 return commit_ids
861 861
862 862
863 863 @pytest.fixture
864 864 def reposerver(request):
865 865 """
 866 866 Allows serving a backend repository.
867 867 """
868 868
869 869 repo_server = RepoServer()
870 870 request.addfinalizer(repo_server.cleanup)
871 871 return repo_server
872 872
873 873
874 874 class RepoServer(object):
875 875 """
876 876 Utility to serve a local repository for the duration of a test case.
877 877
878 878 Supports only Subversion so far.
879 879 """
880 880
881 881 url = None
882 882
883 883 def __init__(self):
884 884 self._cleanup_servers = []
885 885
886 886 def serve(self, vcsrepo):
887 887 if vcsrepo.alias != 'svn':
888 888 raise TypeError("Backend %s not supported" % vcsrepo.alias)
889 889
890 890 proc = subprocess32.Popen(
891 891 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
892 892 '--root', vcsrepo.path])
893 893 self._cleanup_servers.append(proc)
894 894 self.url = 'svn://localhost'
895 895
896 896 def cleanup(self):
897 897 for proc in self._cleanup_servers:
898 898 proc.terminate()
899 899
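# Hypothetical usage sketch for the reposerver fixture above (Subversion only):
#
#     def test_serve_svn_repo(reposerver, vcsbackend_svn):
#         repo = vcsbackend_svn.create_repo(number_of_commits=1)
#         reposerver.serve(repo)
#         assert reposerver.url == 'svn://localhost'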
900 900
901 901 @pytest.fixture
902 902 def pr_util(backend, request):
903 903 """
904 904 Utility for tests of models and for functional tests around pull requests.
905 905
906 906 It gives an instance of :class:`PRTestUtility` which provides various
907 907 utility methods around one pull request.
908 908
909 909 This fixture uses `backend` and inherits its parameterization.
910 910 """
911 911
912 912 util = PRTestUtility(backend)
913 913
914 914 @request.addfinalizer
915 915 def cleanup():
916 916 util.cleanup()
917 917
918 918 return util
919 919
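# Hypothetical usage sketch for the pr_util fixture above:
#
#     def test_close_pull_request(pr_util):
#         pull_request = pr_util.create_pull_request(approved=True)
#         pr_util.add_one_commit()
#         pr_util.close()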
920 920
921 921 class PRTestUtility(object):
922 922
923 923 pull_request = None
924 924 pull_request_id = None
925 925 mergeable_patcher = None
926 926 mergeable_mock = None
927 927 notification_patcher = None
928 928
929 929 def __init__(self, backend):
930 930 self.backend = backend
931 931
932 932 def create_pull_request(
933 933 self, commits=None, target_head=None, source_head=None,
934 934 revisions=None, approved=False, author=None, mergeable=False,
935 935 enable_notifications=True, name_suffix=u'', reviewers=None,
936 936 title=u"Test", description=u"Description"):
937 937 self.set_mergeable(mergeable)
938 938 if not enable_notifications:
939 939 # mock notification side effect
940 940 self.notification_patcher = mock.patch(
941 941 'rhodecode.model.notification.NotificationModel.create')
942 942 self.notification_patcher.start()
943 943
944 944 if not self.pull_request:
945 945 if not commits:
946 946 commits = [
947 947 {'message': 'c1'},
948 948 {'message': 'c2'},
949 949 {'message': 'c3'},
950 950 ]
951 951 target_head = 'c1'
952 952 source_head = 'c2'
953 953 revisions = ['c2']
954 954
955 955 self.commit_ids = self.backend.create_master_repo(commits)
956 956 self.target_repository = self.backend.create_repo(
957 957 heads=[target_head], name_suffix=name_suffix)
958 958 self.source_repository = self.backend.create_repo(
959 959 heads=[source_head], name_suffix=name_suffix)
960 960 self.author = author or UserModel().get_by_username(
961 961 TEST_USER_ADMIN_LOGIN)
962 962
963 963 model = PullRequestModel()
964 964 self.create_parameters = {
965 965 'created_by': self.author,
966 966 'source_repo': self.source_repository.repo_name,
967 967 'source_ref': self._default_branch_reference(source_head),
968 968 'target_repo': self.target_repository.repo_name,
969 969 'target_ref': self._default_branch_reference(target_head),
970 970 'revisions': [self.commit_ids[r] for r in revisions],
971 971 'reviewers': reviewers or self._get_reviewers(),
972 972 'title': title,
973 973 'description': description,
974 974 }
975 975 self.pull_request = model.create(**self.create_parameters)
976 976 assert model.get_versions(self.pull_request) == []
977 977
978 978 self.pull_request_id = self.pull_request.pull_request_id
979 979
980 980 if approved:
981 981 self.approve()
982 982
983 983 Session().add(self.pull_request)
984 984 Session().commit()
985 985
986 986 return self.pull_request
987 987
988 988 def approve(self):
989 989 self.create_status_votes(
990 990 ChangesetStatus.STATUS_APPROVED,
991 991 *self.pull_request.reviewers)
992 992
993 993 def close(self):
994 994 PullRequestModel().close_pull_request(self.pull_request, self.author)
995 995
996 996 def _default_branch_reference(self, commit_message):
997 997 reference = '%s:%s:%s' % (
998 998 'branch',
999 999 self.backend.default_branch_name,
1000 1000 self.commit_ids[commit_message])
1001 1001 return reference
1002 1002
1003 1003 def _get_reviewers(self):
1004 1004 model = UserModel()
1005 1005 return [
1006 1006 model.get_by_username(TEST_USER_REGULAR_LOGIN),
1007 1007 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
1008 1008 ]
1009 1009
1010 1010 def update_source_repository(self, head=None):
1011 1011 heads = [head or 'c3']
1012 1012 self.backend.pull_heads(self.source_repository, heads=heads)
1013 1013
1014 1014 def add_one_commit(self, head=None):
1015 1015 self.update_source_repository(head=head)
1016 1016 old_commit_ids = set(self.pull_request.revisions)
1017 1017 PullRequestModel().update_commits(self.pull_request)
1018 1018 commit_ids = set(self.pull_request.revisions)
1019 1019 new_commit_ids = commit_ids - old_commit_ids
1020 1020 assert len(new_commit_ids) == 1
1021 1021 return new_commit_ids.pop()
1022 1022
1023 1023 def remove_one_commit(self):
1024 1024 assert len(self.pull_request.revisions) == 2
1025 1025 source_vcs = self.source_repository.scm_instance()
1026 1026 removed_commit_id = source_vcs.commit_ids[-1]
1027 1027
1028 1028 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1029 1029 # remove the if once that's sorted out.
1030 1030 if self.backend.alias == "git":
1031 1031 kwargs = {'branch_name': self.backend.default_branch_name}
1032 1032 else:
1033 1033 kwargs = {}
1034 1034 source_vcs.strip(removed_commit_id, **kwargs)
1035 1035
1036 1036 PullRequestModel().update_commits(self.pull_request)
1037 1037 assert len(self.pull_request.revisions) == 1
1038 1038 return removed_commit_id
1039 1039
1040 1040 def create_comment(self, linked_to=None):
1041 1041 comment = ChangesetCommentsModel().create(
1042 1042 text=u"Test comment",
1043 1043 repo=self.target_repository.repo_name,
1044 1044 user=self.author,
1045 1045 pull_request=self.pull_request)
1046 1046 assert comment.pull_request_version_id is None
1047 1047
1048 1048 if linked_to:
1049 1049 PullRequestModel()._link_comments_to_version(linked_to)
1050 1050
1051 1051 return comment
1052 1052
1053 1053 def create_inline_comment(
1054 1054 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1055 1055 comment = ChangesetCommentsModel().create(
1056 1056 text=u"Test comment",
1057 1057 repo=self.target_repository.repo_name,
1058 1058 user=self.author,
1059 1059 line_no=line_no,
1060 1060 f_path=file_path,
1061 1061 pull_request=self.pull_request)
1062 1062 assert comment.pull_request_version_id is None
1063 1063
1064 1064 if linked_to:
1065 1065 PullRequestModel()._link_comments_to_version(linked_to)
1066 1066
1067 1067 return comment
1068 1068
1069 1069 def create_version_of_pull_request(self):
1070 1070 pull_request = self.create_pull_request()
1071 1071 version = PullRequestModel()._create_version_from_snapshot(
1072 1072 pull_request)
1073 1073 return version
1074 1074
1075 1075 def create_status_votes(self, status, *reviewers):
1076 1076 for reviewer in reviewers:
1077 1077 ChangesetStatusModel().set_status(
1078 1078 repo=self.pull_request.target_repo,
1079 1079 status=status,
1080 1080 user=reviewer.user_id,
1081 1081 pull_request=self.pull_request)
1082 1082
1083 1083 def set_mergeable(self, value):
1084 1084 if not self.mergeable_patcher:
1085 1085 self.mergeable_patcher = mock.patch.object(
1086 1086 VcsSettingsModel, 'get_general_settings')
1087 1087 self.mergeable_mock = self.mergeable_patcher.start()
1088 1088 self.mergeable_mock.return_value = {
1089 1089 'rhodecode_pr_merge_enabled': value}
1090 1090
1091 1091 def cleanup(self):
1092 1092 # In case the source repository is already cleaned up, the pull
1093 1093 # request will already be deleted.
1094 1094 pull_request = PullRequest().get(self.pull_request_id)
1095 1095 if pull_request:
1096 1096 PullRequestModel().delete(pull_request)
1097 1097 Session().commit()
1098 1098
1099 1099 if self.notification_patcher:
1100 1100 self.notification_patcher.stop()
1101 1101
1102 1102 if self.mergeable_patcher:
1103 1103 self.mergeable_patcher.stop()
1104 1104
1105 1105
1106 1106 @pytest.fixture
1107 1107 def user_admin(pylonsapp):
1108 1108 """
1109 1109 Provides the default admin test user as an instance of `db.User`.
1110 1110 """
1111 1111 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1112 1112 return user
1113 1113
1114 1114
1115 1115 @pytest.fixture
1116 1116 def user_regular(pylonsapp):
1117 1117 """
1118 1118 Provides the default regular test user as an instance of `db.User`.
1119 1119 """
1120 1120 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1121 1121 return user
1122 1122
1123 1123
1124 1124 @pytest.fixture
1125 1125 def user_util(request, pylonsapp):
1126 1126 """
1127 1127 Provides a wired instance of `UserUtility` with integrated cleanup.
1128 1128 """
1129 1129 utility = UserUtility(test_name=request.node.name)
1130 1130 request.addfinalizer(utility.cleanup)
1131 1131 return utility
1132 1132
1133 1133
1134 1134 # TODO: johbo: Split this up into utilities per domain or something similar
1135 1135 class UserUtility(object):
1136 1136
1137 1137 def __init__(self, test_name="test"):
1138 1138 self._test_name = self._sanitize_name(test_name)
1139 1139 self.fixture = Fixture()
1140 1140 self.repo_group_ids = []
1141 self.repos_ids = []
1141 1142 self.user_ids = []
1142 1143 self.user_group_ids = []
1143 1144 self.user_repo_permission_ids = []
1144 1145 self.user_group_repo_permission_ids = []
1145 1146 self.user_repo_group_permission_ids = []
1146 1147 self.user_group_repo_group_permission_ids = []
1147 1148 self.user_user_group_permission_ids = []
1148 1149 self.user_group_user_group_permission_ids = []
1149 1150 self.user_permissions = []
1150 1151
1151 1152 def _sanitize_name(self, name):
1152 1153 for char in ['[', ']']:
1153 1154 name = name.replace(char, '_')
1154 1155 return name
1155 1156
1156 1157 def create_repo_group(
1157 1158 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1158 1159 group_name = "{prefix}_repogroup_{count}".format(
1159 1160 prefix=self._test_name,
1160 1161 count=len(self.repo_group_ids))
1161 1162 repo_group = self.fixture.create_repo_group(
1162 1163 group_name, cur_user=owner)
1163 1164 if auto_cleanup:
1164 1165 self.repo_group_ids.append(repo_group.group_id)
1165 1166 return repo_group
1166 1167
1168 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None, auto_cleanup=True):
1169 repo_name = "{prefix}_repository_{count}".format(
1170 prefix=self._test_name,
1171 count=len(self.repos_ids))
1172
1173 repository = self.fixture.create_repo(
1174 repo_name, cur_user=owner, repo_group=parent)
1175 if auto_cleanup:
1176 self.repos_ids.append(repository.repo_id)
1177 return repository
1178
1167 1179 def create_user(self, auto_cleanup=True, **kwargs):
1168 1180 user_name = "{prefix}_user_{count}".format(
1169 1181 prefix=self._test_name,
1170 1182 count=len(self.user_ids))
1171 1183 user = self.fixture.create_user(user_name, **kwargs)
1172 1184 if auto_cleanup:
1173 1185 self.user_ids.append(user.user_id)
1174 1186 return user
1175 1187
1176 1188 def create_user_with_group(self):
1177 1189 user = self.create_user()
1178 1190 user_group = self.create_user_group(members=[user])
1179 1191 return user, user_group
1180 1192
1181 1193 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1182 1194 group_name = "{prefix}_usergroup_{count}".format(
1183 1195 prefix=self._test_name,
1184 1196 count=len(self.user_group_ids))
1185 1197 user_group = self.fixture.create_user_group(group_name, **kwargs)
1186 1198 if auto_cleanup:
1187 1199 self.user_group_ids.append(user_group.users_group_id)
1188 1200 if members:
1189 1201 for user in members:
1190 1202 UserGroupModel().add_user_to_group(user_group, user)
1191 1203 return user_group
1192 1204
1193 1205 def grant_user_permission(self, user_name, permission_name):
1194 1206 self._inherit_default_user_permissions(user_name, False)
1195 1207 self.user_permissions.append((user_name, permission_name))
1196 1208
1197 1209 def grant_user_permission_to_repo_group(
1198 1210 self, repo_group, user, permission_name):
1199 1211 permission = RepoGroupModel().grant_user_permission(
1200 1212 repo_group, user, permission_name)
1201 1213 self.user_repo_group_permission_ids.append(
1202 1214 (repo_group.group_id, user.user_id))
1203 1215 return permission
1204 1216
1205 1217 def grant_user_group_permission_to_repo_group(
1206 1218 self, repo_group, user_group, permission_name):
1207 1219 permission = RepoGroupModel().grant_user_group_permission(
1208 1220 repo_group, user_group, permission_name)
1209 1221 self.user_group_repo_group_permission_ids.append(
1210 1222 (repo_group.group_id, user_group.users_group_id))
1211 1223 return permission
1212 1224
1213 1225 def grant_user_permission_to_repo(
1214 1226 self, repo, user, permission_name):
1215 1227 permission = RepoModel().grant_user_permission(
1216 1228 repo, user, permission_name)
1217 1229 self.user_repo_permission_ids.append(
1218 1230 (repo.repo_id, user.user_id))
1219 1231 return permission
1220 1232
1221 1233 def grant_user_group_permission_to_repo(
1222 1234 self, repo, user_group, permission_name):
1223 1235 permission = RepoModel().grant_user_group_permission(
1224 1236 repo, user_group, permission_name)
1225 1237 self.user_group_repo_permission_ids.append(
1226 1238 (repo.repo_id, user_group.users_group_id))
1227 1239 return permission
1228 1240
1229 1241 def grant_user_permission_to_user_group(
1230 1242 self, target_user_group, user, permission_name):
1231 1243 permission = UserGroupModel().grant_user_permission(
1232 1244 target_user_group, user, permission_name)
1233 1245 self.user_user_group_permission_ids.append(
1234 1246 (target_user_group.users_group_id, user.user_id))
1235 1247 return permission
1236 1248
1237 1249 def grant_user_group_permission_to_user_group(
1238 1250 self, target_user_group, user_group, permission_name):
1239 1251 permission = UserGroupModel().grant_user_group_permission(
1240 1252 target_user_group, user_group, permission_name)
1241 1253 self.user_group_user_group_permission_ids.append(
1242 1254 (target_user_group.users_group_id, user_group.users_group_id))
1243 1255 return permission
1244 1256
1245 1257 def revoke_user_permission(self, user_name, permission_name):
1246 1258 self._inherit_default_user_permissions(user_name, True)
1247 1259 UserModel().revoke_perm(user_name, permission_name)
1248 1260
1249 1261 def _inherit_default_user_permissions(self, user_name, value):
1250 1262 user = UserModel().get_by_username(user_name)
1251 1263 user.inherit_default_permissions = value
1252 1264 Session().add(user)
1253 1265 Session().commit()
1254 1266
1255 1267 def cleanup(self):
1256 1268 self._cleanup_permissions()
1269 self._cleanup_repos()
1257 1270 self._cleanup_repo_groups()
1258 1271 self._cleanup_user_groups()
1259 1272 self._cleanup_users()
1260 1273
1261 1274 def _cleanup_permissions(self):
1262 1275 if self.user_permissions:
1263 1276 for user_name, permission_name in self.user_permissions:
1264 1277 self.revoke_user_permission(user_name, permission_name)
1265 1278
1266 1279 for permission in self.user_repo_permission_ids:
1267 1280 RepoModel().revoke_user_permission(*permission)
1268 1281
1269 1282 for permission in self.user_group_repo_permission_ids:
1270 1283 RepoModel().revoke_user_group_permission(*permission)
1271 1284
1272 1285 for permission in self.user_repo_group_permission_ids:
1273 1286 RepoGroupModel().revoke_user_permission(*permission)
1274 1287
1275 1288 for permission in self.user_group_repo_group_permission_ids:
1276 1289 RepoGroupModel().revoke_user_group_permission(*permission)
1277 1290
1278 1291 for permission in self.user_user_group_permission_ids:
1279 1292 UserGroupModel().revoke_user_permission(*permission)
1280 1293
1281 1294 for permission in self.user_group_user_group_permission_ids:
1282 1295 UserGroupModel().revoke_user_group_permission(*permission)
1283 1296
1284 1297 def _cleanup_repo_groups(self):
1285 1298 def _repo_group_compare(first_group_id, second_group_id):
1286 1299 """
1287 1300 Gives higher priority to the groups with the most complex paths
1288 1301 """
1289 1302 first_group = RepoGroup.get(first_group_id)
1290 1303 second_group = RepoGroup.get(second_group_id)
1291 1304 first_group_parts = (
1292 1305 len(first_group.group_name.split('/')) if first_group else 0)
1293 1306 second_group_parts = (
1294 1307 len(second_group.group_name.split('/')) if second_group else 0)
1295 1308 return cmp(second_group_parts, first_group_parts)
1296 1309
1297 1310 sorted_repo_group_ids = sorted(
1298 1311 self.repo_group_ids, cmp=_repo_group_compare)
1299 1312 for repo_group_id in sorted_repo_group_ids:
1300 1313 self.fixture.destroy_repo_group(repo_group_id)
1301 1314
1315 def _cleanup_repos(self):
1316 sorted_repos_ids = sorted(self.repos_ids)
1317 for repo_id in sorted_repos_ids:
1318 self.fixture.destroy_repo(repo_id)
1319
1302 1320 def _cleanup_user_groups(self):
1303 1321 def _user_group_compare(first_group_id, second_group_id):
1304 1322 """
1305 1323 Gives higher priority to the groups with the most complex paths
1306 1324 """
1307 1325 first_group = UserGroup.get(first_group_id)
1308 1326 second_group = UserGroup.get(second_group_id)
1309 1327 first_group_parts = (
1310 1328 len(first_group.users_group_name.split('/'))
1311 1329 if first_group else 0)
1312 1330 second_group_parts = (
1313 1331 len(second_group.users_group_name.split('/'))
1314 1332 if second_group else 0)
1315 1333 return cmp(second_group_parts, first_group_parts)
1316 1334
1317 1335 sorted_user_group_ids = sorted(
1318 1336 self.user_group_ids, cmp=_user_group_compare)
1319 1337 for user_group_id in sorted_user_group_ids:
1320 1338 self.fixture.destroy_user_group(user_group_id)
1321 1339
1322 1340 def _cleanup_users(self):
1323 1341 for user_id in self.user_ids:
1324 1342 self.fixture.destroy_user(user_id)
1325 1343
1326 1344
1327 1345 # TODO: Think about moving this into a pytest-pyro package and make it a
1328 1346 # pytest plugin
1329 1347 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1330 1348 def pytest_runtest_makereport(item, call):
1331 1349 """
 1332 1350 Adds the remote traceback to the report if the exception carries this information.
1333 1351
1334 1352 Pyro4 attaches this information as the attribute `_vcs_server_traceback`
1335 1353 to the exception instance.
1336 1354 """
1337 1355 outcome = yield
1338 1356 report = outcome.get_result()
1339 1357 if call.excinfo:
1340 1358 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1341 1359
1342 1360
1343 1361 def _add_vcsserver_remote_traceback(report, exc):
1344 1362 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1345 1363
1346 1364 if vcsserver_traceback:
1347 1365 section = 'VCSServer remote traceback ' + report.when
1348 1366 report.sections.append((section, vcsserver_traceback))
1349 1367
1350 1368
1351 1369 @pytest.fixture(scope='session')
1352 1370 def testrun():
1353 1371 return {
1354 1372 'uuid': uuid.uuid4(),
1355 1373 'start': datetime.datetime.utcnow().isoformat(),
1356 1374 'timestamp': int(time.time()),
1357 1375 }
1358 1376
1359 1377
1360 1378 @pytest.fixture(autouse=True)
1361 1379 def collect_appenlight_stats(request, testrun):
1362 1380 """
 1363 1381 This fixture reports the memory consumption of single tests.
1364 1382
1365 1383 It gathers data based on `psutil` and sends them to Appenlight. The option
 1366 1384 ``--ae`` has to be used to enable this fixture and the API key for your
1367 1385 application has to be provided in ``--ae-key``.
1368 1386 """
1369 1387 try:
 1370 1388 # cygwin does not have psutil support yet.
1371 1389 import psutil
1372 1390 except ImportError:
1373 1391 return
1374 1392
1375 1393 if not request.config.getoption('--appenlight'):
1376 1394 return
1377 1395 else:
1378 1396 # Only request the pylonsapp fixture if appenlight tracking is
1379 1397 # enabled. This will speed up a test run of unit tests by 2 to 3
1380 1398 # seconds if appenlight is not enabled.
1381 1399 pylonsapp = request.getfuncargvalue("pylonsapp")
1382 1400 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1383 1401 client = AppenlightClient(
1384 1402 url=url,
1385 1403 api_key=request.config.getoption('--appenlight-api-key'),
1386 1404 namespace=request.node.nodeid,
1387 1405 request=str(testrun['uuid']),
1388 1406 testrun=testrun)
1389 1407
1390 1408 client.collect({
1391 1409 'message': "Starting",
1392 1410 })
1393 1411
1394 1412 server_and_port = pylonsapp.config['vcs.server']
1395 1413 server = create_vcsserver_proxy(server_and_port)
1396 1414 with server:
1397 1415 vcs_pid = server.get_pid()
1398 1416 server.run_gc()
1399 1417 vcs_process = psutil.Process(vcs_pid)
1400 1418 mem = vcs_process.memory_info()
1401 1419 client.tag_before('vcsserver.rss', mem.rss)
1402 1420 client.tag_before('vcsserver.vms', mem.vms)
1403 1421
1404 1422 test_process = psutil.Process()
1405 1423 mem = test_process.memory_info()
1406 1424 client.tag_before('test.rss', mem.rss)
1407 1425 client.tag_before('test.vms', mem.vms)
1408 1426
1409 1427 client.tag_before('time', time.time())
1410 1428
1411 1429 @request.addfinalizer
1412 1430 def send_stats():
1413 1431 client.tag_after('time', time.time())
1414 1432 with server:
1415 1433 gc_stats = server.run_gc()
1416 1434 for tag, value in gc_stats.items():
1417 1435 client.tag_after(tag, value)
1418 1436 mem = vcs_process.memory_info()
1419 1437 client.tag_after('vcsserver.rss', mem.rss)
1420 1438 client.tag_after('vcsserver.vms', mem.vms)
1421 1439
1422 1440 mem = test_process.memory_info()
1423 1441 client.tag_after('test.rss', mem.rss)
1424 1442 client.tag_after('test.vms', mem.vms)
1425 1443
1426 1444 client.collect({
1427 1445 'message': "Finished",
1428 1446 })
1429 1447 client.send_stats()
1430 1448
1431 1449 return client
1432 1450
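# Illustrative note: the statistics collection above only kicks in when the
# test run is started with something along the lines of
#
#     py.test --appenlight --appenlight-api-key=<your-key> rhodecode/tests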
1433 1451
 1434 1452 class AppenlightClient(object):
1435 1453
1436 1454 url_template = '{url}?protocol_version=0.5'
1437 1455
1438 1456 def __init__(
1439 1457 self, url, api_key, add_server=True, add_timestamp=True,
1440 1458 namespace=None, request=None, testrun=None):
1441 1459 self.url = self.url_template.format(url=url)
1442 1460 self.api_key = api_key
1443 1461 self.add_server = add_server
1444 1462 self.add_timestamp = add_timestamp
1445 1463 self.namespace = namespace
1446 1464 self.request = request
1447 1465 self.server = socket.getfqdn(socket.gethostname())
1448 1466 self.tags_before = {}
1449 1467 self.tags_after = {}
1450 1468 self.stats = []
1451 1469 self.testrun = testrun or {}
1452 1470
1453 1471 def tag_before(self, tag, value):
1454 1472 self.tags_before[tag] = value
1455 1473
1456 1474 def tag_after(self, tag, value):
1457 1475 self.tags_after[tag] = value
1458 1476
1459 1477 def collect(self, data):
1460 1478 if self.add_server:
1461 1479 data.setdefault('server', self.server)
1462 1480 if self.add_timestamp:
1463 1481 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1464 1482 if self.namespace:
1465 1483 data.setdefault('namespace', self.namespace)
1466 1484 if self.request:
1467 1485 data.setdefault('request', self.request)
1468 1486 self.stats.append(data)
1469 1487
1470 1488 def send_stats(self):
1471 1489 tags = [
1472 1490 ('testrun', self.request),
1473 1491 ('testrun.start', self.testrun['start']),
1474 1492 ('testrun.timestamp', self.testrun['timestamp']),
1475 1493 ('test', self.namespace),
1476 1494 ]
1477 1495 for key, value in self.tags_before.items():
1478 1496 tags.append((key + '.before', value))
1479 1497 try:
1480 1498 delta = self.tags_after[key] - value
1481 1499 tags.append((key + '.delta', delta))
1482 1500 except Exception:
1483 1501 pass
1484 1502 for key, value in self.tags_after.items():
1485 1503 tags.append((key + '.after', value))
1486 1504 self.collect({
1487 1505 'message': "Collected tags",
1488 1506 'tags': tags,
1489 1507 })
1490 1508
1491 1509 response = requests.post(
1492 1510 self.url,
1493 1511 headers={
1494 1512 'X-appenlight-api-key': self.api_key},
1495 1513 json=self.stats,
1496 1514 )
1497 1515
1498 1516 if not response.status_code == 200:
1499 1517 pprint.pprint(self.stats)
1500 1518 print response.headers
1501 1519 print response.text
1502 1520 raise Exception('Sending to appenlight failed')
1503 1521
1504 1522
1505 1523 @pytest.fixture
1506 1524 def gist_util(request, pylonsapp):
1507 1525 """
1508 1526 Provides a wired instance of `GistUtility` with integrated cleanup.
1509 1527 """
1510 1528 utility = GistUtility()
1511 1529 request.addfinalizer(utility.cleanup)
1512 1530 return utility
1513 1531
1514 1532
1515 1533 class GistUtility(object):
1516 1534 def __init__(self):
1517 1535 self.fixture = Fixture()
1518 1536 self.gist_ids = []
1519 1537
1520 1538 def create_gist(self, **kwargs):
1521 1539 gist = self.fixture.create_gist(**kwargs)
1522 1540 self.gist_ids.append(gist.gist_id)
1523 1541 return gist
1524 1542
1525 1543 def cleanup(self):
1526 1544 for id_ in self.gist_ids:
1527 1545 self.fixture.destroy_gists(str(id_))
1528 1546
1529 1547
1530 1548 @pytest.fixture
1531 1549 def enabled_backends(request):
1532 1550 backends = request.config.option.backends
1533 1551 return backends[:]
1534 1552
1535 1553
1536 1554 @pytest.fixture
1537 1555 def settings_util(request):
1538 1556 """
1539 1557 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1540 1558 """
1541 1559 utility = SettingsUtility()
1542 1560 request.addfinalizer(utility.cleanup)
1543 1561 return utility
1544 1562
1545 1563
1546 1564 class SettingsUtility(object):
1547 1565 def __init__(self):
1548 1566 self.rhodecode_ui_ids = []
1549 1567 self.rhodecode_setting_ids = []
1550 1568 self.repo_rhodecode_ui_ids = []
1551 1569 self.repo_rhodecode_setting_ids = []
1552 1570
1553 1571 def create_repo_rhodecode_ui(
1554 1572 self, repo, section, value, key=None, active=True, cleanup=True):
1555 1573 key = key or hashlib.sha1(
1556 1574 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1557 1575
1558 1576 setting = RepoRhodeCodeUi()
1559 1577 setting.repository_id = repo.repo_id
1560 1578 setting.ui_section = section
1561 1579 setting.ui_value = value
1562 1580 setting.ui_key = key
1563 1581 setting.ui_active = active
1564 1582 Session().add(setting)
1565 1583 Session().commit()
1566 1584
1567 1585 if cleanup:
1568 1586 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1569 1587 return setting
1570 1588
1571 1589 def create_rhodecode_ui(
1572 1590 self, section, value, key=None, active=True, cleanup=True):
1573 1591 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1574 1592
1575 1593 setting = RhodeCodeUi()
1576 1594 setting.ui_section = section
1577 1595 setting.ui_value = value
1578 1596 setting.ui_key = key
1579 1597 setting.ui_active = active
1580 1598 Session().add(setting)
1581 1599 Session().commit()
1582 1600
1583 1601 if cleanup:
1584 1602 self.rhodecode_ui_ids.append(setting.ui_id)
1585 1603 return setting
1586 1604
1587 1605 def create_repo_rhodecode_setting(
1588 1606 self, repo, name, value, type_, cleanup=True):
1589 1607 setting = RepoRhodeCodeSetting(
1590 1608 repo.repo_id, key=name, val=value, type=type_)
1591 1609 Session().add(setting)
1592 1610 Session().commit()
1593 1611
1594 1612 if cleanup:
1595 1613 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1596 1614 return setting
1597 1615
1598 1616 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1599 1617 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1600 1618 Session().add(setting)
1601 1619 Session().commit()
1602 1620
1603 1621 if cleanup:
1604 1622 self.rhodecode_setting_ids.append(setting.app_settings_id)
1605 1623
1606 1624 return setting
1607 1625
1608 1626 def cleanup(self):
1609 1627 for id_ in self.rhodecode_ui_ids:
1610 1628 setting = RhodeCodeUi.get(id_)
1611 1629 Session().delete(setting)
1612 1630
1613 1631 for id_ in self.rhodecode_setting_ids:
1614 1632 setting = RhodeCodeSetting.get(id_)
1615 1633 Session().delete(setting)
1616 1634
1617 1635 for id_ in self.repo_rhodecode_ui_ids:
1618 1636 setting = RepoRhodeCodeUi.get(id_)
1619 1637 Session().delete(setting)
1620 1638
1621 1639 for id_ in self.repo_rhodecode_setting_ids:
1622 1640 setting = RepoRhodeCodeSetting.get(id_)
1623 1641 Session().delete(setting)
1624 1642
1625 1643 Session().commit()
1626 1644
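# Illustrative usage sketch, not part of the original changeset: creating a
# throwaway global ui entry via the settings_util fixture. The section and
# value below are made up; with cleanup=True (the default) the finalizer
# deletes the row again after the test.
def _example_settings_util_usage(settings_util):  # pragma: no cover - documentation only
    setting = settings_util.create_rhodecode_ui(
        section='hooks', value='python:example.hooks.dummy', active=False)
    assert setting.ui_section == 'hooks'
    assert not setting.ui_active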
1627 1645
1628 1646 @pytest.fixture
1629 1647 def no_notifications(request):
1630 1648 notification_patcher = mock.patch(
1631 1649 'rhodecode.model.notification.NotificationModel.create')
1632 1650 notification_patcher.start()
1633 1651 request.addfinalizer(notification_patcher.stop)
1634 1652
1635 1653
1636 1654 @pytest.fixture
1637 1655 def silence_action_logger(request):
1638 1656 notification_patcher = mock.patch(
1639 1657 'rhodecode.lib.utils.action_logger')
1640 1658 notification_patcher.start()
1641 1659 request.addfinalizer(notification_patcher.stop)
1642 1660
1643 1661
1644 1662 @pytest.fixture(scope='session')
1645 1663 def repeat(request):
1646 1664 """
1647 1665 The number of repetitions is based on this fixture.
1648 1666
1649 1667 Slower calls may divide it by 10 or 100. It is chosen so that the
1650 1668 tests are not too slow in our default test suite.
1651 1669 """
1652 1670 return request.config.getoption('--repeat')
1653 1671
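# Illustrative sketch, not part of the original changeset: a repeated test
# using the session scoped repeat fixture above. Slower operations divide
# the count so the overall runtime stays reasonable.
def _example_repeat_usage(repeat):  # pragma: no cover - documentation only
    for _ in xrange(repeat / 10):
        pass  # place the measured or repeated call here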
1654 1672
1655 1673 @pytest.fixture
1656 1674 def rhodecode_fixtures():
1657 1675 return Fixture()
1658 1676
1659 1677
1660 1678 @pytest.fixture
1661 1679 def request_stub():
1662 1680 """
1663 1681 Stub request object.
1664 1682 """
1665 1683 request = pyramid.testing.DummyRequest()
1666 1684 request.scheme = 'https'
1667 1685 return request
1668 1686
1669 1687
1670 1688 @pytest.fixture
1671 1689 def config_stub(request, request_stub):
1672 1690 """
1673 1691 Set up pyramid.testing and return the Configurator.
1674 1692 """
1675 1693 config = pyramid.testing.setUp(request=request_stub)
1676 1694
1677 1695 @request.addfinalizer
1678 1696 def cleanup():
1679 1697 pyramid.testing.tearDown()
1680 1698
1681 1699 return config
1682 1700
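# Illustrative sketch, not part of the original changeset: because
# config_stub calls pyramid.testing.setUp with request_stub, code under test
# that relies on the threadlocal request sees the stub during the test.
def _example_config_stub_usage(config_stub, request_stub):  # pragma: no cover - documentation only
    from pyramid.threadlocal import get_current_request
    assert get_current_request() is request_stub
    assert request_stub.scheme == 'https'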
1683 1701
1684 1702 @pytest.fixture
1685 1703 def StubIntegrationType():
1686 1704 class _StubIntegrationType(IntegrationTypeBase):
1687 1705 """ Test integration type class """
1688 1706
1689 1707 key = 'test'
1690 1708 display_name = 'Test integration type'
1691 1709 description = 'A test integration type for testing'
1692 1710 icon = 'test_icon_html_image'
1693 1711
1694 1712 def __init__(self, settings):
1695 1713 super(_StubIntegrationType, self).__init__(settings)
1696 1714 self.sent_events = [] # for testing
1697 1715
1698 1716 def send_event(self, event):
1699 1717 self.sent_events.append(event)
1700 1718
1701 1719 def settings_schema(self):
1702 1720 class SettingsSchema(colander.Schema):
1703 1721 test_string_field = colander.SchemaNode(
1704 1722 colander.String(),
1705 1723 missing=colander.required,
1706 1724 title='test string field',
1707 1725 )
1708 1726 test_int_field = colander.SchemaNode(
1709 1727 colander.Int(),
1710 1728 title='some integer setting',
1711 1729 )
1712 1730 return SettingsSchema()
1713 1731
1714 1732
1715 1733 integration_type_registry.register_integration_type(_StubIntegrationType)
1716 1734 return _StubIntegrationType
1717 1735
1718 1736 @pytest.fixture
1719 1737 def stub_integration_settings():
1720 1738 return {
1721 1739 'test_string_field': 'some data',
1722 1740 'test_int_field': 100,
1723 1741 }
1724 1742
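# Illustrative sketch, not part of the original changeset: the stub
# integration type records events instead of delivering them, so tests can
# assert on sent_events after triggering an action. Any object can stand in
# for the event here.
def _example_stub_integration_usage(
        StubIntegrationType, stub_integration_settings):  # pragma: no cover
    integration = StubIntegrationType(settings=stub_integration_settings)
    integration.send_event('fake-event')
    assert integration.sent_events == ['fake-event']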
1725 1743
1726 1744 @pytest.fixture
1727 1745 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1728 1746 stub_integration_settings):
1729 1747 integration = IntegrationModel().create(
1730 1748 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1731 1749 name='test repo integration',
1732 1750 repo=repo_stub, repo_group=None, child_repos_only=None)
1733 1751
1734 1752 @request.addfinalizer
1735 1753 def cleanup():
1736 1754 IntegrationModel().delete(integration)
1737 1755
1738 1756 return integration
1739 1757
1740 1758
1741 1759 @pytest.fixture
1742 1760 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1743 1761 stub_integration_settings):
1744 1762 integration = IntegrationModel().create(
1745 1763 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1746 1764 name='test repogroup integration',
1747 1765 repo=None, repo_group=test_repo_group, child_repos_only=True)
1748 1766
1749 1767 @request.addfinalizer
1750 1768 def cleanup():
1751 1769 IntegrationModel().delete(integration)
1752 1770
1753 1771 return integration
1754 1772
1755 1773
1756 1774 @pytest.fixture
1757 1775 def repogroup_recursive_integration_stub(request, test_repo_group,
1758 1776 StubIntegrationType, stub_integration_settings):
1759 1777 integration = IntegrationModel().create(
1760 1778 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1761 1779 name='test recursive repogroup integration',
1762 1780 repo=None, repo_group=test_repo_group, child_repos_only=False)
1763 1781
1764 1782 @request.addfinalizer
1765 1783 def cleanup():
1766 1784 IntegrationModel().delete(integration)
1767 1785
1768 1786 return integration
1769 1787
1770 1788
1771 1789 @pytest.fixture
1772 1790 def global_integration_stub(request, StubIntegrationType,
1773 1791 stub_integration_settings):
1774 1792 integration = IntegrationModel().create(
1775 1793 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1776 1794 name='test global integration',
1777 1795 repo=None, repo_group=None, child_repos_only=None)
1778 1796
1779 1797 @request.addfinalizer
1780 1798 def cleanup():
1781 1799 IntegrationModel().delete(integration)
1782 1800
1783 1801 return integration
1784 1802
1785 1803
1786 1804 @pytest.fixture
1787 1805 def root_repos_integration_stub(request, StubIntegrationType,
1788 1806 stub_integration_settings):
1789 1807 integration = IntegrationModel().create(
1790 1808 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1791 1809 name='test root repos integration',
1792 1810 repo=None, repo_group=None, child_repos_only=True)
1793 1811
1794 1812 @request.addfinalizer
1795 1813 def cleanup():
1796 1814 IntegrationModel().delete(integration)
1797 1815
1798 1816 return integration