tests: sanitize temp group name to conform with our schema validators that clear...
marcink
r1122:6b984a38 default
@@ -1,1792 +1,1797 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32
33 33 import mock
34 34 import pyramid.testing
35 35 import pytest
36 36 import colander
37 37 import requests
38 38 from webtest.app import TestApp
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import ChangesetCommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.fixture import Fixture
66 66
67 67
68 68 def _split_comma(value):
69 69 return value.split(',')
70 70
71 71
72 72 def pytest_addoption(parser):
73 73 parser.addoption(
74 74 '--keep-tmp-path', action='store_true',
75 75 help="Keep the test temporary directories")
76 76 parser.addoption(
77 77 '--backends', action='store', type=_split_comma,
78 78 default=['git', 'hg', 'svn'],
79 79 help="Select which backends to test for backend specific tests.")
80 80 parser.addoption(
81 81 '--dbs', action='store', type=_split_comma,
82 82 default=['sqlite'],
83 83 help="Select which database to test for database specific tests. "
84 84 "Possible options are sqlite,postgres,mysql")
85 85 parser.addoption(
86 86 '--appenlight', '--ae', action='store_true',
87 87 help="Track statistics in appenlight.")
88 88 parser.addoption(
89 89 '--appenlight-api-key', '--ae-key',
90 90 help="API key for Appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-url', '--ae-url',
93 93 default="https://ae.rhodecode.com",
94 94 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
95 95 parser.addoption(
96 96 '--sqlite-connection-string', action='store',
97 97 default='', help="Connection string for the dbs tests with SQLite")
98 98 parser.addoption(
99 99 '--postgres-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with Postgres")
101 101 parser.addoption(
102 102 '--mysql-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with MySQL")
104 104 parser.addoption(
105 105 '--repeat', type=int, default=100,
106 106 help="Number of repetitions in performance tests.")
107 107
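The comma separated options above are turned into lists by `_split_comma`; a short sketch of how a command line relates to the parsed values (the chosen flags and values are illustrative only):

    # Illustrative invocation using the options registered above:
    #   py.test --backends=git,hg --dbs=sqlite,postgres --repeat=20
    # Inside fixtures the values come back as Python lists:
    assert _split_comma('git,hg') == ['git', 'hg']
    # and are read via request.config.getoption('--backends')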
108 108
109 109 def pytest_configure(config):
110 110 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
111 111 from rhodecode.config import patches
112 112 patches.kombu_1_5_1_python_2_7_11()
113 113
114 114
115 115 def pytest_collection_modifyitems(session, config, items):
116 116 # Skip items marked as not a test (__test__ = False), cf. nose's nottest; used for the transition from nose to pytest
117 117 remaining = [
118 118 i for i in items if getattr(i.obj, '__test__', True)]
119 119 items[:] = remaining
120 120
121 121
122 122 def pytest_generate_tests(metafunc):
123 123 # Support test generation based on --backend parameter
124 124 if 'backend_alias' in metafunc.fixturenames:
125 125 backends = get_backends_from_metafunc(metafunc)
126 126 scope = None
127 127 if not backends:
128 128 pytest.skip("Not enabled for any of selected backends")
129 129 metafunc.parametrize('backend_alias', backends, scope=scope)
130 130 elif hasattr(metafunc.function, 'backends'):
131 131 backends = get_backends_from_metafunc(metafunc)
132 132 if not backends:
133 133 pytest.skip("Not enabled for any of selected backends")
134 134
135 135
136 136 def get_backends_from_metafunc(metafunc):
137 137 requested_backends = set(metafunc.config.getoption('--backends'))
138 138 if hasattr(metafunc.function, 'backends'):
139 139 # Supported backends by this test function, created from
140 140 # pytest.mark.backends
141 141 backends = metafunc.function.backends.args
142 142 elif hasattr(metafunc.cls, 'backend_alias'):
143 143 # Support class attribute "backend_alias", this is mainly
144 144 # for legacy reasons for tests not yet using pytest.mark.backends
145 145 backends = [metafunc.cls.backend_alias]
146 146 else:
147 147 backends = metafunc.config.getoption('--backends')
148 148 return requested_backends.intersection(backends)
149 149
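Put together, a test opts into specific backends with the `pytest.mark.backends` marker and receives one parametrized `backend_alias` value per backend that is both marked and selected on the command line. A minimal sketch (the test name and body are made up):

    @pytest.mark.backends("git", "hg")
    def test_runs_once_per_selected_backend(backend_alias):
        # parametrized by pytest_generate_tests above, intersected
        # with the --backends command line selection
        assert backend_alias in ("git", "hg")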
150 150
151 151 @pytest.fixture(scope='session', autouse=True)
152 152 def activate_example_rcextensions(request):
153 153 """
154 154 Patch in an example rcextensions module which verifies passed in kwargs.
155 155 """
156 156 from rhodecode.tests.other import example_rcextensions
157 157
158 158 old_extensions = rhodecode.EXTENSIONS
159 159 rhodecode.EXTENSIONS = example_rcextensions
160 160
161 161 @request.addfinalizer
162 162 def cleanup():
163 163 rhodecode.EXTENSIONS = old_extensions
164 164
165 165
166 166 @pytest.fixture
167 167 def capture_rcextensions():
168 168 """
169 169 Returns the recorded calls to entry points in rcextensions.
170 170 """
171 171 calls = rhodecode.EXTENSIONS.calls
172 172 calls.clear()
173 173 # Note: At this moment it is still the empty dict, but it will
174 174 # be filled during the test run; since it is a reference, this
175 175 # is enough to make it work.
176 176 return calls
177 177
178 178
179 179 @pytest.fixture(scope='session')
180 180 def http_environ_session():
181 181 """
182 182 Allows using "http_environ" in session scope.
183 183 """
184 184 return http_environ(
185 185 http_host_stub=http_host_stub())
186 186
187 187
188 188 @pytest.fixture
189 189 def http_host_stub():
190 190 """
191 191 Value of HTTP_HOST in the test run.
192 192 """
193 193 return 'test.example.com:80'
194 194
195 195
196 196 @pytest.fixture
197 197 def http_environ(http_host_stub):
198 198 """
199 199 HTTP extra environ keys.
200 200
201 201 Used by the test application as well as for setting up the pylons
202 202 environment. In the case of the fixture "app" it should be possible
203 203 to override this for a specific test case.
204 204 """
205 205 return {
206 206 'SERVER_NAME': http_host_stub.split(':')[0],
207 207 'SERVER_PORT': http_host_stub.split(':')[1],
208 208 'HTTP_HOST': http_host_stub,
209 209 }
210 210
211 211
212 212 @pytest.fixture(scope='function')
213 213 def app(request, pylonsapp, http_environ):
214 214 app = TestApp(
215 215 pylonsapp,
216 216 extra_environ=http_environ)
217 217 if request.cls:
218 218 request.cls.app = app
219 219 return app
220 220
221 221
222 222 @pytest.fixture(scope='session')
223 223 def app_settings(pylonsapp, pylons_config):
224 224 """
225 225 Settings dictionary used to create the app.
226 226
227 227 Parses the ini file and passes the result through the sanitize and apply
228 228 defaults mechanism in `rhodecode.config.middleware`.
229 229 """
230 230 from paste.deploy.loadwsgi import loadcontext, APP
231 231 from rhodecode.config.middleware import (
232 232 sanitize_settings_and_apply_defaults)
233 233 context = loadcontext(APP, 'config:' + pylons_config)
234 234 settings = sanitize_settings_and_apply_defaults(context.config())
235 235 return settings
236 236
237 237
238 238 @pytest.fixture(scope='session')
239 239 def db(app_settings):
240 240 """
241 241 Initializes the database connection.
242 242
243 243 It uses the same settings which are used to create the ``pylonsapp`` or
244 244 ``app`` fixtures.
245 245 """
246 246 from rhodecode.config.utils import initialize_database
247 247 initialize_database(app_settings)
248 248
249 249
250 250 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
251 251
252 252
253 253 def _autologin_user(app, *args):
254 254 session = login_user_session(app, *args)
255 255 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
256 256 return LoginData(csrf_token, session['rhodecode_user'])
257 257
258 258
259 259 @pytest.fixture
260 260 def autologin_user(app):
261 261 """
262 262 Utility fixture which makes sure that the admin user is logged in
263 263 """
264 264 return _autologin_user(app)
265 265
266 266
267 267 @pytest.fixture
268 268 def autologin_regular_user(app):
269 269 """
270 270 Utility fixture which makes sure that the regular user is logged in
271 271 """
272 272 return _autologin_user(
273 273 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
274 274
275 275
276 276 @pytest.fixture(scope='function')
277 277 def csrf_token(request, autologin_user):
278 278 return autologin_user.csrf_token
279 279
280 280
281 281 @pytest.fixture(scope='function')
282 282 def xhr_header(request):
283 283 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
284 284
285 285
286 286 @pytest.fixture
287 287 def real_crypto_backend(monkeypatch):
288 288 """
289 289 Switch the production crypto backend on for this test.
290 290
291 291 During the test run the crypto backend is replaced with a faster
292 292 implementation based on the MD5 algorithm.
293 293 """
294 294 monkeypatch.setattr(rhodecode, 'is_test', False)
295 295
296 296
297 297 @pytest.fixture(scope='class')
298 298 def index_location(request, pylonsapp):
299 299 index_location = pylonsapp.config['app_conf']['search.location']
300 300 if request.cls:
301 301 request.cls.index_location = index_location
302 302 return index_location
303 303
304 304
305 305 @pytest.fixture(scope='session', autouse=True)
306 306 def tests_tmp_path(request):
307 307 """
308 308 Create a temporary directory to be used during the test session.
309 309 """
310 310 if not os.path.exists(TESTS_TMP_PATH):
311 311 os.makedirs(TESTS_TMP_PATH)
312 312
313 313 if not request.config.getoption('--keep-tmp-path'):
314 314 @request.addfinalizer
315 315 def remove_tmp_path():
316 316 shutil.rmtree(TESTS_TMP_PATH)
317 317
318 318 return TESTS_TMP_PATH
319 319
320 320
321 321 @pytest.fixture(scope='session', autouse=True)
322 322 def patch_pyro_request_scope_proxy_factory(request):
323 323 """
324 324 Patch the pyro proxy factory to always use the same dummy request object
325 325 when under test. This will return the same pyro proxy on every call.
326 326 """
327 327 dummy_request = pyramid.testing.DummyRequest()
328 328
329 329 def mocked_call(self, request=None):
330 330 return self.getProxy(request=dummy_request)
331 331
332 332 patcher = mock.patch(
333 333 'rhodecode.lib.vcs.client.RequestScopeProxyFactory.__call__',
334 334 new=mocked_call)
335 335 patcher.start()
336 336
337 337 @request.addfinalizer
338 338 def undo_patching():
339 339 patcher.stop()
340 340
341 341
342 342 @pytest.fixture
343 343 def test_repo_group(request):
344 344 """
345 345 Create a temporary repository group and destroy it
346 346 automatically after usage
347 347 """
348 348 fixture = Fixture()
349 349 repogroupid = 'test_repo_group_%s' % int(time.time())
350 350 repo_group = fixture.create_repo_group(repogroupid)
351 351
352 352 def _cleanup():
353 353 fixture.destroy_repo_group(repogroupid)
354 354
355 355 request.addfinalizer(_cleanup)
356 356 return repo_group
357 357
358 358
359 359 @pytest.fixture
360 360 def test_user_group(request):
361 361 """
362 362 Create a temporary user group and destroy it
363 363 automatically after usage
364 364 """
365 365 fixture = Fixture()
366 366 usergroupid = 'test_user_group_%s' % int(time.time())
367 367 user_group = fixture.create_user_group(usergroupid)
368 368
369 369 def _cleanup():
370 370 fixture.destroy_user_group(user_group)
371 371
372 372 request.addfinalizer(_cleanup)
373 373 return user_group
374 374
375 375
376 376 @pytest.fixture(scope='session')
377 377 def test_repo(request):
378 378 container = TestRepoContainer()
379 379 request.addfinalizer(container._cleanup)
380 380 return container
381 381
382 382
383 383 class TestRepoContainer(object):
384 384 """
385 385 Container for test repositories which are used read only.
386 386
387 387 Repositories will be created on demand and re-used during the lifetime
388 388 of this object.
389 389
390 390 Usage to get the svn test repository "minimal"::
391 391
392 392 test_repo = TestRepoContainer()
393 393 repo = test_repo('minimal', 'svn')
394 394
395 395 """
396 396
397 397 dump_extractors = {
398 398 'git': utils.extract_git_repo_from_dump,
399 399 'hg': utils.extract_hg_repo_from_dump,
400 400 'svn': utils.extract_svn_repo_from_dump,
401 401 }
402 402
403 403 def __init__(self):
404 404 self._cleanup_repos = []
405 405 self._fixture = Fixture()
406 406 self._repos = {}
407 407
408 408 def __call__(self, dump_name, backend_alias):
409 409 key = (dump_name, backend_alias)
410 410 if key not in self._repos:
411 411 repo = self._create_repo(dump_name, backend_alias)
412 412 self._repos[key] = repo.repo_id
413 413 return Repository.get(self._repos[key])
414 414
415 415 def _create_repo(self, dump_name, backend_alias):
416 416 repo_name = '%s-%s' % (backend_alias, dump_name)
417 417 backend_class = get_backend(backend_alias)
418 418 dump_extractor = self.dump_extractors[backend_alias]
419 419 repo_path = dump_extractor(dump_name, repo_name)
420 420 vcs_repo = backend_class(repo_path)
421 421 repo2db_mapper({repo_name: vcs_repo})
422 422 repo = RepoModel().get_by_repo_name(repo_name)
423 423 self._cleanup_repos.append(repo_name)
424 424 return repo
425 425
426 426 def _cleanup(self):
427 427 for repo_name in reversed(self._cleanup_repos):
428 428 self._fixture.destroy_repo(repo_name)
429 429
430 430
431 431 @pytest.fixture
432 432 def backend(request, backend_alias, pylonsapp, test_repo):
433 433 """
434 434 Parametrized fixture which represents a single backend implementation.
435 435
436 436 It respects the option `--backends` to focus the test run on specific
437 437 backend implementations.
438 438
439 439 It also supports `pytest.mark.xfail_backends` to mark tests as failing
440 440 for specific backends. This is intended as a utility for incremental
441 441 development of a new backend implementation.
442 442 """
443 443 if backend_alias not in request.config.getoption('--backends'):
444 444 pytest.skip("Backend %s not selected." % (backend_alias, ))
445 445
446 446 utils.check_xfail_backends(request.node, backend_alias)
447 447 utils.check_skip_backends(request.node, backend_alias)
448 448
449 449 repo_name = 'vcs_test_%s' % (backend_alias, )
450 450 backend = Backend(
451 451 alias=backend_alias,
452 452 repo_name=repo_name,
453 453 test_name=request.node.name,
454 454 test_repo_container=test_repo)
455 455 request.addfinalizer(backend.cleanup)
456 456 return backend
457 457
458 458
459 459 @pytest.fixture
460 460 def backend_git(request, pylonsapp, test_repo):
461 461 return backend(request, 'git', pylonsapp, test_repo)
462 462
463 463
464 464 @pytest.fixture
465 465 def backend_hg(request, pylonsapp, test_repo):
466 466 return backend(request, 'hg', pylonsapp, test_repo)
467 467
468 468
469 469 @pytest.fixture
470 470 def backend_svn(request, pylonsapp, test_repo):
471 471 return backend(request, 'svn', pylonsapp, test_repo)
472 472
473 473
474 474 @pytest.fixture
475 475 def backend_random(backend_git):
476 476 """
477 477 Use this to express that your tests need "a backend".
478 478
479 479 A few of our tests need a backend, so that we can run the code. This
480 480 fixture is intended to be used for such cases. It will pick one of the
481 481 backends and run the tests.
482 482
483 483 The fixture `backend` would run the test multiple times for each
484 484 available backend which is a pure waste of time if the test is
485 485 independent of the backend type.
486 486 """
487 487 # TODO: johbo: Change this to pick a random backend
488 488 return backend_git
489 489
490 490
491 491 @pytest.fixture
492 492 def backend_stub(backend_git):
493 493 """
494 494 Use this to express that your tests need a backend stub
495 495
496 496 TODO: mikhail: Implement a real stub logic instead of returning
497 497 a git backend
498 498 """
499 499 return backend_git
500 500
501 501
502 502 @pytest.fixture
503 503 def repo_stub(backend_stub):
504 504 """
505 505 Use this to express that your tests need a repository stub
506 506 """
507 507 return backend_stub.create_repo()
508 508
509 509
510 510 class Backend(object):
511 511 """
512 512 Represents the test configuration for one supported backend
513 513
514 514 Provides easy access to different test repositories based on
515 515 `__getitem__`. Such repositories will only be created once per test
516 516 session.
517 517 """
518 518
519 519 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
520 520 _master_repo = None
521 521 _commit_ids = {}
522 522
523 523 def __init__(self, alias, repo_name, test_name, test_repo_container):
524 524 self.alias = alias
525 525 self.repo_name = repo_name
526 526 self._cleanup_repos = []
527 527 self._test_name = test_name
528 528 self._test_repo_container = test_repo_container
529 529 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
530 530 # Fixture will survive in the end.
531 531 self._fixture = Fixture()
532 532
533 533 def __getitem__(self, key):
534 534 return self._test_repo_container(key, self.alias)
535 535
536 536 @property
537 537 def repo(self):
538 538 """
539 539 Returns the "current" repository. This is the vcs_test repo or the
540 540 last repo which has been created with `create_repo`.
541 541 """
542 542 from rhodecode.model.db import Repository
543 543 return Repository.get_by_repo_name(self.repo_name)
544 544
545 545 @property
546 546 def default_branch_name(self):
547 547 VcsRepository = get_backend(self.alias)
548 548 return VcsRepository.DEFAULT_BRANCH_NAME
549 549
550 550 @property
551 551 def default_head_id(self):
552 552 """
553 553 Returns the default head id of the underlying backend.
554 554
555 555 This will be the default branch name in case the backend does have a
556 556 default branch. In the other cases it will point to a valid head
557 557 which can serve as the base to create a new commit on top of it.
558 558 """
559 559 vcsrepo = self.repo.scm_instance()
560 560 head_id = (
561 561 vcsrepo.DEFAULT_BRANCH_NAME or
562 562 vcsrepo.commit_ids[-1])
563 563 return head_id
564 564
565 565 @property
566 566 def commit_ids(self):
567 567 """
568 568 Returns the list of commits for the last created repository
569 569 """
570 570 return self._commit_ids
571 571
572 572 def create_master_repo(self, commits):
573 573 """
574 574 Create a repository and remember it as a template.
575 575
576 576 This makes it easy to create derived repositories to construct
577 577 more complex scenarios for diff, compare and pull requests.
578 578
579 579 Returns a commit map which maps from commit message to raw_id.
580 580 """
581 581 self._master_repo = self.create_repo(commits=commits)
582 582 return self._commit_ids
583 583
584 584 def create_repo(
585 585 self, commits=None, number_of_commits=0, heads=None,
586 586 name_suffix=u'', **kwargs):
587 587 """
588 588 Create a repository and record it for later cleanup.
589 589
590 590 :param commits: Optional. A sequence of dict instances.
591 591 Will add a commit per entry to the new repository.
592 592 :param number_of_commits: Optional. If set to a number, this number of
593 593 commits will be added to the new repository.
594 594 :param heads: Optional. Can be set to a sequence of commit
595 595 names which shall be pulled in from the master repository.
596 596
597 597 """
598 598 self.repo_name = self._next_repo_name() + name_suffix
599 599 repo = self._fixture.create_repo(
600 600 self.repo_name, repo_type=self.alias, **kwargs)
601 601 self._cleanup_repos.append(repo.repo_name)
602 602
603 603 commits = commits or [
604 604 {'message': 'Commit %s of %s' % (x, self.repo_name)}
605 605 for x in xrange(number_of_commits)]
606 606 self._add_commits_to_repo(repo.scm_instance(), commits)
607 607 if heads:
608 608 self.pull_heads(repo, heads)
609 609
610 610 return repo
611 611
612 612 def pull_heads(self, repo, heads):
613 613 """
614 614 Make sure that repo contains all commits mentioned in `heads`
615 615 """
616 616 vcsmaster = self._master_repo.scm_instance()
617 617 vcsrepo = repo.scm_instance()
618 618 vcsrepo.config.clear_section('hooks')
619 619 commit_ids = [self._commit_ids[h] for h in heads]
620 620 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
621 621
622 622 def create_fork(self):
623 623 repo_to_fork = self.repo_name
624 624 self.repo_name = self._next_repo_name()
625 625 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
626 626 self._cleanup_repos.append(self.repo_name)
627 627 return repo
628 628
629 629 def new_repo_name(self, suffix=u''):
630 630 self.repo_name = self._next_repo_name() + suffix
631 631 self._cleanup_repos.append(self.repo_name)
632 632 return self.repo_name
633 633
634 634 def _next_repo_name(self):
635 635 return u"%s_%s" % (
636 636 self.invalid_repo_name.sub(u'_', self._test_name),
637 637 len(self._cleanup_repos))
638 638
639 639 def ensure_file(self, filename, content='Test content\n'):
640 640 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
641 641 commits = [
642 642 {'added': [
643 643 FileNode(filename, content=content),
644 644 ]},
645 645 ]
646 646 self._add_commits_to_repo(self.repo.scm_instance(), commits)
647 647
648 648 def enable_downloads(self):
649 649 repo = self.repo
650 650 repo.enable_downloads = True
651 651 Session().add(repo)
652 652 Session().commit()
653 653
654 654 def cleanup(self):
655 655 for repo_name in reversed(self._cleanup_repos):
656 656 self._fixture.destroy_repo(repo_name)
657 657
658 658 def _add_commits_to_repo(self, repo, commits):
659 659 commit_ids = _add_commits_to_repo(repo, commits)
660 660 if not commit_ids:
661 661 return
662 662 self._commit_ids = commit_ids
663 663
664 664 # Creating refs for Git to allow fetching them from remote repository
665 665 if self.alias == 'git':
666 666 refs = {}
667 667 for message in self._commit_ids:
668 668 # TODO: mikhail: do more special chars replacements
669 669 ref_name = 'refs/test-refs/{}'.format(
670 670 message.replace(' ', ''))
671 671 refs[ref_name] = self._commit_ids[message]
672 672 self._create_refs(repo, refs)
673 673
674 674 def _create_refs(self, repo, refs):
675 675 for ref_name in refs:
676 676 repo.set_refs(ref_name, refs[ref_name])
677 677
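A hedged sketch of how the `backend` fixture and this helper class are typically used together in a test (repository contents and assertions are illustrative):

    def test_backend_sketch(backend):
        repo = backend.create_repo(number_of_commits=2)   # cleaned up afterwards
        assert repo.repo_name == backend.repo_name
        assert len(backend.commit_ids) == 2               # message -> raw_id map
        backend.ensure_file('README.rst', content='Test content\n')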
678 678
679 679 @pytest.fixture
680 680 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
681 681 """
682 682 Parametrized fixture which represents a single vcs backend implementation.
683 683
684 684 See the fixture `backend` for more details. This one implements the same
685 685 concept, but on vcs level. So it does not provide model instances etc.
686 686
687 687 Parameters are generated dynamically, see :func:`pytest_generate_tests`
688 688 for how this works.
689 689 """
690 690 if backend_alias not in request.config.getoption('--backends'):
691 691 pytest.skip("Backend %s not selected." % (backend_alias, ))
692 692
693 693 utils.check_xfail_backends(request.node, backend_alias)
694 694 utils.check_skip_backends(request.node, backend_alias)
695 695
696 696 repo_name = 'vcs_test_%s' % (backend_alias, )
697 697 repo_path = os.path.join(tests_tmp_path, repo_name)
698 698 backend = VcsBackend(
699 699 alias=backend_alias,
700 700 repo_path=repo_path,
701 701 test_name=request.node.name,
702 702 test_repo_container=test_repo)
703 703 request.addfinalizer(backend.cleanup)
704 704 return backend
705 705
706 706
707 707 @pytest.fixture
708 708 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
709 709 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
710 710
711 711
712 712 @pytest.fixture
713 713 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
714 714 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
715 715
716 716
717 717 @pytest.fixture
718 718 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
719 719 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
720 720
721 721
722 722 @pytest.fixture
723 723 def vcsbackend_random(vcsbackend_git):
724 724 """
725 725 Use this to express that your tests need "a vcsbackend".
726 726
727 727 The fixture `vcsbackend` would run the test multiple times for each
728 728 available vcs backend which is a pure waste of time if the test is
729 729 independent of the vcs backend type.
730 730 """
731 731 # TODO: johbo: Change this to pick a random backend
732 732 return vcsbackend_git
733 733
734 734
735 735 @pytest.fixture
736 736 def vcsbackend_stub(vcsbackend_git):
737 737 """
738 738 Use this to express that your test just needs a stub of a vcsbackend.
739 739
740 740 Plan is to eventually implement an in-memory stub to speed tests up.
741 741 """
742 742 return vcsbackend_git
743 743
744 744
745 745 class VcsBackend(object):
746 746 """
747 747 Represents the test configuration for one supported vcs backend.
748 748 """
749 749
750 750 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
751 751
752 752 def __init__(self, alias, repo_path, test_name, test_repo_container):
753 753 self.alias = alias
754 754 self._repo_path = repo_path
755 755 self._cleanup_repos = []
756 756 self._test_name = test_name
757 757 self._test_repo_container = test_repo_container
758 758
759 759 def __getitem__(self, key):
760 760 return self._test_repo_container(key, self.alias).scm_instance()
761 761
762 762 @property
763 763 def repo(self):
764 764 """
765 765 Returns the "current" repository. This is the vcs_test repo or the last
766 766 repo which has been created.
767 767 """
768 768 Repository = get_backend(self.alias)
769 769 return Repository(self._repo_path)
770 770
771 771 @property
772 772 def backend(self):
773 773 """
774 774 Returns the backend implementation class.
775 775 """
776 776 return get_backend(self.alias)
777 777
778 778 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
779 779 repo_name = self._next_repo_name()
780 780 self._repo_path = get_new_dir(repo_name)
781 781 repo_class = get_backend(self.alias)
782 782 src_url = None
783 783 if _clone_repo:
784 784 src_url = _clone_repo.path
785 785 repo = repo_class(self._repo_path, create=True, src_url=src_url)
786 786 self._cleanup_repos.append(repo)
787 787
788 788 commits = commits or [
789 789 {'message': 'Commit %s of %s' % (x, repo_name)}
790 790 for x in xrange(number_of_commits)]
791 791 _add_commits_to_repo(repo, commits)
792 792 return repo
793 793
794 794 def clone_repo(self, repo):
795 795 return self.create_repo(_clone_repo=repo)
796 796
797 797 def cleanup(self):
798 798 for repo in self._cleanup_repos:
799 799 shutil.rmtree(repo.path)
800 800
801 801 def new_repo_path(self):
802 802 repo_name = self._next_repo_name()
803 803 self._repo_path = get_new_dir(repo_name)
804 804 return self._repo_path
805 805
806 806 def _next_repo_name(self):
807 807 return "%s_%s" % (
808 808 self.invalid_repo_name.sub('_', self._test_name),
809 809 len(self._cleanup_repos))
810 810
811 811 def add_file(self, repo, filename, content='Test content\n'):
812 812 imc = repo.in_memory_commit
813 813 imc.add(FileNode(filename, content=content))
814 814 imc.commit(
815 815 message=u'Automatic commit from vcsbackend fixture',
816 816 author=u'Automatic')
817 817
818 818 def ensure_file(self, filename, content='Test content\n'):
819 819 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
820 820 self.add_file(self.repo, filename, content)
821 821
822 822
823 823 def _add_commits_to_repo(vcs_repo, commits):
824 824 commit_ids = {}
825 825 if not commits:
826 826 return commit_ids
827 827
828 828 imc = vcs_repo.in_memory_commit
829 829 commit = None
830 830
831 831 for idx, commit in enumerate(commits):
832 832 message = unicode(commit.get('message', 'Commit %s' % idx))
833 833
834 834 for node in commit.get('added', []):
835 835 imc.add(FileNode(node.path, content=node.content))
836 836 for node in commit.get('changed', []):
837 837 imc.change(FileNode(node.path, content=node.content))
838 838 for node in commit.get('removed', []):
839 839 imc.remove(FileNode(node.path))
840 840
841 841 parents = [
842 842 vcs_repo.get_commit(commit_id=commit_ids[p])
843 843 for p in commit.get('parents', [])]
844 844
845 845 operations = ('added', 'changed', 'removed')
846 846 if not any((commit.get(o) for o in operations)):
847 847 imc.add(FileNode('file_%s' % idx, content=message))
848 848
849 849 commit = imc.commit(
850 850 message=message,
851 851 author=unicode(commit.get('author', 'Automatic')),
852 852 date=commit.get('date'),
853 853 branch=commit.get('branch'),
854 854 parents=parents)
855 855
856 856 commit_ids[commit.message] = commit.raw_id
857 857
858 858 return commit_ids
859 859
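The `commits` argument consumed here (and forwarded by `Backend.create_repo` and `VcsBackend.create_repo`) is a list of plain dicts; a sketch of its shape, with made-up file names and messages:

    commits_sketch = [
        {'message': 'Add docs',
         'added': [FileNode('docs/index.rst', content='index\n')]},
        {'message': 'Update docs',
         'changed': [FileNode('docs/index.rst', content='index, v2\n')],
         'parents': ['Add docs']},   # parents refer to earlier messages
    ]
    # e.g. backend.create_repo(commits=commits_sketch)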
860 860
861 861 @pytest.fixture
862 862 def reposerver(request):
863 863 """
864 864 Allows serving a backend repository
865 865 """
866 866
867 867 repo_server = RepoServer()
868 868 request.addfinalizer(repo_server.cleanup)
869 869 return repo_server
870 870
871 871
872 872 class RepoServer(object):
873 873 """
874 874 Utility to serve a local repository for the duration of a test case.
875 875
876 876 Supports only Subversion so far.
877 877 """
878 878
879 879 url = None
880 880
881 881 def __init__(self):
882 882 self._cleanup_servers = []
883 883
884 884 def serve(self, vcsrepo):
885 885 if vcsrepo.alias != 'svn':
886 886 raise TypeError("Backend %s not supported" % vcsrepo.alias)
887 887
888 888 proc = subprocess32.Popen(
889 889 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
890 890 '--root', vcsrepo.path])
891 891 self._cleanup_servers.append(proc)
892 892 self.url = 'svn://localhost'
893 893
894 894 def cleanup(self):
895 895 for proc in self._cleanup_servers:
896 896 proc.terminate()
897 897
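A sketch of combining `reposerver` with the svn vcsbackend (illustrative only; it assumes `svnserve` is available in the test environment, as `RepoServer.serve` already does):

    def test_reposerver_sketch(reposerver, vcsbackend_svn):
        vcsrepo = vcsbackend_svn.create_repo(number_of_commits=1)
        reposerver.serve(vcsrepo)
        assert reposerver.url == 'svn://localhost'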
898 898
899 899 @pytest.fixture
900 900 def pr_util(backend, request):
901 901 """
902 902 Utility for tests of models and for functional tests around pull requests.
903 903
904 904 It gives an instance of :class:`PRTestUtility` which provides various
905 905 utility methods around one pull request.
906 906
907 907 This fixture uses `backend` and inherits its parameterization.
908 908 """
909 909
910 910 util = PRTestUtility(backend)
911 911
912 912 @request.addfinalizer
913 913 def cleanup():
914 914 util.cleanup()
915 915
916 916 return util
917 917
918 918
919 919 class PRTestUtility(object):
920 920
921 921 pull_request = None
922 922 pull_request_id = None
923 923 mergeable_patcher = None
924 924 mergeable_mock = None
925 925 notification_patcher = None
926 926
927 927 def __init__(self, backend):
928 928 self.backend = backend
929 929
930 930 def create_pull_request(
931 931 self, commits=None, target_head=None, source_head=None,
932 932 revisions=None, approved=False, author=None, mergeable=False,
933 933 enable_notifications=True, name_suffix=u'', reviewers=None,
934 934 title=u"Test", description=u"Description"):
935 935 self.set_mergeable(mergeable)
936 936 if not enable_notifications:
937 937 # mock notification side effect
938 938 self.notification_patcher = mock.patch(
939 939 'rhodecode.model.notification.NotificationModel.create')
940 940 self.notification_patcher.start()
941 941
942 942 if not self.pull_request:
943 943 if not commits:
944 944 commits = [
945 945 {'message': 'c1'},
946 946 {'message': 'c2'},
947 947 {'message': 'c3'},
948 948 ]
949 949 target_head = 'c1'
950 950 source_head = 'c2'
951 951 revisions = ['c2']
952 952
953 953 self.commit_ids = self.backend.create_master_repo(commits)
954 954 self.target_repository = self.backend.create_repo(
955 955 heads=[target_head], name_suffix=name_suffix)
956 956 self.source_repository = self.backend.create_repo(
957 957 heads=[source_head], name_suffix=name_suffix)
958 958 self.author = author or UserModel().get_by_username(
959 959 TEST_USER_ADMIN_LOGIN)
960 960
961 961 model = PullRequestModel()
962 962 self.create_parameters = {
963 963 'created_by': self.author,
964 964 'source_repo': self.source_repository.repo_name,
965 965 'source_ref': self._default_branch_reference(source_head),
966 966 'target_repo': self.target_repository.repo_name,
967 967 'target_ref': self._default_branch_reference(target_head),
968 968 'revisions': [self.commit_ids[r] for r in revisions],
969 969 'reviewers': reviewers or self._get_reviewers(),
970 970 'title': title,
971 971 'description': description,
972 972 }
973 973 self.pull_request = model.create(**self.create_parameters)
974 974 assert model.get_versions(self.pull_request) == []
975 975
976 976 self.pull_request_id = self.pull_request.pull_request_id
977 977
978 978 if approved:
979 979 self.approve()
980 980
981 981 Session().add(self.pull_request)
982 982 Session().commit()
983 983
984 984 return self.pull_request
985 985
986 986 def approve(self):
987 987 self.create_status_votes(
988 988 ChangesetStatus.STATUS_APPROVED,
989 989 *self.pull_request.reviewers)
990 990
991 991 def close(self):
992 992 PullRequestModel().close_pull_request(self.pull_request, self.author)
993 993
994 994 def _default_branch_reference(self, commit_message):
995 995 reference = '%s:%s:%s' % (
996 996 'branch',
997 997 self.backend.default_branch_name,
998 998 self.commit_ids[commit_message])
999 999 return reference
1000 1000
1001 1001 def _get_reviewers(self):
1002 1002 model = UserModel()
1003 1003 return [
1004 1004 model.get_by_username(TEST_USER_REGULAR_LOGIN),
1005 1005 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
1006 1006 ]
1007 1007
1008 1008 def update_source_repository(self, head=None):
1009 1009 heads = [head or 'c3']
1010 1010 self.backend.pull_heads(self.source_repository, heads=heads)
1011 1011
1012 1012 def add_one_commit(self, head=None):
1013 1013 self.update_source_repository(head=head)
1014 1014 old_commit_ids = set(self.pull_request.revisions)
1015 1015 PullRequestModel().update_commits(self.pull_request)
1016 1016 commit_ids = set(self.pull_request.revisions)
1017 1017 new_commit_ids = commit_ids - old_commit_ids
1018 1018 assert len(new_commit_ids) == 1
1019 1019 return new_commit_ids.pop()
1020 1020
1021 1021 def remove_one_commit(self):
1022 1022 assert len(self.pull_request.revisions) == 2
1023 1023 source_vcs = self.source_repository.scm_instance()
1024 1024 removed_commit_id = source_vcs.commit_ids[-1]
1025 1025
1026 1026 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1027 1027 # remove the if once that's sorted out.
1028 1028 if self.backend.alias == "git":
1029 1029 kwargs = {'branch_name': self.backend.default_branch_name}
1030 1030 else:
1031 1031 kwargs = {}
1032 1032 source_vcs.strip(removed_commit_id, **kwargs)
1033 1033
1034 1034 PullRequestModel().update_commits(self.pull_request)
1035 1035 assert len(self.pull_request.revisions) == 1
1036 1036 return removed_commit_id
1037 1037
1038 1038 def create_comment(self, linked_to=None):
1039 1039 comment = ChangesetCommentsModel().create(
1040 1040 text=u"Test comment",
1041 1041 repo=self.target_repository.repo_name,
1042 1042 user=self.author,
1043 1043 pull_request=self.pull_request)
1044 1044 assert comment.pull_request_version_id is None
1045 1045
1046 1046 if linked_to:
1047 1047 PullRequestModel()._link_comments_to_version(linked_to)
1048 1048
1049 1049 return comment
1050 1050
1051 1051 def create_inline_comment(
1052 1052 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1053 1053 comment = ChangesetCommentsModel().create(
1054 1054 text=u"Test comment",
1055 1055 repo=self.target_repository.repo_name,
1056 1056 user=self.author,
1057 1057 line_no=line_no,
1058 1058 f_path=file_path,
1059 1059 pull_request=self.pull_request)
1060 1060 assert comment.pull_request_version_id is None
1061 1061
1062 1062 if linked_to:
1063 1063 PullRequestModel()._link_comments_to_version(linked_to)
1064 1064
1065 1065 return comment
1066 1066
1067 1067 def create_version_of_pull_request(self):
1068 1068 pull_request = self.create_pull_request()
1069 1069 version = PullRequestModel()._create_version_from_snapshot(
1070 1070 pull_request)
1071 1071 return version
1072 1072
1073 1073 def create_status_votes(self, status, *reviewers):
1074 1074 for reviewer in reviewers:
1075 1075 ChangesetStatusModel().set_status(
1076 1076 repo=self.pull_request.target_repo,
1077 1077 status=status,
1078 1078 user=reviewer.user_id,
1079 1079 pull_request=self.pull_request)
1080 1080
1081 1081 def set_mergeable(self, value):
1082 1082 if not self.mergeable_patcher:
1083 1083 self.mergeable_patcher = mock.patch.object(
1084 1084 VcsSettingsModel, 'get_general_settings')
1085 1085 self.mergeable_mock = self.mergeable_patcher.start()
1086 1086 self.mergeable_mock.return_value = {
1087 1087 'rhodecode_pr_merge_enabled': value}
1088 1088
1089 1089 def cleanup(self):
1090 1090 # In case the source repository is already cleaned up, the pull
1091 1091 # request will already be deleted.
1092 1092 pull_request = PullRequest().get(self.pull_request_id)
1093 1093 if pull_request:
1094 1094 PullRequestModel().delete(pull_request)
1095 1095 Session().commit()
1096 1096
1097 1097 if self.notification_patcher:
1098 1098 self.notification_patcher.stop()
1099 1099
1100 1100 if self.mergeable_patcher:
1101 1101 self.mergeable_patcher.stop()
1102 1102
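A minimal sketch of `pr_util` in a test; the assertions are illustrative and not taken from the changeset:

    def test_pr_util_sketch(pr_util):
        pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
        assert pull_request.pull_request_id == pr_util.pull_request_id
        new_commit_id = pr_util.add_one_commit()   # pulls head 'c3' by default
        assert new_commit_id in pull_request.revisions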
1103 1103
1104 1104 @pytest.fixture
1105 1105 def user_admin(pylonsapp):
1106 1106 """
1107 1107 Provides the default admin test user as an instance of `db.User`.
1108 1108 """
1109 1109 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1110 1110 return user
1111 1111
1112 1112
1113 1113 @pytest.fixture
1114 1114 def user_regular(pylonsapp):
1115 1115 """
1116 1116 Provides the default regular test user as an instance of `db.User`.
1117 1117 """
1118 1118 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1119 1119 return user
1120 1120
1121 1121
1122 1122 @pytest.fixture
1123 1123 def user_util(request, pylonsapp):
1124 1124 """
1125 1125 Provides a wired instance of `UserUtility` with integrated cleanup.
1126 1126 """
1127 1127 utility = UserUtility(test_name=request.node.name)
1128 1128 request.addfinalizer(utility.cleanup)
1129 1129 return utility
1130 1130
1131 1131
1132 1132 # TODO: johbo: Split this up into utilities per domain or something similar
1133 1133 class UserUtility(object):
1134 1134
1135 1135 def __init__(self, test_name="test"):
1136 self._test_name = test_name
1136 self._test_name = self._sanitize_name(test_name)
1137 1137 self.fixture = Fixture()
1138 1138 self.repo_group_ids = []
1139 1139 self.user_ids = []
1140 1140 self.user_group_ids = []
1141 1141 self.user_repo_permission_ids = []
1142 1142 self.user_group_repo_permission_ids = []
1143 1143 self.user_repo_group_permission_ids = []
1144 1144 self.user_group_repo_group_permission_ids = []
1145 1145 self.user_user_group_permission_ids = []
1146 1146 self.user_group_user_group_permission_ids = []
1147 1147 self.user_permissions = []
1148 1148
1149 def _sanitize_name(self, name):
1150 for char in ['[', ']']:
1151 name = name.replace(char, '_')
1152 return name
1153
1149 1154 def create_repo_group(
1150 1155 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1151 1156 group_name = "{prefix}_repogroup_{count}".format(
1152 1157 prefix=self._test_name,
1153 1158 count=len(self.repo_group_ids))
1154 1159 repo_group = self.fixture.create_repo_group(
1155 1160 group_name, cur_user=owner)
1156 1161 if auto_cleanup:
1157 1162 self.repo_group_ids.append(repo_group.group_id)
1158 1163 return repo_group
1159 1164
1160 1165 def create_user(self, auto_cleanup=True, **kwargs):
1161 1166 user_name = "{prefix}_user_{count}".format(
1162 1167 prefix=self._test_name,
1163 1168 count=len(self.user_ids))
1164 1169 user = self.fixture.create_user(user_name, **kwargs)
1165 1170 if auto_cleanup:
1166 1171 self.user_ids.append(user.user_id)
1167 1172 return user
1168 1173
1169 1174 def create_user_with_group(self):
1170 1175 user = self.create_user()
1171 1176 user_group = self.create_user_group(members=[user])
1172 1177 return user, user_group
1173 1178
1174 1179 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1175 1180 group_name = "{prefix}_usergroup_{count}".format(
1176 1181 prefix=self._test_name,
1177 1182 count=len(self.user_group_ids))
1178 1183 user_group = self.fixture.create_user_group(group_name, **kwargs)
1179 1184 if auto_cleanup:
1180 1185 self.user_group_ids.append(user_group.users_group_id)
1181 1186 if members:
1182 1187 for user in members:
1183 1188 UserGroupModel().add_user_to_group(user_group, user)
1184 1189 return user_group
1185 1190
1186 1191 def grant_user_permission(self, user_name, permission_name):
1187 1192 self._inherit_default_user_permissions(user_name, False)
1188 1193 self.user_permissions.append((user_name, permission_name))
1189 1194
1190 1195 def grant_user_permission_to_repo_group(
1191 1196 self, repo_group, user, permission_name):
1192 1197 permission = RepoGroupModel().grant_user_permission(
1193 1198 repo_group, user, permission_name)
1194 1199 self.user_repo_group_permission_ids.append(
1195 1200 (repo_group.group_id, user.user_id))
1196 1201 return permission
1197 1202
1198 1203 def grant_user_group_permission_to_repo_group(
1199 1204 self, repo_group, user_group, permission_name):
1200 1205 permission = RepoGroupModel().grant_user_group_permission(
1201 1206 repo_group, user_group, permission_name)
1202 1207 self.user_group_repo_group_permission_ids.append(
1203 1208 (repo_group.group_id, user_group.users_group_id))
1204 1209 return permission
1205 1210
1206 1211 def grant_user_permission_to_repo(
1207 1212 self, repo, user, permission_name):
1208 1213 permission = RepoModel().grant_user_permission(
1209 1214 repo, user, permission_name)
1210 1215 self.user_repo_permission_ids.append(
1211 1216 (repo.repo_id, user.user_id))
1212 1217 return permission
1213 1218
1214 1219 def grant_user_group_permission_to_repo(
1215 1220 self, repo, user_group, permission_name):
1216 1221 permission = RepoModel().grant_user_group_permission(
1217 1222 repo, user_group, permission_name)
1218 1223 self.user_group_repo_permission_ids.append(
1219 1224 (repo.repo_id, user_group.users_group_id))
1220 1225 return permission
1221 1226
1222 1227 def grant_user_permission_to_user_group(
1223 1228 self, target_user_group, user, permission_name):
1224 1229 permission = UserGroupModel().grant_user_permission(
1225 1230 target_user_group, user, permission_name)
1226 1231 self.user_user_group_permission_ids.append(
1227 1232 (target_user_group.users_group_id, user.user_id))
1228 1233 return permission
1229 1234
1230 1235 def grant_user_group_permission_to_user_group(
1231 1236 self, target_user_group, user_group, permission_name):
1232 1237 permission = UserGroupModel().grant_user_group_permission(
1233 1238 target_user_group, user_group, permission_name)
1234 1239 self.user_group_user_group_permission_ids.append(
1235 1240 (target_user_group.users_group_id, user_group.users_group_id))
1236 1241 return permission
1237 1242
1238 1243 def revoke_user_permission(self, user_name, permission_name):
1239 1244 self._inherit_default_user_permissions(user_name, True)
1240 1245 UserModel().revoke_perm(user_name, permission_name)
1241 1246
1242 1247 def _inherit_default_user_permissions(self, user_name, value):
1243 1248 user = UserModel().get_by_username(user_name)
1244 1249 user.inherit_default_permissions = value
1245 1250 Session().add(user)
1246 1251 Session().commit()
1247 1252
1248 1253 def cleanup(self):
1249 1254 self._cleanup_permissions()
1250 1255 self._cleanup_repo_groups()
1251 1256 self._cleanup_user_groups()
1252 1257 self._cleanup_users()
1253 1258
1254 1259 def _cleanup_permissions(self):
1255 1260 if self.user_permissions:
1256 1261 for user_name, permission_name in self.user_permissions:
1257 1262 self.revoke_user_permission(user_name, permission_name)
1258 1263
1259 1264 for permission in self.user_repo_permission_ids:
1260 1265 RepoModel().revoke_user_permission(*permission)
1261 1266
1262 1267 for permission in self.user_group_repo_permission_ids:
1263 1268 RepoModel().revoke_user_group_permission(*permission)
1264 1269
1265 1270 for permission in self.user_repo_group_permission_ids:
1266 1271 RepoGroupModel().revoke_user_permission(*permission)
1267 1272
1268 1273 for permission in self.user_group_repo_group_permission_ids:
1269 1274 RepoGroupModel().revoke_user_group_permission(*permission)
1270 1275
1271 1276 for permission in self.user_user_group_permission_ids:
1272 1277 UserGroupModel().revoke_user_permission(*permission)
1273 1278
1274 1279 for permission in self.user_group_user_group_permission_ids:
1275 1280 UserGroupModel().revoke_user_group_permission(*permission)
1276 1281
1277 1282 def _cleanup_repo_groups(self):
1278 1283 def _repo_group_compare(first_group_id, second_group_id):
1279 1284 """
1280 1285 Gives higher priority to the groups with the most complex paths
1281 1286 """
1282 1287 first_group = RepoGroup.get(first_group_id)
1283 1288 second_group = RepoGroup.get(second_group_id)
1284 1289 first_group_parts = (
1285 1290 len(first_group.group_name.split('/')) if first_group else 0)
1286 1291 second_group_parts = (
1287 1292 len(second_group.group_name.split('/')) if second_group else 0)
1288 1293 return cmp(second_group_parts, first_group_parts)
1289 1294
1290 1295 sorted_repo_group_ids = sorted(
1291 1296 self.repo_group_ids, cmp=_repo_group_compare)
1292 1297 for repo_group_id in sorted_repo_group_ids:
1293 1298 self.fixture.destroy_repo_group(repo_group_id)
1294 1299
1295 1300 def _cleanup_user_groups(self):
1296 1301 def _user_group_compare(first_group_id, second_group_id):
1297 1302 """
1298 1303 Gives higher priority to the groups with the most complex paths
1299 1304 """
1300 1305 first_group = UserGroup.get(first_group_id)
1301 1306 second_group = UserGroup.get(second_group_id)
1302 1307 first_group_parts = (
1303 1308 len(first_group.users_group_name.split('/'))
1304 1309 if first_group else 0)
1305 1310 second_group_parts = (
1306 1311 len(second_group.users_group_name.split('/'))
1307 1312 if second_group else 0)
1308 1313 return cmp(second_group_parts, first_group_parts)
1309 1314
1310 1315 sorted_user_group_ids = sorted(
1311 1316 self.user_group_ids, cmp=_user_group_compare)
1312 1317 for user_group_id in sorted_user_group_ids:
1313 1318 self.fixture.destroy_user_group(user_group_id)
1314 1319
1315 1320 def _cleanup_users(self):
1316 1321 for user_id in self.user_ids:
1317 1322 self.fixture.destroy_user(user_id)
1318 1323
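The `_sanitize_name` helper added in this commit matters because `test_name` usually comes from `request.node.name`, and parametrized tests carry square brackets in their node ids; names built from such a prefix trip the schema validators mentioned in the commit message. A small sketch of the effect, using a made-up test name:

    util = UserUtility(test_name='test_repo_group_perms[svn]')
    assert util._test_name == 'test_repo_group_perms_svn_'
    # util.create_repo_group() now yields a schema-safe group name prefix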
1319 1324
1320 1325 # TODO: Think about moving this into a pytest-pyro package and make it a
1321 1326 # pytest plugin
1322 1327 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1323 1328 def pytest_runtest_makereport(item, call):
1324 1329 """
1325 1330 Adding the remote traceback if the exception has this information.
1326 1331
1327 1332 Pyro4 attaches this information as the attribute `_pyroTraceback`
1328 1333 to the exception instance.
1329 1334 """
1330 1335 outcome = yield
1331 1336 report = outcome.get_result()
1332 1337 if call.excinfo:
1333 1338 _add_pyro_remote_traceback(report, call.excinfo.value)
1334 1339
1335 1340
1336 1341 def _add_pyro_remote_traceback(report, exc):
1337 1342 pyro_traceback = getattr(exc, '_pyroTraceback', None)
1338 1343
1339 1344 if pyro_traceback:
1340 1345 traceback = ''.join(pyro_traceback)
1341 1346 section = 'Pyro4 remote traceback ' + report.when
1342 1347 report.sections.append((section, traceback))
1343 1348
1344 1349
1345 1350 @pytest.fixture(scope='session')
1346 1351 def testrun():
1347 1352 return {
1348 1353 'uuid': uuid.uuid4(),
1349 1354 'start': datetime.datetime.utcnow().isoformat(),
1350 1355 'timestamp': int(time.time()),
1351 1356 }
1352 1357
1353 1358
1354 1359 @pytest.fixture(autouse=True)
1355 1360 def collect_appenlight_stats(request, testrun):
1356 1361 """
1357 1362 This fixture reports memory consumption of single tests.
1358 1363
1359 1364 It gathers data based on `psutil` and sends them to Appenlight. The option
1360 1365 ``--ae`` has te be used to enable this fixture and the API key for your
1361 1366 application has to be provided in ``--ae-key``.
1362 1367 """
1363 1368 try:
1364 1369 # cygwin does not yet have psutil support.
1365 1370 import psutil
1366 1371 except ImportError:
1367 1372 return
1368 1373
1369 1374 if not request.config.getoption('--appenlight'):
1370 1375 return
1371 1376 else:
1372 1377 # Only request the pylonsapp fixture if appenlight tracking is
1373 1378 # enabled. This will speed up a test run of unit tests by 2 to 3
1374 1379 # seconds if appenlight is not enabled.
1375 1380 pylonsapp = request.getfuncargvalue("pylonsapp")
1376 1381 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1377 1382 client = AppenlightClient(
1378 1383 url=url,
1379 1384 api_key=request.config.getoption('--appenlight-api-key'),
1380 1385 namespace=request.node.nodeid,
1381 1386 request=str(testrun['uuid']),
1382 1387 testrun=testrun)
1383 1388
1384 1389 client.collect({
1385 1390 'message': "Starting",
1386 1391 })
1387 1392
1388 1393 server_and_port = pylonsapp.config['vcs.server']
1389 1394 server = create_vcsserver_proxy(server_and_port)
1390 1395 with server:
1391 1396 vcs_pid = server.get_pid()
1392 1397 server.run_gc()
1393 1398 vcs_process = psutil.Process(vcs_pid)
1394 1399 mem = vcs_process.memory_info()
1395 1400 client.tag_before('vcsserver.rss', mem.rss)
1396 1401 client.tag_before('vcsserver.vms', mem.vms)
1397 1402
1398 1403 test_process = psutil.Process()
1399 1404 mem = test_process.memory_info()
1400 1405 client.tag_before('test.rss', mem.rss)
1401 1406 client.tag_before('test.vms', mem.vms)
1402 1407
1403 1408 client.tag_before('time', time.time())
1404 1409
1405 1410 @request.addfinalizer
1406 1411 def send_stats():
1407 1412 client.tag_after('time', time.time())
1408 1413 with server:
1409 1414 gc_stats = server.run_gc()
1410 1415 for tag, value in gc_stats.items():
1411 1416 client.tag_after(tag, value)
1412 1417 mem = vcs_process.memory_info()
1413 1418 client.tag_after('vcsserver.rss', mem.rss)
1414 1419 client.tag_after('vcsserver.vms', mem.vms)
1415 1420
1416 1421 mem = test_process.memory_info()
1417 1422 client.tag_after('test.rss', mem.rss)
1418 1423 client.tag_after('test.vms', mem.vms)
1419 1424
1420 1425 client.collect({
1421 1426 'message': "Finished",
1422 1427 })
1423 1428 client.send_stats()
1424 1429
1425 1430 return client
1426 1431
1427 1432
1428 1433 class AppenlightClient():
1429 1434
1430 1435 url_template = '{url}?protocol_version=0.5'
1431 1436
1432 1437 def __init__(
1433 1438 self, url, api_key, add_server=True, add_timestamp=True,
1434 1439 namespace=None, request=None, testrun=None):
1435 1440 self.url = self.url_template.format(url=url)
1436 1441 self.api_key = api_key
1437 1442 self.add_server = add_server
1438 1443 self.add_timestamp = add_timestamp
1439 1444 self.namespace = namespace
1440 1445 self.request = request
1441 1446 self.server = socket.getfqdn(socket.gethostname())
1442 1447 self.tags_before = {}
1443 1448 self.tags_after = {}
1444 1449 self.stats = []
1445 1450 self.testrun = testrun or {}
1446 1451
1447 1452 def tag_before(self, tag, value):
1448 1453 self.tags_before[tag] = value
1449 1454
1450 1455 def tag_after(self, tag, value):
1451 1456 self.tags_after[tag] = value
1452 1457
1453 1458 def collect(self, data):
1454 1459 if self.add_server:
1455 1460 data.setdefault('server', self.server)
1456 1461 if self.add_timestamp:
1457 1462 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1458 1463 if self.namespace:
1459 1464 data.setdefault('namespace', self.namespace)
1460 1465 if self.request:
1461 1466 data.setdefault('request', self.request)
1462 1467 self.stats.append(data)
1463 1468
1464 1469 def send_stats(self):
1465 1470 tags = [
1466 1471 ('testrun', self.request),
1467 1472 ('testrun.start', self.testrun['start']),
1468 1473 ('testrun.timestamp', self.testrun['timestamp']),
1469 1474 ('test', self.namespace),
1470 1475 ]
1471 1476 for key, value in self.tags_before.items():
1472 1477 tags.append((key + '.before', value))
1473 1478 try:
1474 1479 delta = self.tags_after[key] - value
1475 1480 tags.append((key + '.delta', delta))
1476 1481 except Exception:
1477 1482 pass
1478 1483 for key, value in self.tags_after.items():
1479 1484 tags.append((key + '.after', value))
1480 1485 self.collect({
1481 1486 'message': "Collected tags",
1482 1487 'tags': tags,
1483 1488 })
1484 1489
1485 1490 response = requests.post(
1486 1491 self.url,
1487 1492 headers={
1488 1493 'X-appenlight-api-key': self.api_key},
1489 1494 json=self.stats,
1490 1495 )
1491 1496
1492 1497 if not response.status_code == 200:
1493 1498 pprint.pprint(self.stats)
1494 1499 print response.headers
1495 1500 print response.text
1496 1501 raise Exception('Sending to appenlight failed')
1497 1502
1498 1503
1499 1504 @pytest.fixture
1500 1505 def gist_util(request, pylonsapp):
1501 1506 """
1502 1507 Provides a wired instance of `GistUtility` with integrated cleanup.
1503 1508 """
1504 1509 utility = GistUtility()
1505 1510 request.addfinalizer(utility.cleanup)
1506 1511 return utility
1507 1512
1508 1513
1509 1514 class GistUtility(object):
1510 1515 def __init__(self):
1511 1516 self.fixture = Fixture()
1512 1517 self.gist_ids = []
1513 1518
1514 1519 def create_gist(self, **kwargs):
1515 1520 gist = self.fixture.create_gist(**kwargs)
1516 1521 self.gist_ids.append(gist.gist_id)
1517 1522 return gist
1518 1523
1519 1524 def cleanup(self):
1520 1525 for id_ in self.gist_ids:
1521 1526 self.fixture.destroy_gists(str(id_))
1522 1527
1523 1528
1524 1529 @pytest.fixture
1525 1530 def enabled_backends(request):
1526 1531 backends = request.config.option.backends
1527 1532 return backends[:]
1528 1533
1529 1534
1530 1535 @pytest.fixture
1531 1536 def settings_util(request):
1532 1537 """
1533 1538 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1534 1539 """
1535 1540 utility = SettingsUtility()
1536 1541 request.addfinalizer(utility.cleanup)
1537 1542 return utility
1538 1543
1539 1544
1540 1545 class SettingsUtility(object):
1541 1546 def __init__(self):
1542 1547 self.rhodecode_ui_ids = []
1543 1548 self.rhodecode_setting_ids = []
1544 1549 self.repo_rhodecode_ui_ids = []
1545 1550 self.repo_rhodecode_setting_ids = []
1546 1551
1547 1552 def create_repo_rhodecode_ui(
1548 1553 self, repo, section, value, key=None, active=True, cleanup=True):
1549 1554 key = key or hashlib.sha1(
1550 1555 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1551 1556
1552 1557 setting = RepoRhodeCodeUi()
1553 1558 setting.repository_id = repo.repo_id
1554 1559 setting.ui_section = section
1555 1560 setting.ui_value = value
1556 1561 setting.ui_key = key
1557 1562 setting.ui_active = active
1558 1563 Session().add(setting)
1559 1564 Session().commit()
1560 1565
1561 1566 if cleanup:
1562 1567 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1563 1568 return setting
1564 1569
1565 1570 def create_rhodecode_ui(
1566 1571 self, section, value, key=None, active=True, cleanup=True):
1567 1572 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1568 1573
1569 1574 setting = RhodeCodeUi()
1570 1575 setting.ui_section = section
1571 1576 setting.ui_value = value
1572 1577 setting.ui_key = key
1573 1578 setting.ui_active = active
1574 1579 Session().add(setting)
1575 1580 Session().commit()
1576 1581
1577 1582 if cleanup:
1578 1583 self.rhodecode_ui_ids.append(setting.ui_id)
1579 1584 return setting
1580 1585
1581 1586 def create_repo_rhodecode_setting(
1582 1587 self, repo, name, value, type_, cleanup=True):
1583 1588 setting = RepoRhodeCodeSetting(
1584 1589 repo.repo_id, key=name, val=value, type=type_)
1585 1590 Session().add(setting)
1586 1591 Session().commit()
1587 1592
1588 1593 if cleanup:
1589 1594 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1590 1595 return setting
1591 1596
1592 1597 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1593 1598 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1594 1599 Session().add(setting)
1595 1600 Session().commit()
1596 1601
1597 1602 if cleanup:
1598 1603 self.rhodecode_setting_ids.append(setting.app_settings_id)
1599 1604
1600 1605 return setting
1601 1606
1602 1607 def cleanup(self):
1603 1608 for id_ in self.rhodecode_ui_ids:
1604 1609 setting = RhodeCodeUi.get(id_)
1605 1610 Session().delete(setting)
1606 1611
1607 1612 for id_ in self.rhodecode_setting_ids:
1608 1613 setting = RhodeCodeSetting.get(id_)
1609 1614 Session().delete(setting)
1610 1615
1611 1616 for id_ in self.repo_rhodecode_ui_ids:
1612 1617 setting = RepoRhodeCodeUi.get(id_)
1613 1618 Session().delete(setting)
1614 1619
1615 1620 for id_ in self.repo_rhodecode_setting_ids:
1616 1621 setting = RepoRhodeCodeSetting.get(id_)
1617 1622 Session().delete(setting)
1618 1623
1619 1624 Session().commit()
1620 1625
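# Usage sketch (illustrative, not part of the original file): a test can create
# a throw-away ui entry and rely on the finalizer above to delete it. The
# section and value below are hypothetical.
#
#     def test_custom_hook_is_active(settings_util):
#         setting = settings_util.create_rhodecode_ui(
#             'hooks', 'python:example.hook', active=True)
#         assert setting.ui_active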
1621 1626
1622 1627 @pytest.fixture
1623 1628 def no_notifications(request):
1624 1629 notification_patcher = mock.patch(
1625 1630 'rhodecode.model.notification.NotificationModel.create')
1626 1631 notification_patcher.start()
1627 1632 request.addfinalizer(notification_patcher.stop)
1628 1633
1629 1634
1630 1635 @pytest.fixture
1631 1636 def silence_action_logger(request):
1632 1637 notification_patcher = mock.patch(
1633 1638 'rhodecode.lib.utils.action_logger')
1634 1639 notification_patcher.start()
1635 1640 request.addfinalizer(notification_patcher.stop)
1636 1641
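# Usage note (illustrative, not part of the original file): the two patcher
# fixtures above are opt-in; a test enables them simply by listing them in its
# signature. The test name is hypothetical.
#
#     def test_delete_user_sends_no_mail(no_notifications):
#         ...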
1637 1642
1638 1643 @pytest.fixture(scope='session')
1639 1644 def repeat(request):
1640 1645 """
1641 1646 The number of repetitions is based on this fixture.
1642 1647
1643 1648 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1644 1649 tests are not too slow in our default test suite.
1645 1650 """
1646 1651 return request.config.getoption('--repeat')
1647 1652
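# Usage sketch (illustrative, not part of the original file): as the docstring
# says, expensive tests scale the count down instead of looping the full number
# of times. The test below is hypothetical.
#
#     def test_expensive_call(repeat):
#         for _ in range(repeat // 100):
#             pass  # exercise the slow code path here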
1648 1653
1649 1654 @pytest.fixture
1650 1655 def rhodecode_fixtures():
1651 1656 return Fixture()
1652 1657
1653 1658
1654 1659 @pytest.fixture
1655 1660 def request_stub():
1656 1661 """
1657 1662 Stub request object.
1658 1663 """
1659 1664 request = pyramid.testing.DummyRequest()
1660 1665 request.scheme = 'https'
1661 1666 return request
1662 1667
1663 1668
1664 1669 @pytest.fixture
1665 1670 def config_stub(request, request_stub):
1666 1671 """
1667 1672 Set up pyramid.testing and return the Configurator.
1668 1673 """
1669 1674 config = pyramid.testing.setUp(request=request_stub)
1670 1675
1671 1676 @request.addfinalizer
1672 1677 def cleanup():
1673 1678 pyramid.testing.tearDown()
1674 1679
1675 1680 return config
1676 1681
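# Usage sketch (illustrative, not part of the original file): `config_stub`
# (together with `request_stub`) gives unit tests a minimal Pyramid environment
# without the full application. The route name below is hypothetical.
#
#     def test_renders_with_dummy_request(config_stub, request_stub):
#         config_stub.add_route('home', '/')
#         assert request_stub.scheme == 'https'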
1677 1682
1678 1683 @pytest.fixture
1679 1684 def StubIntegrationType():
1680 1685 class _StubIntegrationType(IntegrationTypeBase):
1681 1686 """ Test integration type class """
1682 1687
1683 1688 key = 'test'
1684 1689 display_name = 'Test integration type'
1685 1690 description = 'A test integration type for testing'
1686 1691 icon = 'test_icon_html_image'
1687 1692
1688 1693 def __init__(self, settings):
1689 1694 super(_StubIntegrationType, self).__init__(settings)
1690 1695 self.sent_events = [] # for testing
1691 1696
1692 1697 def send_event(self, event):
1693 1698 self.sent_events.append(event)
1694 1699
1695 1700 def settings_schema(self):
1696 1701 class SettingsSchema(colander.Schema):
1697 1702 test_string_field = colander.SchemaNode(
1698 1703 colander.String(),
1699 1704 missing=colander.required,
1700 1705 title='test string field',
1701 1706 )
1702 1707 test_int_field = colander.SchemaNode(
1703 1708 colander.Int(),
1704 1709 title='some integer setting',
1705 1710 )
1706 1711 return SettingsSchema()
1707 1712
1708 1713
1709 1714 integration_type_registry.register_integration_type(_StubIntegrationType)
1710 1715 return _StubIntegrationType
1711 1716
1712 1717 @pytest.fixture
1713 1718 def stub_integration_settings():
1714 1719 return {
1715 1720 'test_string_field': 'some data',
1716 1721 'test_int_field': 100,
1717 1722 }
1718 1723
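# Usage sketch (illustrative, not part of the original file): the stub type
# records every event handed to it, so a test can assert on delivery directly.
# The event value below is a hypothetical placeholder.
#
#     def test_stub_records_events(StubIntegrationType, stub_integration_settings):
#         integration = StubIntegrationType(stub_integration_settings)
#         integration.send_event('some-event')
#         assert integration.sent_events == ['some-event']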
1719 1724
1720 1725 @pytest.fixture
1721 1726 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1722 1727 stub_integration_settings):
1723 1728 integration = IntegrationModel().create(
1724 1729 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1725 1730 name='test repo integration',
1726 1731 repo=repo_stub, repo_group=None, child_repos_only=None)
1727 1732
1728 1733 @request.addfinalizer
1729 1734 def cleanup():
1730 1735 IntegrationModel().delete(integration)
1731 1736
1732 1737 return integration
1733 1738
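# Usage sketch (illustrative, not part of the original file): this fixture and
# the ones that follow only differ in scope (repository, repository group,
# recursive group, global, root repositories). Assuming the Integration model
# exposes `repo` and `repo_group` relationships (not shown here), a scope check
# could look like:
#
#     def test_scope_of_repo_integration(repo_integration_stub, repo_stub):
#         assert repo_integration_stub.repo == repo_stub
#         assert repo_integration_stub.repo_group is None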
1734 1739
1735 1740 @pytest.fixture
1736 1741 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1737 1742 stub_integration_settings):
1738 1743 integration = IntegrationModel().create(
1739 1744 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1740 1745 name='test repogroup integration',
1741 1746 repo=None, repo_group=test_repo_group, child_repos_only=True)
1742 1747
1743 1748 @request.addfinalizer
1744 1749 def cleanup():
1745 1750 IntegrationModel().delete(integration)
1746 1751
1747 1752 return integration
1748 1753
1749 1754
1750 1755 @pytest.fixture
1751 1756 def repogroup_recursive_integration_stub(request, test_repo_group,
1752 1757 StubIntegrationType, stub_integration_settings):
1753 1758 integration = IntegrationModel().create(
1754 1759 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1755 1760 name='test recursive repogroup integration',
1756 1761 repo=None, repo_group=test_repo_group, child_repos_only=False)
1757 1762
1758 1763 @request.addfinalizer
1759 1764 def cleanup():
1760 1765 IntegrationModel().delete(integration)
1761 1766
1762 1767 return integration
1763 1768
1764 1769
1765 1770 @pytest.fixture
1766 1771 def global_integration_stub(request, StubIntegrationType,
1767 1772 stub_integration_settings):
1768 1773 integration = IntegrationModel().create(
1769 1774 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1770 1775 name='test global integration',
1771 1776 repo=None, repo_group=None, child_repos_only=None)
1772 1777
1773 1778 @request.addfinalizer
1774 1779 def cleanup():
1775 1780 IntegrationModel().delete(integration)
1776 1781
1777 1782 return integration
1778 1783
1779 1784
1780 1785 @pytest.fixture
1781 1786 def root_repos_integration_stub(request, StubIntegrationType,
1782 1787 stub_integration_settings):
1783 1788 integration = IntegrationModel().create(
1784 1789 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1785 1790 name='test root repos integration',
1786 1791 repo=None, repo_group=None, child_repos_only=True)
1787 1792
1788 1793 @request.addfinalizer
1789 1794 def cleanup():
1790 1795 IntegrationModel().delete(integration)
1791 1796
1792 1797 return integration