tests: small code cleanup
marcink
r2315:a5a03d0b default
@@ -1,1858 +1,1855 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
73 73 def pytest_addoption(parser):
74 74 parser.addoption(
75 75 '--keep-tmp-path', action='store_true',
76 76 help="Keep the test temporary directories")
77 77 parser.addoption(
78 78 '--backends', action='store', type=_split_comma,
79 79 default=['git', 'hg', 'svn'],
80 80 help="Select which backends to test for backend specific tests.")
81 81 parser.addoption(
82 82 '--dbs', action='store', type=_split_comma,
83 83 default=['sqlite'],
84 84 help="Select which database to test for database specific tests. "
85 85 "Possible options are sqlite,postgres,mysql")
86 86 parser.addoption(
87 87 '--appenlight', '--ae', action='store_true',
88 88 help="Track statistics in appenlight.")
89 89 parser.addoption(
90 90 '--appenlight-api-key', '--ae-key',
91 91 help="API key for Appenlight.")
92 92 parser.addoption(
93 93 '--appenlight-url', '--ae-url',
94 94 default="https://ae.rhodecode.com",
95 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
96 96 parser.addoption(
97 97 '--sqlite-connection-string', action='store',
98 98 default='', help="Connection string for the dbs tests with SQLite")
99 99 parser.addoption(
100 100 '--postgres-connection-string', action='store',
101 101 default='', help="Connection string for the dbs tests with Postgres")
102 102 parser.addoption(
103 103 '--mysql-connection-string', action='store',
104 104 default='', help="Connection string for the dbs tests with MySQL")
105 105 parser.addoption(
106 106 '--repeat', type=int, default=100,
107 107 help="Number of repetitions in performance tests.")
108 108
109 109
110 110 def pytest_configure(config):
111 111 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
112 112 from rhodecode.config import patches
113 113 patches.kombu_1_5_1_python_2_7_11()
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # filter out items marked as not a test (``__test__ = False``), mirroring nose behaviour during the transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 142 backends = metafunc.function.backends.args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
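# Hypothetical usage sketch: a backend-specific test would carry the
# ``pytest.mark.backends`` marker that ``pytest_generate_tests`` and
# ``get_backends_from_metafunc`` above consume; combined with
# ``py.test --backends=git,hg`` only the intersection of both lists runs.
# The function name and assertion are invented; a real test would live in a
# test module and be named ``test_*``.
@pytest.mark.backends('git', 'hg')
def _sketch_backend_marker_usage(backend):
    assert backend.alias in ('git', 'hg')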
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this moment it is still an empty dict, but it will be
175 175 # filled during the test run; since it is a reference, returning it
176 176 # here is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allows using "http_environ" in session scope.
184 184 """
185 185 return http_environ(
186 186 http_host_stub=http_host_stub())
187 187
188 188
189 189 @pytest.fixture
190 190 def http_host_stub():
191 191 """
192 192 Value of HTTP_HOST in the test run.
193 193 """
194 194 return 'example.com:80'
195 195
196 196
197 197 @pytest.fixture
198 198 def http_host_only_stub():
199 199 """
200 200 Value of HTTP_HOST in the test run, without the port.
201 201 """
202 202 return http_host_stub().split(':')[0]
203 203
204 204
205 205 @pytest.fixture
206 206 def http_environ(http_host_stub):
207 207 """
208 208 HTTP extra environ keys.
209 209
210 210 Used by the test application as well as for setting up the pylons
211 211 environment. In the case of the fixture "app" it should be possible
212 212 to override this for a specific test case.
213 213 """
214 214 return {
215 215 'SERVER_NAME': http_host_only_stub(),
216 216 'SERVER_PORT': http_host_stub.split(':')[1],
217 217 'HTTP_HOST': http_host_stub,
218 218 'HTTP_USER_AGENT': 'rc-test-agent',
219 219 'REQUEST_METHOD': 'GET'
220 220 }
221 221
222 222
223 223 @pytest.fixture(scope='function')
224 224 def app(request, config_stub, pylonsapp, http_environ):
225 225 app = CustomTestApp(
226 226 pylonsapp,
227 227 extra_environ=http_environ)
228 228 if request.cls:
229 229 request.cls.app = app
230 230 return app
231 231
232 232
233 233 @pytest.fixture(scope='session')
234 234 def app_settings(pylonsapp, pylons_config):
235 235 """
236 236 Settings dictionary used to create the app.
237 237
238 238 Parses the ini file and passes the result through the sanitize and apply
239 239 defaults mechanism in `rhodecode.config.middleware`.
240 240 """
241 241 from paste.deploy.loadwsgi import loadcontext, APP
242 242 from rhodecode.config.middleware import (
243 243 sanitize_settings_and_apply_defaults)
244 244 context = loadcontext(APP, 'config:' + pylons_config)
245 245 settings = sanitize_settings_and_apply_defaults(context.config())
246 246 return settings
247 247
248 248
249 249 @pytest.fixture(scope='session')
250 250 def db(app_settings):
251 251 """
252 252 Initializes the database connection.
253 253
254 254 It uses the same settings which are used to create the ``pylonsapp`` or
255 255 ``app`` fixtures.
256 256 """
257 257 from rhodecode.config.utils import initialize_database
258 258 initialize_database(app_settings)
259 259
260 260
261 261 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
262 262
263 263
264 264 def _autologin_user(app, *args):
265 265 session = login_user_session(app, *args)
266 266 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
267 267 return LoginData(csrf_token, session['rhodecode_user'])
268 268
269 269
270 270 @pytest.fixture
271 271 def autologin_user(app):
272 272 """
273 273 Utility fixture which makes sure that the admin user is logged in
274 274 """
275 275 return _autologin_user(app)
276 276
277 277
278 278 @pytest.fixture
279 279 def autologin_regular_user(app):
280 280 """
281 281 Utility fixture which makes sure that the regular user is logged in
282 282 """
283 283 return _autologin_user(
284 284 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
285 285
286 286
287 287 @pytest.fixture(scope='function')
288 288 def csrf_token(request, autologin_user):
289 289 return autologin_user.csrf_token
290 290
291 291
292 292 @pytest.fixture(scope='function')
293 293 def xhr_header(request):
294 294 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
295 295
296 296
297 297 @pytest.fixture
298 298 def real_crypto_backend(monkeypatch):
299 299 """
300 300 Switch the production crypto backend on for this test.
301 301
302 302 During the test run the crypto backend is replaced with a faster
303 303 implementation based on the MD5 algorithm.
304 304 """
305 305 monkeypatch.setattr(rhodecode, 'is_test', False)
306 306
307 307
308 308 @pytest.fixture(scope='class')
309 309 def index_location(request, pylonsapp):
310 310 index_location = pylonsapp.config['app_conf']['search.location']
311 311 if request.cls:
312 312 request.cls.index_location = index_location
313 313 return index_location
314 314
315 315
316 316 @pytest.fixture(scope='session', autouse=True)
317 317 def tests_tmp_path(request):
318 318 """
319 319 Create temporary directory to be used during the test session.
320 320 """
321 321 if not os.path.exists(TESTS_TMP_PATH):
322 322 os.makedirs(TESTS_TMP_PATH)
323 323
324 324 if not request.config.getoption('--keep-tmp-path'):
325 325 @request.addfinalizer
326 326 def remove_tmp_path():
327 327 shutil.rmtree(TESTS_TMP_PATH)
328 328
329 329 return TESTS_TMP_PATH
330 330
331 331
332 332 @pytest.fixture
333 333 def test_repo_group(request):
334 334 """
335 335 Create a temporary repository group, and destroy it automatically
336 336 after usage.
337 337 """
338 338 fixture = Fixture()
339 339 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
340 340 repo_group = fixture.create_repo_group(repogroupid)
341 341
342 342 def _cleanup():
343 343 fixture.destroy_repo_group(repogroupid)
344 344
345 345 request.addfinalizer(_cleanup)
346 346 return repo_group
347 347
348 348
349 349 @pytest.fixture
350 350 def test_user_group(request):
351 351 """
352 352 Create a temporary user group, and destroy it automatically
353 353 after usage.
354 354 """
355 355 fixture = Fixture()
356 356 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
357 357 user_group = fixture.create_user_group(usergroupid)
358 358
359 359 def _cleanup():
360 360 fixture.destroy_user_group(user_group)
361 361
362 362 request.addfinalizer(_cleanup)
363 363 return user_group
364 364
365 365
366 366 @pytest.fixture(scope='session')
367 367 def test_repo(request):
368 368 container = TestRepoContainer()
369 369 request.addfinalizer(container._cleanup)
370 370 return container
371 371
372 372
373 373 class TestRepoContainer(object):
374 374 """
375 375 Container for test repositories which are used read only.
376 376
377 377 Repositories will be created on demand and re-used during the lifetime
378 378 of this object.
379 379
380 380 Usage to get the svn test repository "minimal"::
381 381
382 382 test_repo = TestRepoContainer()
383 383 repo = test_repo('minimal', 'svn')
384 384
385 385 """
386 386
387 387 dump_extractors = {
388 388 'git': utils.extract_git_repo_from_dump,
389 389 'hg': utils.extract_hg_repo_from_dump,
390 390 'svn': utils.extract_svn_repo_from_dump,
391 391 }
392 392
393 393 def __init__(self):
394 394 self._cleanup_repos = []
395 395 self._fixture = Fixture()
396 396 self._repos = {}
397 397
398 398 def __call__(self, dump_name, backend_alias, config=None):
399 399 key = (dump_name, backend_alias)
400 400 if key not in self._repos:
401 401 repo = self._create_repo(dump_name, backend_alias, config)
402 402 self._repos[key] = repo.repo_id
403 403 return Repository.get(self._repos[key])
404 404
405 405 def _create_repo(self, dump_name, backend_alias, config):
406 406 repo_name = '%s-%s' % (backend_alias, dump_name)
407 407 backend_class = get_backend(backend_alias)
408 408 dump_extractor = self.dump_extractors[backend_alias]
409 409 repo_path = dump_extractor(dump_name, repo_name)
410 410
411 411 vcs_repo = backend_class(repo_path, config=config)
412 412 repo2db_mapper({repo_name: vcs_repo})
413 413
414 414 repo = RepoModel().get_by_repo_name(repo_name)
415 415 self._cleanup_repos.append(repo_name)
416 416 return repo
417 417
418 418 def _cleanup(self):
419 419 for repo_name in reversed(self._cleanup_repos):
420 420 self._fixture.destroy_repo(repo_name)
421 421
422 422
423 423 @pytest.fixture
424 424 def backend(request, backend_alias, pylonsapp, test_repo):
425 425 """
426 426 Parametrized fixture which represents a single backend implementation.
427 427
428 428 It respects the option `--backends` to focus the test run on specific
429 429 backend implementations.
430 430
431 431 It also supports `pytest.mark.xfail_backends` to mark tests as failing
432 432 for specific backends. This is intended as a utility for incremental
433 433 development of a new backend implementation.
434 434 """
435 435 if backend_alias not in request.config.getoption('--backends'):
436 436 pytest.skip("Backend %s not selected." % (backend_alias, ))
437 437
438 438 utils.check_xfail_backends(request.node, backend_alias)
439 439 utils.check_skip_backends(request.node, backend_alias)
440 440
441 441 repo_name = 'vcs_test_%s' % (backend_alias, )
442 442 backend = Backend(
443 443 alias=backend_alias,
444 444 repo_name=repo_name,
445 445 test_name=request.node.name,
446 446 test_repo_container=test_repo)
447 447 request.addfinalizer(backend.cleanup)
448 448 return backend
449 449
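# Hypothetical sketch of the markers honoured by the `backend` fixture above;
# the marker arguments, function name and body are invented for illustration.
@pytest.mark.backends('git', 'hg', 'svn')
@pytest.mark.xfail_backends('svn')
def _sketch_backend_xfail_marker(backend):
    # Would be expected to fail when parametrized with the svn backend.
    repo = backend.create_repo(number_of_commits=1)
    assert len(backend.commit_ids) == 1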
450 450
451 451 @pytest.fixture
452 452 def backend_git(request, pylonsapp, test_repo):
453 453 return backend(request, 'git', pylonsapp, test_repo)
454 454
455 455
456 456 @pytest.fixture
457 457 def backend_hg(request, pylonsapp, test_repo):
458 458 return backend(request, 'hg', pylonsapp, test_repo)
459 459
460 460
461 461 @pytest.fixture
462 462 def backend_svn(request, pylonsapp, test_repo):
463 463 return backend(request, 'svn', pylonsapp, test_repo)
464 464
465 465
466 466 @pytest.fixture
467 467 def backend_random(backend_git):
468 468 """
469 469 Use this to express that your tests need "a backend".
470 470
471 471 A few of our tests need a backend, so that we can run the code. This
472 472 fixture is intended to be used for such cases. It will pick one of the
473 473 backends and run the tests.
474 474
475 475 The fixture `backend` would run the test multiple times for each
476 476 available backend which is a pure waste of time if the test is
477 477 independent of the backend type.
478 478 """
479 479 # TODO: johbo: Change this to pick a random backend
480 480 return backend_git
481 481
482 482
483 483 @pytest.fixture
484 484 def backend_stub(backend_git):
485 485 """
486 486 Use this to express that your tests need a backend stub
487 487
488 488 TODO: mikhail: Implement a real stub logic instead of returning
489 489 a git backend
490 490 """
491 491 return backend_git
492 492
493 493
494 494 @pytest.fixture
495 495 def repo_stub(backend_stub):
496 496 """
497 497 Use this to express that your tests need a repository stub
498 498 """
499 499 return backend_stub.create_repo()
500 500
501 501
502 502 class Backend(object):
503 503 """
504 504 Represents the test configuration for one supported backend
505 505
506 506 Provides easy access to different test repositories based on
507 507 `__getitem__`. Such repositories will only be created once per test
508 508 session.
509 509 """
510 510
511 511 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
512 512 _master_repo = None
513 513 _commit_ids = {}
514 514
515 515 def __init__(self, alias, repo_name, test_name, test_repo_container):
516 516 self.alias = alias
517 517 self.repo_name = repo_name
518 518 self._cleanup_repos = []
519 519 self._test_name = test_name
520 520 self._test_repo_container = test_repo_container
521 521 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
522 522 # Fixture will survive in the end.
523 523 self._fixture = Fixture()
524 524
525 525 def __getitem__(self, key):
526 526 return self._test_repo_container(key, self.alias)
527 527
528 528 def create_test_repo(self, key, config=None):
529 529 return self._test_repo_container(key, self.alias, config)
530 530
531 531 @property
532 532 def repo(self):
533 533 """
534 534 Returns the "current" repository. This is the vcs_test repo or the
535 535 last repo which has been created with `create_repo`.
536 536 """
537 537 from rhodecode.model.db import Repository
538 538 return Repository.get_by_repo_name(self.repo_name)
539 539
540 540 @property
541 541 def default_branch_name(self):
542 542 VcsRepository = get_backend(self.alias)
543 543 return VcsRepository.DEFAULT_BRANCH_NAME
544 544
545 545 @property
546 546 def default_head_id(self):
547 547 """
548 548 Returns the default head id of the underlying backend.
549 549
550 550 This will be the default branch name in case the backend does have a
551 551 default branch. Otherwise it will point to a valid head
552 552 which can serve as the base to create a new commit on top of it.
553 553 """
554 554 vcsrepo = self.repo.scm_instance()
555 555 head_id = (
556 556 vcsrepo.DEFAULT_BRANCH_NAME or
557 557 vcsrepo.commit_ids[-1])
558 558 return head_id
559 559
560 560 @property
561 561 def commit_ids(self):
562 562 """
563 563 Returns the list of commits for the last created repository
564 564 """
565 565 return self._commit_ids
566 566
567 567 def create_master_repo(self, commits):
568 568 """
569 569 Create a repository and remember it as a template.
570 570
571 571 This makes it easy to create derived repositories to construct
572 572 more complex scenarios for diff, compare and pull requests.
573 573
574 574 Returns a commit map which maps from commit message to raw_id.
575 575 """
576 576 self._master_repo = self.create_repo(commits=commits)
577 577 return self._commit_ids
578 578
579 579 def create_repo(
580 580 self, commits=None, number_of_commits=0, heads=None,
581 581 name_suffix=u'', **kwargs):
582 582 """
583 583 Create a repository and record it for later cleanup.
584 584
585 585 :param commits: Optional. A sequence of dict instances.
586 586 Will add a commit per entry to the new repository.
587 587 :param number_of_commits: Optional. If set to a number, this number of
588 588 commits will be added to the new repository.
589 589 :param heads: Optional. Can be set to a sequence of commit
590 590 names which shall be pulled in from the master repository.
591 591
592 592 """
593 593 self.repo_name = self._next_repo_name() + name_suffix
594 594 repo = self._fixture.create_repo(
595 595 self.repo_name, repo_type=self.alias, **kwargs)
596 596 self._cleanup_repos.append(repo.repo_name)
597 597
598 598 commits = commits or [
599 599 {'message': 'Commit %s of %s' % (x, self.repo_name)}
600 600 for x in xrange(number_of_commits)]
601 601 self._add_commits_to_repo(repo.scm_instance(), commits)
602 602 if heads:
603 603 self.pull_heads(repo, heads)
604 604
605 605 return repo
606 606
607 607 def pull_heads(self, repo, heads):
608 608 """
609 609 Make sure that repo contains all commits mentioned in `heads`
610 610 """
611 611 vcsmaster = self._master_repo.scm_instance()
612 612 vcsrepo = repo.scm_instance()
613 613 vcsrepo.config.clear_section('hooks')
614 614 commit_ids = [self._commit_ids[h] for h in heads]
615 615 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
616 616
617 617 def create_fork(self):
618 618 repo_to_fork = self.repo_name
619 619 self.repo_name = self._next_repo_name()
620 620 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
621 621 self._cleanup_repos.append(self.repo_name)
622 622 return repo
623 623
624 624 def new_repo_name(self, suffix=u''):
625 625 self.repo_name = self._next_repo_name() + suffix
626 626 self._cleanup_repos.append(self.repo_name)
627 627 return self.repo_name
628 628
629 629 def _next_repo_name(self):
630 630 return u"%s_%s" % (
631 631 self.invalid_repo_name.sub(u'_', self._test_name),
632 632 len(self._cleanup_repos))
633 633
634 634 def ensure_file(self, filename, content='Test content\n'):
635 635 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
636 636 commits = [
637 637 {'added': [
638 638 FileNode(filename, content=content),
639 639 ]},
640 640 ]
641 641 self._add_commits_to_repo(self.repo.scm_instance(), commits)
642 642
643 643 def enable_downloads(self):
644 644 repo = self.repo
645 645 repo.enable_downloads = True
646 646 Session().add(repo)
647 647 Session().commit()
648 648
649 649 def cleanup(self):
650 650 for repo_name in reversed(self._cleanup_repos):
651 651 self._fixture.destroy_repo(repo_name)
652 652
653 653 def _add_commits_to_repo(self, repo, commits):
654 654 commit_ids = _add_commits_to_repo(repo, commits)
655 655 if not commit_ids:
656 656 return
657 657 self._commit_ids = commit_ids
658 658
659 659 # Creating refs for Git to allow fetching them from remote repository
660 660 if self.alias == 'git':
661 661 refs = {}
662 662 for message in self._commit_ids:
663 663 # TODO: mikhail: do more special chars replacements
664 664 ref_name = 'refs/test-refs/{}'.format(
665 665 message.replace(' ', ''))
666 666 refs[ref_name] = self._commit_ids[message]
667 667 self._create_refs(repo, refs)
668 668
669 669 def _create_refs(self, repo, refs):
670 670 for ref_name in refs:
671 671 repo.set_refs(ref_name, refs[ref_name])
672 672
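# Hypothetical sketch of driving the Backend helper defined above; the commit
# messages, file name and suffix are invented for illustration.
def _sketch_backend_master_repo_usage(backend):
    # Build a template repository and remember the message -> raw_id mapping.
    commit_map = backend.create_master_repo([
        {'message': 'c1', 'added': [FileNode('a.txt', content='a\n')]},
        {'message': 'c2'},
    ])
    # Derive a repository that only contains the history up to 'c1'.
    derived = backend.create_repo(heads=['c1'], name_suffix=u'-derived')
    return commit_map, derived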
673 673
674 674 @pytest.fixture
675 675 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
676 676 """
677 677 Parametrized fixture which represents a single vcs backend implementation.
678 678
679 679 See the fixture `backend` for more details. This one implements the same
680 680 concept, but on the vcs level, so it does not provide model instances etc.
681 681
682 682 Parameters are generated dynamically, see :func:`pytest_generate_tests`
683 683 for how this works.
684 684 """
685 685 if backend_alias not in request.config.getoption('--backends'):
686 686 pytest.skip("Backend %s not selected." % (backend_alias, ))
687 687
688 688 utils.check_xfail_backends(request.node, backend_alias)
689 689 utils.check_skip_backends(request.node, backend_alias)
690 690
691 691 repo_name = 'vcs_test_%s' % (backend_alias, )
692 692 repo_path = os.path.join(tests_tmp_path, repo_name)
693 693 backend = VcsBackend(
694 694 alias=backend_alias,
695 695 repo_path=repo_path,
696 696 test_name=request.node.name,
697 697 test_repo_container=test_repo)
698 698 request.addfinalizer(backend.cleanup)
699 699 return backend
700 700
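# Hypothetical usage sketch for the `vcsbackend` fixture; the file name and
# content are invented for illustration.
def _sketch_vcsbackend_usage(vcsbackend):
    vcs_repo = vcsbackend.create_repo(number_of_commits=2)
    vcsbackend.add_file(vcs_repo, 'README.rst', content='hello\n')
    return vcsbackend.repo.commit_ids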
701 701
702 702 @pytest.fixture
703 703 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
704 704 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
705 705
706 706
707 707 @pytest.fixture
708 708 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
709 709 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
710 710
711 711
712 712 @pytest.fixture
713 713 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
714 714 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
715 715
716 716
717 717 @pytest.fixture
718 718 def vcsbackend_random(vcsbackend_git):
719 719 """
720 720 Use this to express that your tests need "a vcsbackend".
721 721
722 722 The fixture `vcsbackend` would run the test multiple times for each
723 723 available vcs backend which is a pure waste of time if the test is
724 724 independent of the vcs backend type.
725 725 """
726 726 # TODO: johbo: Change this to pick a random backend
727 727 return vcsbackend_git
728 728
729 729
730 730 @pytest.fixture
731 731 def vcsbackend_stub(vcsbackend_git):
732 732 """
733 733 Use this to express that your test just needs a stub of a vcsbackend.
734 734
735 735 Plan is to eventually implement an in-memory stub to speed tests up.
736 736 """
737 737 return vcsbackend_git
738 738
739 739
740 740 class VcsBackend(object):
741 741 """
742 742 Represents the test configuration for one supported vcs backend.
743 743 """
744 744
745 745 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
746 746
747 747 def __init__(self, alias, repo_path, test_name, test_repo_container):
748 748 self.alias = alias
749 749 self._repo_path = repo_path
750 750 self._cleanup_repos = []
751 751 self._test_name = test_name
752 752 self._test_repo_container = test_repo_container
753 753
754 754 def __getitem__(self, key):
755 755 return self._test_repo_container(key, self.alias).scm_instance()
756 756
757 757 @property
758 758 def repo(self):
759 759 """
760 760 Returns the "current" repository. This is the vcs_test repo or the last
761 761 repo which has been created.
762 762 """
763 763 Repository = get_backend(self.alias)
764 764 return Repository(self._repo_path)
765 765
766 766 @property
767 767 def backend(self):
768 768 """
769 769 Returns the backend implementation class.
770 770 """
771 771 return get_backend(self.alias)
772 772
773 773 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
774 774 repo_name = self._next_repo_name()
775 775 self._repo_path = get_new_dir(repo_name)
776 776 repo_class = get_backend(self.alias)
777 777 src_url = None
778 778 if _clone_repo:
779 779 src_url = _clone_repo.path
780 780 repo = repo_class(self._repo_path, create=True, src_url=src_url)
781 781 self._cleanup_repos.append(repo)
782 782
783 783 commits = commits or [
784 784 {'message': 'Commit %s of %s' % (x, repo_name)}
785 785 for x in xrange(number_of_commits)]
786 786 _add_commits_to_repo(repo, commits)
787 787 return repo
788 788
789 789 def clone_repo(self, repo):
790 790 return self.create_repo(_clone_repo=repo)
791 791
792 792 def cleanup(self):
793 793 for repo in self._cleanup_repos:
794 794 shutil.rmtree(repo.path)
795 795
796 796 def new_repo_path(self):
797 797 repo_name = self._next_repo_name()
798 798 self._repo_path = get_new_dir(repo_name)
799 799 return self._repo_path
800 800
801 801 def _next_repo_name(self):
802 802 return "%s_%s" % (
803 803 self.invalid_repo_name.sub('_', self._test_name),
804 804 len(self._cleanup_repos))
805 805
806 806 def add_file(self, repo, filename, content='Test content\n'):
807 807 imc = repo.in_memory_commit
808 808 imc.add(FileNode(filename, content=content))
809 809 imc.commit(
810 810 message=u'Automatic commit from vcsbackend fixture',
811 811 author=u'Automatic')
812 812
813 813 def ensure_file(self, filename, content='Test content\n'):
814 814 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
815 815 self.add_file(self.repo, filename, content)
816 816
817 817
818 818 def _add_commits_to_repo(vcs_repo, commits):
819 819 commit_ids = {}
820 820 if not commits:
821 821 return commit_ids
822 822
823 823 imc = vcs_repo.in_memory_commit
824 824 commit = None
825 825
826 826 for idx, commit in enumerate(commits):
827 827 message = unicode(commit.get('message', 'Commit %s' % idx))
828 828
829 829 for node in commit.get('added', []):
830 830 imc.add(FileNode(node.path, content=node.content))
831 831 for node in commit.get('changed', []):
832 832 imc.change(FileNode(node.path, content=node.content))
833 833 for node in commit.get('removed', []):
834 834 imc.remove(FileNode(node.path))
835 835
836 836 parents = [
837 837 vcs_repo.get_commit(commit_id=commit_ids[p])
838 838 for p in commit.get('parents', [])]
839 839
840 840 operations = ('added', 'changed', 'removed')
841 841 if not any((commit.get(o) for o in operations)):
842 842 imc.add(FileNode('file_%s' % idx, content=message))
843 843
844 844 commit = imc.commit(
845 845 message=message,
846 846 author=unicode(commit.get('author', 'Automatic')),
847 847 date=commit.get('date'),
848 848 branch=commit.get('branch'),
849 849 parents=parents)
850 850
851 851 commit_ids[commit.message] = commit.raw_id
852 852
853 853 return commit_ids
854 854
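# Hypothetical example of the commit description dicts understood by
# ``_add_commits_to_repo`` (and hence by the ``create_repo`` helpers);
# the messages and file names are invented for illustration.
_EXAMPLE_COMMIT_DEFS = [
    {'message': 'add readme',
     'added': [FileNode('README.rst', content='docs\n')]},
    {'message': 'update readme',
     'changed': [FileNode('README.rst', content='more docs\n')]},
    {'message': 'drop readme',
     'removed': [FileNode('README.rst')]},
]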
855 855
856 856 @pytest.fixture
857 857 def reposerver(request):
858 858 """
859 859 Allows serving a backend repository
860 860 """
861 861
862 862 repo_server = RepoServer()
863 863 request.addfinalizer(repo_server.cleanup)
864 864 return repo_server
865 865
866 866
867 867 class RepoServer(object):
868 868 """
869 869 Utility to serve a local repository for the duration of a test case.
870 870
871 871 Supports only Subversion so far.
872 872 """
873 873
874 874 url = None
875 875
876 876 def __init__(self):
877 877 self._cleanup_servers = []
878 878
879 879 def serve(self, vcsrepo):
880 880 if vcsrepo.alias != 'svn':
881 881 raise TypeError("Backend %s not supported" % vcsrepo.alias)
882 882
883 883 proc = subprocess32.Popen(
884 884 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
885 885 '--root', vcsrepo.path])
886 886 self._cleanup_servers.append(proc)
887 887 self.url = 'svn://localhost'
888 888
889 889 def cleanup(self):
890 890 for proc in self._cleanup_servers:
891 891 proc.terminate()
892 892
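# Hypothetical sketch combining `reposerver` with an svn repository; serving
# other backends raises TypeError as implemented above.
def _sketch_reposerver_usage(reposerver, vcsbackend_svn):
    svn_repo = vcsbackend_svn.create_repo(number_of_commits=1)
    reposerver.serve(svn_repo)
    return reposerver.url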
893 893
894 894 @pytest.fixture
895 895 def pr_util(backend, request, config_stub):
896 896 """
897 897 Utility for tests of models and for functional tests around pull requests.
898 898
899 899 It gives an instance of :class:`PRTestUtility` which provides various
900 900 utility methods around one pull request.
901 901
902 902 This fixture uses `backend` and inherits its parameterization.
903 903 """
904 904
905 905 util = PRTestUtility(backend)
906
907 @request.addfinalizer
908 def cleanup():
909 util.cleanup()
906 request.addfinalizer(util.cleanup)
910 907
911 908 return util
912 909
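# Hypothetical sketch of a test driving `pr_util`; the keyword values are
# invented for illustration.
def _sketch_pr_util_usage(pr_util):
    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
    comment = pr_util.create_comment()
    new_commit_id = pr_util.add_one_commit()
    return pull_request, comment, new_commit_id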
913 910
914 911 class PRTestUtility(object):
915 912
916 913 pull_request = None
917 914 pull_request_id = None
918 915 mergeable_patcher = None
919 916 mergeable_mock = None
920 917 notification_patcher = None
921 918
922 919 def __init__(self, backend):
923 920 self.backend = backend
924 921
925 922 def create_pull_request(
926 923 self, commits=None, target_head=None, source_head=None,
927 924 revisions=None, approved=False, author=None, mergeable=False,
928 925 enable_notifications=True, name_suffix=u'', reviewers=None,
929 926 title=u"Test", description=u"Description"):
930 927 self.set_mergeable(mergeable)
931 928 if not enable_notifications:
932 929 # mock notification side effect
933 930 self.notification_patcher = mock.patch(
934 931 'rhodecode.model.notification.NotificationModel.create')
935 932 self.notification_patcher.start()
936 933
937 934 if not self.pull_request:
938 935 if not commits:
939 936 commits = [
940 937 {'message': 'c1'},
941 938 {'message': 'c2'},
942 939 {'message': 'c3'},
943 940 ]
944 941 target_head = 'c1'
945 942 source_head = 'c2'
946 943 revisions = ['c2']
947 944
948 945 self.commit_ids = self.backend.create_master_repo(commits)
949 946 self.target_repository = self.backend.create_repo(
950 947 heads=[target_head], name_suffix=name_suffix)
951 948 self.source_repository = self.backend.create_repo(
952 949 heads=[source_head], name_suffix=name_suffix)
953 950 self.author = author or UserModel().get_by_username(
954 951 TEST_USER_ADMIN_LOGIN)
955 952
956 953 model = PullRequestModel()
957 954 self.create_parameters = {
958 955 'created_by': self.author,
959 956 'source_repo': self.source_repository.repo_name,
960 957 'source_ref': self._default_branch_reference(source_head),
961 958 'target_repo': self.target_repository.repo_name,
962 959 'target_ref': self._default_branch_reference(target_head),
963 960 'revisions': [self.commit_ids[r] for r in revisions],
964 961 'reviewers': reviewers or self._get_reviewers(),
965 962 'title': title,
966 963 'description': description,
967 964 }
968 965 self.pull_request = model.create(**self.create_parameters)
969 966 assert model.get_versions(self.pull_request) == []
970 967
971 968 self.pull_request_id = self.pull_request.pull_request_id
972 969
973 970 if approved:
974 971 self.approve()
975 972
976 973 Session().add(self.pull_request)
977 974 Session().commit()
978 975
979 976 return self.pull_request
980 977
981 978 def approve(self):
982 979 self.create_status_votes(
983 980 ChangesetStatus.STATUS_APPROVED,
984 981 *self.pull_request.reviewers)
985 982
986 983 def close(self):
987 984 PullRequestModel().close_pull_request(self.pull_request, self.author)
988 985
989 986 def _default_branch_reference(self, commit_message):
990 987 reference = '%s:%s:%s' % (
991 988 'branch',
992 989 self.backend.default_branch_name,
993 990 self.commit_ids[commit_message])
994 991 return reference
995 992
996 993 def _get_reviewers(self):
997 994 return [
998 995 (TEST_USER_REGULAR_LOGIN, ['default1'], False),
999 996 (TEST_USER_REGULAR2_LOGIN, ['default2'], False),
1000 997 ]
1001 998
1002 999 def update_source_repository(self, head=None):
1003 1000 heads = [head or 'c3']
1004 1001 self.backend.pull_heads(self.source_repository, heads=heads)
1005 1002
1006 1003 def add_one_commit(self, head=None):
1007 1004 self.update_source_repository(head=head)
1008 1005 old_commit_ids = set(self.pull_request.revisions)
1009 1006 PullRequestModel().update_commits(self.pull_request)
1010 1007 commit_ids = set(self.pull_request.revisions)
1011 1008 new_commit_ids = commit_ids - old_commit_ids
1012 1009 assert len(new_commit_ids) == 1
1013 1010 return new_commit_ids.pop()
1014 1011
1015 1012 def remove_one_commit(self):
1016 1013 assert len(self.pull_request.revisions) == 2
1017 1014 source_vcs = self.source_repository.scm_instance()
1018 1015 removed_commit_id = source_vcs.commit_ids[-1]
1019 1016
1020 1017 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1021 1018 # remove the if once that's sorted out.
1022 1019 if self.backend.alias == "git":
1023 1020 kwargs = {'branch_name': self.backend.default_branch_name}
1024 1021 else:
1025 1022 kwargs = {}
1026 1023 source_vcs.strip(removed_commit_id, **kwargs)
1027 1024
1028 1025 PullRequestModel().update_commits(self.pull_request)
1029 1026 assert len(self.pull_request.revisions) == 1
1030 1027 return removed_commit_id
1031 1028
1032 1029 def create_comment(self, linked_to=None):
1033 1030 comment = CommentsModel().create(
1034 1031 text=u"Test comment",
1035 1032 repo=self.target_repository.repo_name,
1036 1033 user=self.author,
1037 1034 pull_request=self.pull_request)
1038 1035 assert comment.pull_request_version_id is None
1039 1036
1040 1037 if linked_to:
1041 1038 PullRequestModel()._link_comments_to_version(linked_to)
1042 1039
1043 1040 return comment
1044 1041
1045 1042 def create_inline_comment(
1046 1043 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1047 1044 comment = CommentsModel().create(
1048 1045 text=u"Test comment",
1049 1046 repo=self.target_repository.repo_name,
1050 1047 user=self.author,
1051 1048 line_no=line_no,
1052 1049 f_path=file_path,
1053 1050 pull_request=self.pull_request)
1054 1051 assert comment.pull_request_version_id is None
1055 1052
1056 1053 if linked_to:
1057 1054 PullRequestModel()._link_comments_to_version(linked_to)
1058 1055
1059 1056 return comment
1060 1057
1061 1058 def create_version_of_pull_request(self):
1062 1059 pull_request = self.create_pull_request()
1063 1060 version = PullRequestModel()._create_version_from_snapshot(
1064 1061 pull_request)
1065 1062 return version
1066 1063
1067 1064 def create_status_votes(self, status, *reviewers):
1068 1065 for reviewer in reviewers:
1069 1066 ChangesetStatusModel().set_status(
1070 1067 repo=self.pull_request.target_repo,
1071 1068 status=status,
1072 1069 user=reviewer.user_id,
1073 1070 pull_request=self.pull_request)
1074 1071
1075 1072 def set_mergeable(self, value):
1076 1073 if not self.mergeable_patcher:
1077 1074 self.mergeable_patcher = mock.patch.object(
1078 1075 VcsSettingsModel, 'get_general_settings')
1079 1076 self.mergeable_mock = self.mergeable_patcher.start()
1080 1077 self.mergeable_mock.return_value = {
1081 1078 'rhodecode_pr_merge_enabled': value}
1082 1079
1083 1080 def cleanup(self):
1084 1081 # In case the source repository is already cleaned up, the pull
1085 1082 # request will already be deleted.
1086 1083 pull_request = PullRequest().get(self.pull_request_id)
1087 1084 if pull_request:
1088 1085 PullRequestModel().delete(pull_request, pull_request.author)
1089 1086 Session().commit()
1090 1087
1091 1088 if self.notification_patcher:
1092 1089 self.notification_patcher.stop()
1093 1090
1094 1091 if self.mergeable_patcher:
1095 1092 self.mergeable_patcher.stop()
1096 1093
1097 1094
1098 1095 @pytest.fixture
1099 1096 def user_admin(pylonsapp):
1100 1097 """
1101 1098 Provides the default admin test user as an instance of `db.User`.
1102 1099 """
1103 1100 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1104 1101 return user
1105 1102
1106 1103
1107 1104 @pytest.fixture
1108 1105 def user_regular(pylonsapp):
1109 1106 """
1110 1107 Provides the default regular test user as an instance of `db.User`.
1111 1108 """
1112 1109 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1113 1110 return user
1114 1111
1115 1112
1116 1113 @pytest.fixture
1117 1114 def user_util(request, pylonsapp):
1118 1115 """
1119 1116 Provides a wired instance of `UserUtility` with integrated cleanup.
1120 1117 """
1121 1118 utility = UserUtility(test_name=request.node.name)
1122 1119 request.addfinalizer(utility.cleanup)
1123 1120 return utility
1124 1121
1125 1122
1126 1123 # TODO: johbo: Split this up into utilities per domain or something similar
1127 1124 class UserUtility(object):
1128 1125
1129 1126 def __init__(self, test_name="test"):
1130 1127 self._test_name = self._sanitize_name(test_name)
1131 1128 self.fixture = Fixture()
1132 1129 self.repo_group_ids = []
1133 1130 self.repos_ids = []
1134 1131 self.user_ids = []
1135 1132 self.user_group_ids = []
1136 1133 self.user_repo_permission_ids = []
1137 1134 self.user_group_repo_permission_ids = []
1138 1135 self.user_repo_group_permission_ids = []
1139 1136 self.user_group_repo_group_permission_ids = []
1140 1137 self.user_user_group_permission_ids = []
1141 1138 self.user_group_user_group_permission_ids = []
1142 1139 self.user_permissions = []
1143 1140
1144 1141 def _sanitize_name(self, name):
1145 1142 for char in ['[', ']']:
1146 1143 name = name.replace(char, '_')
1147 1144 return name
1148 1145
1149 1146 def create_repo_group(
1150 1147 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1151 1148 group_name = "{prefix}_repogroup_{count}".format(
1152 1149 prefix=self._test_name,
1153 1150 count=len(self.repo_group_ids))
1154 1151 repo_group = self.fixture.create_repo_group(
1155 1152 group_name, cur_user=owner)
1156 1153 if auto_cleanup:
1157 1154 self.repo_group_ids.append(repo_group.group_id)
1158 1155 return repo_group
1159 1156
1160 1157 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1161 1158 auto_cleanup=True, repo_type='hg'):
1162 1159 repo_name = "{prefix}_repository_{count}".format(
1163 1160 prefix=self._test_name,
1164 1161 count=len(self.repos_ids))
1165 1162
1166 1163 repository = self.fixture.create_repo(
1167 1164 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type)
1168 1165 if auto_cleanup:
1169 1166 self.repos_ids.append(repository.repo_id)
1170 1167 return repository
1171 1168
1172 1169 def create_user(self, auto_cleanup=True, **kwargs):
1173 1170 user_name = "{prefix}_user_{count}".format(
1174 1171 prefix=self._test_name,
1175 1172 count=len(self.user_ids))
1176 1173 user = self.fixture.create_user(user_name, **kwargs)
1177 1174 if auto_cleanup:
1178 1175 self.user_ids.append(user.user_id)
1179 1176 return user
1180 1177
1181 1178 def create_user_with_group(self):
1182 1179 user = self.create_user()
1183 1180 user_group = self.create_user_group(members=[user])
1184 1181 return user, user_group
1185 1182
1186 1183 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1187 1184 auto_cleanup=True, **kwargs):
1188 1185 group_name = "{prefix}_usergroup_{count}".format(
1189 1186 prefix=self._test_name,
1190 1187 count=len(self.user_group_ids))
1191 1188 user_group = self.fixture.create_user_group(
1192 1189 group_name, cur_user=owner, **kwargs)
1193 1190
1194 1191 if auto_cleanup:
1195 1192 self.user_group_ids.append(user_group.users_group_id)
1196 1193 if members:
1197 1194 for user in members:
1198 1195 UserGroupModel().add_user_to_group(user_group, user)
1199 1196 return user_group
1200 1197
1201 1198 def grant_user_permission(self, user_name, permission_name):
1202 1199 self._inherit_default_user_permissions(user_name, False)
1203 1200 self.user_permissions.append((user_name, permission_name))
1204 1201
1205 1202 def grant_user_permission_to_repo_group(
1206 1203 self, repo_group, user, permission_name):
1207 1204 permission = RepoGroupModel().grant_user_permission(
1208 1205 repo_group, user, permission_name)
1209 1206 self.user_repo_group_permission_ids.append(
1210 1207 (repo_group.group_id, user.user_id))
1211 1208 return permission
1212 1209
1213 1210 def grant_user_group_permission_to_repo_group(
1214 1211 self, repo_group, user_group, permission_name):
1215 1212 permission = RepoGroupModel().grant_user_group_permission(
1216 1213 repo_group, user_group, permission_name)
1217 1214 self.user_group_repo_group_permission_ids.append(
1218 1215 (repo_group.group_id, user_group.users_group_id))
1219 1216 return permission
1220 1217
1221 1218 def grant_user_permission_to_repo(
1222 1219 self, repo, user, permission_name):
1223 1220 permission = RepoModel().grant_user_permission(
1224 1221 repo, user, permission_name)
1225 1222 self.user_repo_permission_ids.append(
1226 1223 (repo.repo_id, user.user_id))
1227 1224 return permission
1228 1225
1229 1226 def grant_user_group_permission_to_repo(
1230 1227 self, repo, user_group, permission_name):
1231 1228 permission = RepoModel().grant_user_group_permission(
1232 1229 repo, user_group, permission_name)
1233 1230 self.user_group_repo_permission_ids.append(
1234 1231 (repo.repo_id, user_group.users_group_id))
1235 1232 return permission
1236 1233
1237 1234 def grant_user_permission_to_user_group(
1238 1235 self, target_user_group, user, permission_name):
1239 1236 permission = UserGroupModel().grant_user_permission(
1240 1237 target_user_group, user, permission_name)
1241 1238 self.user_user_group_permission_ids.append(
1242 1239 (target_user_group.users_group_id, user.user_id))
1243 1240 return permission
1244 1241
1245 1242 def grant_user_group_permission_to_user_group(
1246 1243 self, target_user_group, user_group, permission_name):
1247 1244 permission = UserGroupModel().grant_user_group_permission(
1248 1245 target_user_group, user_group, permission_name)
1249 1246 self.user_group_user_group_permission_ids.append(
1250 1247 (target_user_group.users_group_id, user_group.users_group_id))
1251 1248 return permission
1252 1249
1253 1250 def revoke_user_permission(self, user_name, permission_name):
1254 1251 self._inherit_default_user_permissions(user_name, True)
1255 1252 UserModel().revoke_perm(user_name, permission_name)
1256 1253
1257 1254 def _inherit_default_user_permissions(self, user_name, value):
1258 1255 user = UserModel().get_by_username(user_name)
1259 1256 user.inherit_default_permissions = value
1260 1257 Session().add(user)
1261 1258 Session().commit()
1262 1259
1263 1260 def cleanup(self):
1264 1261 self._cleanup_permissions()
1265 1262 self._cleanup_repos()
1266 1263 self._cleanup_repo_groups()
1267 1264 self._cleanup_user_groups()
1268 1265 self._cleanup_users()
1269 1266
1270 1267 def _cleanup_permissions(self):
1271 1268 if self.user_permissions:
1272 1269 for user_name, permission_name in self.user_permissions:
1273 1270 self.revoke_user_permission(user_name, permission_name)
1274 1271
1275 1272 for permission in self.user_repo_permission_ids:
1276 1273 RepoModel().revoke_user_permission(*permission)
1277 1274
1278 1275 for permission in self.user_group_repo_permission_ids:
1279 1276 RepoModel().revoke_user_group_permission(*permission)
1280 1277
1281 1278 for permission in self.user_repo_group_permission_ids:
1282 1279 RepoGroupModel().revoke_user_permission(*permission)
1283 1280
1284 1281 for permission in self.user_group_repo_group_permission_ids:
1285 1282 RepoGroupModel().revoke_user_group_permission(*permission)
1286 1283
1287 1284 for permission in self.user_user_group_permission_ids:
1288 1285 UserGroupModel().revoke_user_permission(*permission)
1289 1286
1290 1287 for permission in self.user_group_user_group_permission_ids:
1291 1288 UserGroupModel().revoke_user_group_permission(*permission)
1292 1289
1293 1290 def _cleanup_repo_groups(self):
1294 1291 def _repo_group_compare(first_group_id, second_group_id):
1295 1292 """
1296 1293 Gives higher priority to the groups with the most complex paths
1297 1294 """
1298 1295 first_group = RepoGroup.get(first_group_id)
1299 1296 second_group = RepoGroup.get(second_group_id)
1300 1297 first_group_parts = (
1301 1298 len(first_group.group_name.split('/')) if first_group else 0)
1302 1299 second_group_parts = (
1303 1300 len(second_group.group_name.split('/')) if second_group else 0)
1304 1301 return cmp(second_group_parts, first_group_parts)
1305 1302
1306 1303 sorted_repo_group_ids = sorted(
1307 1304 self.repo_group_ids, cmp=_repo_group_compare)
1308 1305 for repo_group_id in sorted_repo_group_ids:
1309 1306 self.fixture.destroy_repo_group(repo_group_id)
1310 1307
1311 1308 def _cleanup_repos(self):
1312 1309 sorted_repos_ids = sorted(self.repos_ids)
1313 1310 for repo_id in sorted_repos_ids:
1314 1311 self.fixture.destroy_repo(repo_id)
1315 1312
1316 1313 def _cleanup_user_groups(self):
1317 1314 def _user_group_compare(first_group_id, second_group_id):
1318 1315 """
1319 1316 Gives higher priority to the groups with the most complex paths
1320 1317 """
1321 1318 first_group = UserGroup.get(first_group_id)
1322 1319 second_group = UserGroup.get(second_group_id)
1323 1320 first_group_parts = (
1324 1321 len(first_group.users_group_name.split('/'))
1325 1322 if first_group else 0)
1326 1323 second_group_parts = (
1327 1324 len(second_group.users_group_name.split('/'))
1328 1325 if second_group else 0)
1329 1326 return cmp(second_group_parts, first_group_parts)
1330 1327
1331 1328 sorted_user_group_ids = sorted(
1332 1329 self.user_group_ids, cmp=_user_group_compare)
1333 1330 for user_group_id in sorted_user_group_ids:
1334 1331 self.fixture.destroy_user_group(user_group_id)
1335 1332
1336 1333 def _cleanup_users(self):
1337 1334 for user_id in self.user_ids:
1338 1335 self.fixture.destroy_user(user_id)
1339 1336
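# Hypothetical sketch of a test using `user_util`; the permission name is an
# assumption following the usual RhodeCode naming and is not taken from this
# module.
def _sketch_user_util_usage(user_util):
    user = user_util.create_user()
    repo = user_util.create_repo(repo_type='git')
    user_util.grant_user_permission_to_repo(repo, user, 'repository.write')
    return user, repo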
1340 1337
1341 1338 # TODO: Think about moving this into a pytest-pyro package and make it a
1342 1339 # pytest plugin
1343 1340 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1344 1341 def pytest_runtest_makereport(item, call):
1345 1342 """
1346 1343 Adds the remote traceback if the exception has this information.
1347 1344
1348 1345 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1349 1346 to the exception instance.
1350 1347 """
1351 1348 outcome = yield
1352 1349 report = outcome.get_result()
1353 1350 if call.excinfo:
1354 1351 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1355 1352
1356 1353
1357 1354 def _add_vcsserver_remote_traceback(report, exc):
1358 1355 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1359 1356
1360 1357 if vcsserver_traceback:
1361 1358 section = 'VCSServer remote traceback ' + report.when
1362 1359 report.sections.append((section, vcsserver_traceback))
1363 1360
1364 1361
1365 1362 @pytest.fixture(scope='session')
1366 1363 def testrun():
1367 1364 return {
1368 1365 'uuid': uuid.uuid4(),
1369 1366 'start': datetime.datetime.utcnow().isoformat(),
1370 1367 'timestamp': int(time.time()),
1371 1368 }
1372 1369
1373 1370
1374 1371 @pytest.fixture(autouse=True)
1375 1372 def collect_appenlight_stats(request, testrun):
1376 1373 """
1377 1374 This fixture reports the memory consumption of single tests.
1378 1375
1379 1376 It gathers data based on `psutil` and sends them to Appenlight. The option
1380 1377 ``--ae`` has to be used to enable this fixture and the API key for your
1381 1378 application has to be provided in ``--ae-key``.
1382 1379 """
1383 1380 try:
1384 1381 # cygwin does not yet have psutil support.
1385 1382 import psutil
1386 1383 except ImportError:
1387 1384 return
1388 1385
1389 1386 if not request.config.getoption('--appenlight'):
1390 1387 return
1391 1388 else:
1392 1389 # Only request the pylonsapp fixture if appenlight tracking is
1393 1390 # enabled. This will speed up a test run of unit tests by 2 to 3
1394 1391 # seconds if appenlight is not enabled.
1395 1392 pylonsapp = request.getfuncargvalue("pylonsapp")
1396 1393 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1397 1394 client = AppenlightClient(
1398 1395 url=url,
1399 1396 api_key=request.config.getoption('--appenlight-api-key'),
1400 1397 namespace=request.node.nodeid,
1401 1398 request=str(testrun['uuid']),
1402 1399 testrun=testrun)
1403 1400
1404 1401 client.collect({
1405 1402 'message': "Starting",
1406 1403 })
1407 1404
1408 1405 server_and_port = pylonsapp.config['vcs.server']
1409 1406 protocol = pylonsapp.config['vcs.server.protocol']
1410 1407 server = create_vcsserver_proxy(server_and_port, protocol)
1411 1408 with server:
1412 1409 vcs_pid = server.get_pid()
1413 1410 server.run_gc()
1414 1411 vcs_process = psutil.Process(vcs_pid)
1415 1412 mem = vcs_process.memory_info()
1416 1413 client.tag_before('vcsserver.rss', mem.rss)
1417 1414 client.tag_before('vcsserver.vms', mem.vms)
1418 1415
1419 1416 test_process = psutil.Process()
1420 1417 mem = test_process.memory_info()
1421 1418 client.tag_before('test.rss', mem.rss)
1422 1419 client.tag_before('test.vms', mem.vms)
1423 1420
1424 1421 client.tag_before('time', time.time())
1425 1422
1426 1423 @request.addfinalizer
1427 1424 def send_stats():
1428 1425 client.tag_after('time', time.time())
1429 1426 with server:
1430 1427 gc_stats = server.run_gc()
1431 1428 for tag, value in gc_stats.items():
1432 1429 client.tag_after(tag, value)
1433 1430 mem = vcs_process.memory_info()
1434 1431 client.tag_after('vcsserver.rss', mem.rss)
1435 1432 client.tag_after('vcsserver.vms', mem.vms)
1436 1433
1437 1434 mem = test_process.memory_info()
1438 1435 client.tag_after('test.rss', mem.rss)
1439 1436 client.tag_after('test.vms', mem.vms)
1440 1437
1441 1438 client.collect({
1442 1439 'message': "Finished",
1443 1440 })
1444 1441 client.send_stats()
1445 1442
1446 1443 return client
1447 1444
1448 1445
1449 1446 class AppenlightClient():
1450 1447
1451 1448 url_template = '{url}?protocol_version=0.5'
1452 1449
1453 1450 def __init__(
1454 1451 self, url, api_key, add_server=True, add_timestamp=True,
1455 1452 namespace=None, request=None, testrun=None):
1456 1453 self.url = self.url_template.format(url=url)
1457 1454 self.api_key = api_key
1458 1455 self.add_server = add_server
1459 1456 self.add_timestamp = add_timestamp
1460 1457 self.namespace = namespace
1461 1458 self.request = request
1462 1459 self.server = socket.getfqdn(socket.gethostname())
1463 1460 self.tags_before = {}
1464 1461 self.tags_after = {}
1465 1462 self.stats = []
1466 1463 self.testrun = testrun or {}
1467 1464
1468 1465 def tag_before(self, tag, value):
1469 1466 self.tags_before[tag] = value
1470 1467
1471 1468 def tag_after(self, tag, value):
1472 1469 self.tags_after[tag] = value
1473 1470
1474 1471 def collect(self, data):
1475 1472 if self.add_server:
1476 1473 data.setdefault('server', self.server)
1477 1474 if self.add_timestamp:
1478 1475 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1479 1476 if self.namespace:
1480 1477 data.setdefault('namespace', self.namespace)
1481 1478 if self.request:
1482 1479 data.setdefault('request', self.request)
1483 1480 self.stats.append(data)
1484 1481
1485 1482 def send_stats(self):
1486 1483 tags = [
1487 1484 ('testrun', self.request),
1488 1485 ('testrun.start', self.testrun['start']),
1489 1486 ('testrun.timestamp', self.testrun['timestamp']),
1490 1487 ('test', self.namespace),
1491 1488 ]
1492 1489 for key, value in self.tags_before.items():
1493 1490 tags.append((key + '.before', value))
1494 1491 try:
1495 1492 delta = self.tags_after[key] - value
1496 1493 tags.append((key + '.delta', delta))
1497 1494 except Exception:
1498 1495 pass
1499 1496 for key, value in self.tags_after.items():
1500 1497 tags.append((key + '.after', value))
1501 1498 self.collect({
1502 1499 'message': "Collected tags",
1503 1500 'tags': tags,
1504 1501 })
1505 1502
1506 1503 response = requests.post(
1507 1504 self.url,
1508 1505 headers={
1509 1506 'X-appenlight-api-key': self.api_key},
1510 1507 json=self.stats,
1511 1508 )
1512 1509
1513 1510 if not response.status_code == 200:
1514 1511 pprint.pprint(self.stats)
1515 1512 print response.headers
1516 1513 print response.text
1517 1514 raise Exception('Sending to appenlight failed')
1518 1515
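# Hypothetical sketch of driving AppenlightClient manually; the URL, API key
# and namespace are placeholders, and send_stats() is intentionally omitted
# to avoid a network call.
def _sketch_appenlight_client_usage():
    client = AppenlightClient(
        url='https://ae.example.com/api/logs',
        api_key='dummy-key',
        namespace='tests/example')
    client.tag_before('time', time.time())
    client.tag_after('time', time.time())
    client.collect({'message': 'Collected in sketch'})
    return client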
1519 1516
1520 1517 @pytest.fixture
1521 1518 def gist_util(request, pylonsapp):
1522 1519 """
1523 1520 Provides a wired instance of `GistUtility` with integrated cleanup.
1524 1521 """
1525 1522 utility = GistUtility()
1526 1523 request.addfinalizer(utility.cleanup)
1527 1524 return utility
1528 1525
1529 1526
1530 1527 class GistUtility(object):
1531 1528 def __init__(self):
1532 1529 self.fixture = Fixture()
1533 1530 self.gist_ids = []
1534 1531
1535 1532 def create_gist(self, **kwargs):
1536 1533 gist = self.fixture.create_gist(**kwargs)
1537 1534 self.gist_ids.append(gist.gist_id)
1538 1535 return gist
1539 1536
1540 1537 def cleanup(self):
1541 1538 for id_ in self.gist_ids:
1542 1539 self.fixture.destroy_gists(str(id_))
1543 1540
1544 1541
1545 1542 @pytest.fixture
1546 1543 def enabled_backends(request):
1547 1544 backends = request.config.option.backends
1548 1545 return backends[:]
1549 1546
1550 1547
1551 1548 @pytest.fixture
1552 1549 def settings_util(request):
1553 1550 """
1554 1551 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1555 1552 """
1556 1553 utility = SettingsUtility()
1557 1554 request.addfinalizer(utility.cleanup)
1558 1555 return utility
1559 1556
1560 1557
1561 1558 class SettingsUtility(object):
1562 1559 def __init__(self):
1563 1560 self.rhodecode_ui_ids = []
1564 1561 self.rhodecode_setting_ids = []
1565 1562 self.repo_rhodecode_ui_ids = []
1566 1563 self.repo_rhodecode_setting_ids = []
1567 1564
1568 1565 def create_repo_rhodecode_ui(
1569 1566 self, repo, section, value, key=None, active=True, cleanup=True):
1570 1567 key = key or hashlib.sha1(
1571 1568 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1572 1569
1573 1570 setting = RepoRhodeCodeUi()
1574 1571 setting.repository_id = repo.repo_id
1575 1572 setting.ui_section = section
1576 1573 setting.ui_value = value
1577 1574 setting.ui_key = key
1578 1575 setting.ui_active = active
1579 1576 Session().add(setting)
1580 1577 Session().commit()
1581 1578
1582 1579 if cleanup:
1583 1580 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1584 1581 return setting
1585 1582
1586 1583 def create_rhodecode_ui(
1587 1584 self, section, value, key=None, active=True, cleanup=True):
1588 1585 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1589 1586
1590 1587 setting = RhodeCodeUi()
1591 1588 setting.ui_section = section
1592 1589 setting.ui_value = value
1593 1590 setting.ui_key = key
1594 1591 setting.ui_active = active
1595 1592 Session().add(setting)
1596 1593 Session().commit()
1597 1594
1598 1595 if cleanup:
1599 1596 self.rhodecode_ui_ids.append(setting.ui_id)
1600 1597 return setting
1601 1598
1602 1599 def create_repo_rhodecode_setting(
1603 1600 self, repo, name, value, type_, cleanup=True):
1604 1601 setting = RepoRhodeCodeSetting(
1605 1602 repo.repo_id, key=name, val=value, type=type_)
1606 1603 Session().add(setting)
1607 1604 Session().commit()
1608 1605
1609 1606 if cleanup:
1610 1607 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1611 1608 return setting
1612 1609
1613 1610 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1614 1611 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1615 1612 Session().add(setting)
1616 1613 Session().commit()
1617 1614
1618 1615 if cleanup:
1619 1616 self.rhodecode_setting_ids.append(setting.app_settings_id)
1620 1617
1621 1618 return setting
1622 1619
1623 1620 def cleanup(self):
1624 1621 for id_ in self.rhodecode_ui_ids:
1625 1622 setting = RhodeCodeUi.get(id_)
1626 1623 Session().delete(setting)
1627 1624
1628 1625 for id_ in self.rhodecode_setting_ids:
1629 1626 setting = RhodeCodeSetting.get(id_)
1630 1627 Session().delete(setting)
1631 1628
1632 1629 for id_ in self.repo_rhodecode_ui_ids:
1633 1630 setting = RepoRhodeCodeUi.get(id_)
1634 1631 Session().delete(setting)
1635 1632
1636 1633 for id_ in self.repo_rhodecode_setting_ids:
1637 1634 setting = RepoRhodeCodeSetting.get(id_)
1638 1635 Session().delete(setting)
1639 1636
1640 1637 Session().commit()
1641 1638
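# Usage sketch (illustrative only): rows created through ``settings_util``
# with ``cleanup=True`` (the default) are deleted again by the finalizer, so
# tests can add transient ui entries or settings without leaking state.
#
#   def test_transient_ui_entry(settings_util, repo_stub):
#       setting = settings_util.create_repo_rhodecode_ui(
#           repo_stub, 'hooks', 'echo testing', key='test_hook_key')
#       assert setting.ui_active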
1642 1639
1643 1640 @pytest.fixture
1644 1641 def no_notifications(request):
1645 1642 notification_patcher = mock.patch(
1646 1643 'rhodecode.model.notification.NotificationModel.create')
1647 1644 notification_patcher.start()
1648 1645 request.addfinalizer(notification_patcher.stop)
1649 1646
1650 1647
1651 1648 @pytest.fixture(scope='session')
1652 1649 def repeat(request):
1653 1650 """
1654 1651 The number of repetitions is based on this fixture.
1655 1652
1656 1653 Slower calls may divide it by 10 or 100. The value is chosen so that the
1657 1654 default test suite does not become too slow.
1658 1655 """
1659 1656 return request.config.getoption('--repeat')
1660 1657
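# Usage sketch (illustrative only): tests scale the session-wide repetition
# count instead of hard-coding loop sizes, as the docstring above suggests.
#
#   def test_cheap_operation(repeat):
#       for _ in range(repeat):
#           ...
#
#   def test_expensive_operation(repeat):
#       for _ in range(repeat / 10):  # slower call, scaled down
#           ...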
1661 1658
1662 1659 @pytest.fixture
1663 1660 def rhodecode_fixtures():
1664 1661 return Fixture()
1665 1662
1666 1663
1667 1664 @pytest.fixture
1668 1665 def context_stub():
1669 1666 """
1670 1667 Stub context object.
1671 1668 """
1672 1669 context = pyramid.testing.DummyResource()
1673 1670 return context
1674 1671
1675 1672
1676 1673 @pytest.fixture
1677 1674 def request_stub():
1678 1675 """
1679 1676 Stub request object.
1680 1677 """
1681 1678 from rhodecode.lib.base import bootstrap_request
1682 1679 request = bootstrap_request(scheme='https')
1683 1680 return request
1684 1681
1685 1682
1686 1683 @pytest.fixture
1687 1684 def config_stub(request, request_stub):
1688 1685 """
1689 1686 Set up pyramid.testing and return the Configurator.
1690 1687 """
1691 1688 from rhodecode.lib.base import bootstrap_config
1692 1689 config = bootstrap_config(request=request_stub)
1693 1690
1694 1691 @request.addfinalizer
1695 1692 def cleanup():
1696 1693 pyramid.testing.tearDown()
1697 1694
1698 1695 return config
1699 1696
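# Usage sketch (illustrative only): ``config_stub`` sets up pyramid.testing
# around ``request_stub`` and tears it down again via the finalizer, so
# registry-dependent code can run without a full application.
#
#   def test_with_stub_configurator(config_stub):
#       config_stub.add_route('example', '/example')  # hypothetical route name
#       assert config_stub.registry is not None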
1700 1697
1701 1698 @pytest.fixture
1702 1699 def StubIntegrationType():
1703 1700 class _StubIntegrationType(IntegrationTypeBase):
1704 1701 """ Test integration type class """
1705 1702
1706 1703 key = 'test'
1707 1704 display_name = 'Test integration type'
1708 1705 description = 'A test integration type for testing'
1709 1706 icon = 'test_icon_html_image'
1710 1707
1711 1708 def __init__(self, settings):
1712 1709 super(_StubIntegrationType, self).__init__(settings)
1713 1710 self.sent_events = [] # for testing
1714 1711
1715 1712 def send_event(self, event):
1716 1713 self.sent_events.append(event)
1717 1714
1718 1715 def settings_schema(self):
1719 1716 class SettingsSchema(colander.Schema):
1720 1717 test_string_field = colander.SchemaNode(
1721 1718 colander.String(),
1722 1719 missing=colander.required,
1723 1720 title='test string field',
1724 1721 )
1725 1722 test_int_field = colander.SchemaNode(
1726 1723 colander.Int(),
1727 1724 title='some integer setting',
1728 1725 )
1729 1726 return SettingsSchema()
1730 1727
1731 1728
1732 1729 integration_type_registry.register_integration_type(_StubIntegrationType)
1733 1730 return _StubIntegrationType
1734 1731
1735 1732 @pytest.fixture
1736 1733 def stub_integration_settings():
1737 1734 return {
1738 1735 'test_string_field': 'some data',
1739 1736 'test_int_field': 100,
1740 1737 }
1741 1738
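# Usage sketch (illustrative only): the stub integration type records events
# instead of delivering them, so tests can assert on what an integration
# would have sent.
#
#   def test_events_are_recorded(StubIntegrationType, stub_integration_settings):
#       integration = StubIntegrationType(settings=stub_integration_settings)
#       integration.send_event('fake-event')  # any object stands in for an event
#       assert integration.sent_events == ['fake-event']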
1742 1739
1743 1740 @pytest.fixture
1744 1741 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1745 1742 stub_integration_settings):
1746 1743 integration = IntegrationModel().create(
1747 1744 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1748 1745 name='test repo integration',
1749 1746 repo=repo_stub, repo_group=None, child_repos_only=None)
1750 1747
1751 1748 @request.addfinalizer
1752 1749 def cleanup():
1753 1750 IntegrationModel().delete(integration)
1754 1751
1755 1752 return integration
1756 1753
1757 1754
1758 1755 @pytest.fixture
1759 1756 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1760 1757 stub_integration_settings):
1761 1758 integration = IntegrationModel().create(
1762 1759 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1763 1760 name='test repogroup integration',
1764 1761 repo=None, repo_group=test_repo_group, child_repos_only=True)
1765 1762
1766 1763 @request.addfinalizer
1767 1764 def cleanup():
1768 1765 IntegrationModel().delete(integration)
1769 1766
1770 1767 return integration
1771 1768
1772 1769
1773 1770 @pytest.fixture
1774 1771 def repogroup_recursive_integration_stub(request, test_repo_group,
1775 1772 StubIntegrationType, stub_integration_settings):
1776 1773 integration = IntegrationModel().create(
1777 1774 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1778 1775 name='test recursive repogroup integration',
1779 1776 repo=None, repo_group=test_repo_group, child_repos_only=False)
1780 1777
1781 1778 @request.addfinalizer
1782 1779 def cleanup():
1783 1780 IntegrationModel().delete(integration)
1784 1781
1785 1782 return integration
1786 1783
1787 1784
1788 1785 @pytest.fixture
1789 1786 def global_integration_stub(request, StubIntegrationType,
1790 1787 stub_integration_settings):
1791 1788 integration = IntegrationModel().create(
1792 1789 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1793 1790 name='test global integration',
1794 1791 repo=None, repo_group=None, child_repos_only=None)
1795 1792
1796 1793 @request.addfinalizer
1797 1794 def cleanup():
1798 1795 IntegrationModel().delete(integration)
1799 1796
1800 1797 return integration
1801 1798
1802 1799
1803 1800 @pytest.fixture
1804 1801 def root_repos_integration_stub(request, StubIntegrationType,
1805 1802 stub_integration_settings):
1806 1803 integration = IntegrationModel().create(
1807 1804 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1808 1805 name='test root repos integration',
1809 1806 repo=None, repo_group=None, child_repos_only=True)
1810 1807
1811 1808 @request.addfinalizer
1812 1809 def cleanup():
1813 1810 IntegrationModel().delete(integration)
1814 1811
1815 1812 return integration
1816 1813
1817 1814
1818 1815 @pytest.fixture
1819 1816 def local_dt_to_utc():
1820 1817 def _factory(dt):
1821 1818 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1822 1819 dateutil.tz.tzutc()).replace(tzinfo=None)
1823 1820 return _factory
1824 1821
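# Usage sketch (illustrative only): the factory converts a naive local
# datetime into a naive UTC datetime, which keeps comparisons against
# database timestamps stable regardless of the host timezone.
#
#   def test_created_on_is_recent(local_dt_to_utc):
#       local_now = datetime.datetime.now()
#       utc_now = local_dt_to_utc(local_now)
#       # for a host at UTC+02:00, local 12:00 becomes 10:00 UTC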
1825 1822
1826 1823 @pytest.fixture
1827 1824 def disable_anonymous_user(request, pylonsapp):
1828 1825 set_anonymous_access(False)
1829 1826
1830 1827 @request.addfinalizer
1831 1828 def cleanup():
1832 1829 set_anonymous_access(True)
1833 1830
1834 1831
1835 1832 @pytest.fixture
1836 1833 def rc_fixture(request):
1837 1834 return Fixture()
1838 1835
1839 1836
1840 1837 @pytest.fixture
1841 1838 def repo_groups(request):
1842 1839 fixture = Fixture()
1843 1840
1844 1841 session = Session()
1845 1842 zombie_group = fixture.create_repo_group('zombie')
1846 1843 parent_group = fixture.create_repo_group('parent')
1847 1844 child_group = fixture.create_repo_group('parent/child')
1848 1845 groups_in_db = session.query(RepoGroup).all()
1849 1846 assert len(groups_in_db) == 3
1850 1847 assert child_group.group_parent_id == parent_group.group_id
1851 1848
1852 1849 @request.addfinalizer
1853 1850 def cleanup():
1854 1851 fixture.destroy_repo_group(zombie_group)
1855 1852 fixture.destroy_repo_group(child_group)
1856 1853 fixture.destroy_repo_group(parent_group)
1857 1854
1858 1855 return zombie_group, parent_group, child_group
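
# Usage sketch (illustrative only): the fixture returns the three groups it
# created, so a test can unpack them and rely on the parent/child relation
# asserted above; cleanup destroys all three afterwards.
#
#   def test_child_points_at_parent(repo_groups):
#       zombie, parent, child = repo_groups
#       assert child.group_parent_id == parent.group_id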