pytest: added db_connection fixture.
marcink
r2372:2173e0ba default
@@ -1,1851 +1,1858 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 import functools
33 34
34 35 import mock
35 36 import pyramid.testing
36 37 import pytest
37 38 import colander
38 39 import requests
39 40 import pyramid.paster
40 41
41 42 import rhodecode
42 43 from rhodecode.lib.utils2 import AttributeDict
43 44 from rhodecode.model.changeset_status import ChangesetStatusModel
44 45 from rhodecode.model.comment import CommentsModel
45 46 from rhodecode.model.db import (
46 47 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
47 48 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
48 49 from rhodecode.model.meta import Session
49 50 from rhodecode.model.pull_request import PullRequestModel
50 51 from rhodecode.model.repo import RepoModel
51 52 from rhodecode.model.repo_group import RepoGroupModel
52 53 from rhodecode.model.user import UserModel
53 54 from rhodecode.model.settings import VcsSettingsModel
54 55 from rhodecode.model.user_group import UserGroupModel
55 56 from rhodecode.model.integration import IntegrationModel
56 57 from rhodecode.integrations import integration_type_registry
57 58 from rhodecode.integrations.types.base import IntegrationTypeBase
58 59 from rhodecode.lib.utils import repo2db_mapper
59 60 from rhodecode.lib.vcs import create_vcsserver_proxy
60 61 from rhodecode.lib.vcs.backends import get_backend
61 62 from rhodecode.lib.vcs.nodes import FileNode
62 63 from rhodecode.tests import (
63 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
64 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
65 66 TEST_USER_REGULAR_PASS)
66 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
67 68 from rhodecode.tests.fixture import Fixture
68
69 from rhodecode.config import utils as config_utils
69 70
70 71 def _split_comma(value):
71 72 return value.split(',')
72 73
73 74
74 75 def pytest_addoption(parser):
75 76 parser.addoption(
76 77 '--keep-tmp-path', action='store_true',
77 78 help="Keep the test temporary directories")
78 79 parser.addoption(
79 80 '--backends', action='store', type=_split_comma,
80 81 default=['git', 'hg', 'svn'],
81 82 help="Select which backends to test for backend specific tests.")
82 83 parser.addoption(
83 84 '--dbs', action='store', type=_split_comma,
84 85 default=['sqlite'],
85 86 help="Select which database to test for database specific tests. "
86 87 "Possible options are sqlite,postgres,mysql")
87 88 parser.addoption(
88 89 '--appenlight', '--ae', action='store_true',
89 90 help="Track statistics in appenlight.")
90 91 parser.addoption(
91 92 '--appenlight-api-key', '--ae-key',
92 93 help="API key for Appenlight.")
93 94 parser.addoption(
94 95 '--appenlight-url', '--ae-url',
95 96 default="https://ae.rhodecode.com",
96 97 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
97 98 parser.addoption(
98 99 '--sqlite-connection-string', action='store',
99 100 default='', help="Connection string for the dbs tests with SQLite")
100 101 parser.addoption(
101 102 '--postgres-connection-string', action='store',
102 103 default='', help="Connection string for the dbs tests with Postgres")
103 104 parser.addoption(
104 105 '--mysql-connection-string', action='store',
105 106 default='', help="Connection string for the dbs tests with MySQL")
106 107 parser.addoption(
107 108 '--repeat', type=int, default=100,
108 109 help="Number of repetitions in performance tests.")
109 110
110 111
111 112 def pytest_configure(config):
112 113 from rhodecode.config import patches
113 114
114 115
115 116 def pytest_collection_modifyitems(session, config, items):
116 117 # Drop items marked __test__ = False (nose's "nottest"); kept for the transition from nose to pytest
117 118 remaining = [
118 119 i for i in items if getattr(i.obj, '__test__', True)]
119 120 items[:] = remaining
120 121
121 122
122 123 def pytest_generate_tests(metafunc):
123 124 # Support test generation based on --backend parameter
124 125 if 'backend_alias' in metafunc.fixturenames:
125 126 backends = get_backends_from_metafunc(metafunc)
126 127 scope = None
127 128 if not backends:
128 129 pytest.skip("Not enabled for any of selected backends")
129 130 metafunc.parametrize('backend_alias', backends, scope=scope)
130 131 elif hasattr(metafunc.function, 'backends'):
131 132 backends = get_backends_from_metafunc(metafunc)
132 133 if not backends:
133 134 pytest.skip("Not enabled for any of selected backends")
134 135
135 136
136 137 def get_backends_from_metafunc(metafunc):
137 138 requested_backends = set(metafunc.config.getoption('--backends'))
138 139 if hasattr(metafunc.function, 'backends'):
139 140 # Backends supported by this test function, taken from
140 141 # pytest.mark.backends
141 142 backends = metafunc.function.backends.args
142 143 elif hasattr(metafunc.cls, 'backend_alias'):
143 144 # Support class attribute "backend_alias", this is mainly
144 145 # for legacy reasons for tests not yet using pytest.mark.backends
145 146 backends = [metafunc.cls.backend_alias]
146 147 else:
147 148 backends = metafunc.config.getoption('--backends')
148 149 return requested_backends.intersection(backends)
149 150
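# Illustrative sketch (not part of the original file): a test limited to specific
# backends via `pytest.mark.backends` gets parametrized with `backend_alias`
# values from the intersection of the marked backends and the `--backends`
# command line option handled above.
@pytest.mark.backends('git', 'hg')
def test_example_backend_mark(backend_alias):
    assert backend_alias in ('git', 'hg')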
150 151
151 152 @pytest.fixture(scope='session', autouse=True)
152 153 def activate_example_rcextensions(request):
153 154 """
154 155 Patch in an example rcextensions module which verifies passed in kwargs.
155 156 """
156 157 from rhodecode.tests.other import example_rcextensions
157 158
158 159 old_extensions = rhodecode.EXTENSIONS
159 160 rhodecode.EXTENSIONS = example_rcextensions
160 161
161 162 @request.addfinalizer
162 163 def cleanup():
163 164 rhodecode.EXTENSIONS = old_extensions
164 165
165 166
166 167 @pytest.fixture
167 168 def capture_rcextensions():
168 169 """
169 170 Returns the recorded calls to entry points in rcextensions.
170 171 """
171 172 calls = rhodecode.EXTENSIONS.calls
172 173 calls.clear()
173 174 # Note: at this point it is still the empty dict, but it will be
174 175 # filled during the test run; since it is a reference, this is
175 176 # enough to make it work.
176 177 return calls
177 178
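# Illustrative sketch (not part of the original file): because the fixture
# returns a live reference to EXTENSIONS.calls, assertions can be made on it
# after the test has triggered rcextensions hooks.
def test_example_capture_rcextensions(capture_rcextensions):
    # Cleared at fixture setup; filled in by whatever hooks the test fires.
    assert capture_rcextensions == {}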
178 179
179 180 @pytest.fixture(scope='session')
180 181 def http_environ_session():
181 182 """
182 183 Allows using "http_environ" in session scope.
183 184 """
184 185 return http_environ(
185 186 http_host_stub=http_host_stub())
186 187
187 188
188 189 @pytest.fixture
189 190 def http_host_stub():
190 191 """
191 192 Value of HTTP_HOST in the test run.
192 193 """
193 194 return 'example.com:80'
194 195
195 196
196 197 @pytest.fixture
197 198 def http_host_only_stub():
198 199 """
199 200 Host part of HTTP_HOST (without the port) in the test run.
200 201 """
201 202 return http_host_stub().split(':')[0]
202 203
203 204
204 205 @pytest.fixture
205 206 def http_environ(http_host_stub):
206 207 """
207 208 HTTP extra environ keys.
208 209
209 210 Used by the test application as well as for setting up the pylons
210 211 environment. In the case of the fixture "app" it should be possible
211 212 to override this for a specific test case.
212 213 """
213 214 return {
214 215 'SERVER_NAME': http_host_only_stub(),
215 216 'SERVER_PORT': http_host_stub.split(':')[1],
216 217 'HTTP_HOST': http_host_stub,
217 218 'HTTP_USER_AGENT': 'rc-test-agent',
218 219 'REQUEST_METHOD': 'GET'
219 220 }
220 221
221 222
222 223 @pytest.fixture(scope='session')
223 224 def baseapp(ini_config, vcsserver, http_environ_session):
224 225 from rhodecode.lib.pyramid_utils import get_app_config
225 226 from rhodecode.config.middleware import make_pyramid_app
226 227
227 228 print("Using the RhodeCode configuration:{}".format(ini_config))
228 229 pyramid.paster.setup_logging(ini_config)
229 230
230 231 settings = get_app_config(ini_config)
231 232 app = make_pyramid_app({'__file__': ini_config}, **settings)
232 233
233 234 return app
234 235
235 236
236 237 @pytest.fixture(scope='function')
237 238 def app(request, config_stub, baseapp, http_environ):
238 239 app = CustomTestApp(
239 240 baseapp,
240 241 extra_environ=http_environ)
241 242 if request.cls:
242 243 request.cls.app = app
243 244 return app
244 245
245 246
246 247 @pytest.fixture(scope='session')
247 248 def app_settings(baseapp, ini_config):
248 249 """
249 250 Settings dictionary used to create the app.
250 251
251 252 Parses the ini file and passes the result through the sanitize and apply
252 253 defaults mechanism in `rhodecode.config.middleware`.
253 254 """
254 255 return baseapp.config.get_settings()
255 256
256 257
258 @pytest.fixture(scope='session')
259 def db_connection(ini_settings):
260 # Initialize the database connection.
261 config_utils.initialize_database(ini_settings)
262
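# Illustrative sketch (not part of the original change): fixtures or tests that
# need the database schema in place can simply request `db_connection`; the
# session scope ensures config_utils.initialize_database() runs only once.
def test_example_db_connection_usage(db_connection, baseapp):
    # With the connection initialized, model level queries are safe to run.
    admin = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
    assert admin is not None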
263
257 264 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
258 265
259 266
260 267 def _autologin_user(app, *args):
261 268 session = login_user_session(app, *args)
262 269 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
263 270 return LoginData(csrf_token, session['rhodecode_user'])
264 271
265 272
266 273 @pytest.fixture
267 274 def autologin_user(app):
268 275 """
269 276 Utility fixture which makes sure that the admin user is logged in
270 277 """
271 278 return _autologin_user(app)
272 279
273 280
274 281 @pytest.fixture
275 282 def autologin_regular_user(app):
276 283 """
277 284 Utility fixture which makes sure that the regular user is logged in
278 285 """
279 286 return _autologin_user(
280 287 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
281 288
282 289
283 290 @pytest.fixture(scope='function')
284 291 def csrf_token(request, autologin_user):
285 292 return autologin_user.csrf_token
286 293
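# Illustrative sketch (not part of the original file): POST requests against the
# test app normally have to carry the CSRF token; the URL below is only a
# placeholder, not a real route of the application.
def test_example_post_with_csrf_token(app, csrf_token):
    app.post('/_example/endpoint', params={'csrf_token': csrf_token}, status=404)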
287 294
288 295 @pytest.fixture(scope='function')
289 296 def xhr_header(request):
290 297 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
291 298
292 299
293 300 @pytest.fixture
294 301 def real_crypto_backend(monkeypatch):
295 302 """
296 303 Switch the production crypto backend on for this test.
297 304
298 305 During the test run the crypto backend is replaced with a faster
299 306 implementation based on the MD5 algorithm.
300 307 """
301 308 monkeypatch.setattr(rhodecode, 'is_test', False)
302 309
303 310
304 311 @pytest.fixture(scope='class')
305 312 def index_location(request, baseapp):
306 313 index_location = baseapp.config.get_settings()['search.location']
307 314 if request.cls:
308 315 request.cls.index_location = index_location
309 316 return index_location
310 317
311 318
312 319 @pytest.fixture(scope='session', autouse=True)
313 320 def tests_tmp_path(request):
314 321 """
315 322 Create a temporary directory to be used during the test session.
316 323 """
317 324 if not os.path.exists(TESTS_TMP_PATH):
318 325 os.makedirs(TESTS_TMP_PATH)
319 326
320 327 if not request.config.getoption('--keep-tmp-path'):
321 328 @request.addfinalizer
322 329 def remove_tmp_path():
323 330 shutil.rmtree(TESTS_TMP_PATH)
324 331
325 332 return TESTS_TMP_PATH
326 333
327 334
328 335 @pytest.fixture
329 336 def test_repo_group(request):
330 337 """
331 338 Create a temporary repository group, and destroy it automatically
332 339 after use.
333 340 """
334 341 fixture = Fixture()
335 342 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
336 343 repo_group = fixture.create_repo_group(repogroupid)
337 344
338 345 def _cleanup():
339 346 fixture.destroy_repo_group(repogroupid)
340 347
341 348 request.addfinalizer(_cleanup)
342 349 return repo_group
343 350
344 351
345 352 @pytest.fixture
346 353 def test_user_group(request):
347 354 """
348 355 Create a temporary user group, and destroy it automatically
349 356 after use.
350 357 """
351 358 fixture = Fixture()
352 359 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
353 360 user_group = fixture.create_user_group(usergroupid)
354 361
355 362 def _cleanup():
356 363 fixture.destroy_user_group(user_group)
357 364
358 365 request.addfinalizer(_cleanup)
359 366 return user_group
360 367
361 368
362 369 @pytest.fixture(scope='session')
363 370 def test_repo(request):
364 371 container = TestRepoContainer()
365 372 request.addfinalizer(container._cleanup)
366 373 return container
367 374
368 375
369 376 class TestRepoContainer(object):
370 377 """
371 378 Container for test repositories which are used read-only.
372 379
373 380 Repositories will be created on demand and re-used during the lifetime
374 381 of this object.
375 382
376 383 Usage to get the svn test repository "minimal"::
377 384
378 385 test_repo = TestRepoContainer()
379 386 repo = test_repo('minimal', 'svn')
380 387
381 388 """
382 389
383 390 dump_extractors = {
384 391 'git': utils.extract_git_repo_from_dump,
385 392 'hg': utils.extract_hg_repo_from_dump,
386 393 'svn': utils.extract_svn_repo_from_dump,
387 394 }
388 395
389 396 def __init__(self):
390 397 self._cleanup_repos = []
391 398 self._fixture = Fixture()
392 399 self._repos = {}
393 400
394 401 def __call__(self, dump_name, backend_alias, config=None):
395 402 key = (dump_name, backend_alias)
396 403 if key not in self._repos:
397 404 repo = self._create_repo(dump_name, backend_alias, config)
398 405 self._repos[key] = repo.repo_id
399 406 return Repository.get(self._repos[key])
400 407
401 408 def _create_repo(self, dump_name, backend_alias, config):
402 409 repo_name = '%s-%s' % (backend_alias, dump_name)
403 410 backend_class = get_backend(backend_alias)
404 411 dump_extractor = self.dump_extractors[backend_alias]
405 412 repo_path = dump_extractor(dump_name, repo_name)
406 413
407 414 vcs_repo = backend_class(repo_path, config=config)
408 415 repo2db_mapper({repo_name: vcs_repo})
409 416
410 417 repo = RepoModel().get_by_repo_name(repo_name)
411 418 self._cleanup_repos.append(repo_name)
412 419 return repo
413 420
414 421 def _cleanup(self):
415 422 for repo_name in reversed(self._cleanup_repos):
416 423 self._fixture.destroy_repo(repo_name)
417 424
418 425
419 426 @pytest.fixture
420 427 def backend(request, backend_alias, baseapp, test_repo):
421 428 """
422 429 Parametrized fixture which represents a single backend implementation.
423 430
424 431 It respects the option `--backends` to focus the test run on specific
425 432 backend implementations.
426 433
427 434 It also supports `pytest.mark.xfail_backends` to mark tests as failing
428 435 for specific backends. This is intended as a utility for incremental
429 436 development of a new backend implementation.
430 437 """
431 438 if backend_alias not in request.config.getoption('--backends'):
432 439 pytest.skip("Backend %s not selected." % (backend_alias, ))
433 440
434 441 utils.check_xfail_backends(request.node, backend_alias)
435 442 utils.check_skip_backends(request.node, backend_alias)
436 443
437 444 repo_name = 'vcs_test_%s' % (backend_alias, )
438 445 backend = Backend(
439 446 alias=backend_alias,
440 447 repo_name=repo_name,
441 448 test_name=request.node.name,
442 449 test_repo_container=test_repo)
443 450 request.addfinalizer(backend.cleanup)
444 451 return backend
445 452
446 453
447 454 @pytest.fixture
448 455 def backend_git(request, baseapp, test_repo):
449 456 return backend(request, 'git', baseapp, test_repo)
450 457
451 458
452 459 @pytest.fixture
453 460 def backend_hg(request, baseapp, test_repo):
454 461 return backend(request, 'hg', baseapp, test_repo)
455 462
456 463
457 464 @pytest.fixture
458 465 def backend_svn(request, baseapp, test_repo):
459 466 return backend(request, 'svn', baseapp, test_repo)
460 467
461 468
462 469 @pytest.fixture
463 470 def backend_random(backend_git):
464 471 """
465 472 Use this to express that your tests need "a backend.
466 473
467 474 A few of our tests need a backend, so that we can run the code. This
468 475 fixture is intended to be used for such cases. It will pick one of the
469 476 backends and run the tests.
470 477
471 478 The fixture `backend` would run the test multiple times for each
472 479 available backend which is a pure waste of time if the test is
473 480 independent of the backend type.
474 481 """
475 482 # TODO: johbo: Change this to pick a random backend
476 483 return backend_git
477 484
478 485
479 486 @pytest.fixture
480 487 def backend_stub(backend_git):
481 488 """
482 489 Use this to express that your tests need a backend stub
483 490
484 491 TODO: mikhail: Implement a real stub logic instead of returning
485 492 a git backend
486 493 """
487 494 return backend_git
488 495
489 496
490 497 @pytest.fixture
491 498 def repo_stub(backend_stub):
492 499 """
493 500 Use this to express that your tests need a repository stub
494 501 """
495 502 return backend_stub.create_repo()
496 503
497 504
498 505 class Backend(object):
499 506 """
500 507 Represents the test configuration for one supported backend
501 508
502 509 Provides easy access to different test repositories based on
503 510 `__getitem__`. Such repositories will only be created once per test
504 511 session.
505 512 """
506 513
507 514 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
508 515 _master_repo = None
509 516 _commit_ids = {}
510 517
511 518 def __init__(self, alias, repo_name, test_name, test_repo_container):
512 519 self.alias = alias
513 520 self.repo_name = repo_name
514 521 self._cleanup_repos = []
515 522 self._test_name = test_name
516 523 self._test_repo_container = test_repo_container
517 524 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
518 525 # Fixture will survive in the end.
519 526 self._fixture = Fixture()
520 527
521 528 def __getitem__(self, key):
522 529 return self._test_repo_container(key, self.alias)
523 530
524 531 def create_test_repo(self, key, config=None):
525 532 return self._test_repo_container(key, self.alias, config)
526 533
527 534 @property
528 535 def repo(self):
529 536 """
530 537 Returns the "current" repository. This is the vcs_test repo or the
531 538 last repo which has been created with `create_repo`.
532 539 """
533 540 from rhodecode.model.db import Repository
534 541 return Repository.get_by_repo_name(self.repo_name)
535 542
536 543 @property
537 544 def default_branch_name(self):
538 545 VcsRepository = get_backend(self.alias)
539 546 return VcsRepository.DEFAULT_BRANCH_NAME
540 547
541 548 @property
542 549 def default_head_id(self):
543 550 """
544 551 Returns the default head id of the underlying backend.
545 552
546 553 This will be the default branch name in case the backend has a
547 554 default branch. Otherwise it will point to a valid head
548 555 which can serve as the base to create a new commit on top of it.
549 556 """
550 557 vcsrepo = self.repo.scm_instance()
551 558 head_id = (
552 559 vcsrepo.DEFAULT_BRANCH_NAME or
553 560 vcsrepo.commit_ids[-1])
554 561 return head_id
555 562
556 563 @property
557 564 def commit_ids(self):
558 565 """
559 566 Returns the commit map (message to raw_id) of the last created repository
560 567 """
561 568 return self._commit_ids
562 569
563 570 def create_master_repo(self, commits):
564 571 """
565 572 Create a repository and remember it as a template.
566 573
567 574 This makes it easy to create derived repositories to construct
568 575 more complex scenarios for diff, compare and pull requests.
569 576
570 577 Returns a commit map which maps from commit message to raw_id.
571 578 """
572 579 self._master_repo = self.create_repo(commits=commits)
573 580 return self._commit_ids
574 581
575 582 def create_repo(
576 583 self, commits=None, number_of_commits=0, heads=None,
577 584 name_suffix=u'', **kwargs):
578 585 """
579 586 Create a repository and record it for later cleanup.
580 587
581 588 :param commits: Optional. A sequence of dict instances.
582 589 Will add a commit per entry to the new repository.
583 590 :param number_of_commits: Optional. If set to a number, this number of
584 591 commits will be added to the new repository.
585 592 :param heads: Optional. Can be set to a sequence of commit
586 593 names which shall be pulled in from the master repository.
587 594
588 595 """
589 596 self.repo_name = self._next_repo_name() + name_suffix
590 597 repo = self._fixture.create_repo(
591 598 self.repo_name, repo_type=self.alias, **kwargs)
592 599 self._cleanup_repos.append(repo.repo_name)
593 600
594 601 commits = commits or [
595 602 {'message': 'Commit %s of %s' % (x, self.repo_name)}
596 603 for x in xrange(number_of_commits)]
597 604 self._add_commits_to_repo(repo.scm_instance(), commits)
598 605 if heads:
599 606 self.pull_heads(repo, heads)
600 607
601 608 return repo
602 609
603 610 def pull_heads(self, repo, heads):
604 611 """
605 612 Make sure that repo contains all commits mentioned in `heads`
606 613 """
607 614 vcsmaster = self._master_repo.scm_instance()
608 615 vcsrepo = repo.scm_instance()
609 616 vcsrepo.config.clear_section('hooks')
610 617 commit_ids = [self._commit_ids[h] for h in heads]
611 618 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
612 619
613 620 def create_fork(self):
614 621 repo_to_fork = self.repo_name
615 622 self.repo_name = self._next_repo_name()
616 623 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
617 624 self._cleanup_repos.append(self.repo_name)
618 625 return repo
619 626
620 627 def new_repo_name(self, suffix=u''):
621 628 self.repo_name = self._next_repo_name() + suffix
622 629 self._cleanup_repos.append(self.repo_name)
623 630 return self.repo_name
624 631
625 632 def _next_repo_name(self):
626 633 return u"%s_%s" % (
627 634 self.invalid_repo_name.sub(u'_', self._test_name),
628 635 len(self._cleanup_repos))
629 636
630 637 def ensure_file(self, filename, content='Test content\n'):
631 638 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
632 639 commits = [
633 640 {'added': [
634 641 FileNode(filename, content=content),
635 642 ]},
636 643 ]
637 644 self._add_commits_to_repo(self.repo.scm_instance(), commits)
638 645
639 646 def enable_downloads(self):
640 647 repo = self.repo
641 648 repo.enable_downloads = True
642 649 Session().add(repo)
643 650 Session().commit()
644 651
645 652 def cleanup(self):
646 653 for repo_name in reversed(self._cleanup_repos):
647 654 self._fixture.destroy_repo(repo_name)
648 655
649 656 def _add_commits_to_repo(self, repo, commits):
650 657 commit_ids = _add_commits_to_repo(repo, commits)
651 658 if not commit_ids:
652 659 return
653 660 self._commit_ids = commit_ids
654 661
655 662 # Creating refs for Git to allow fetching them from remote repository
656 663 if self.alias == 'git':
657 664 refs = {}
658 665 for message in self._commit_ids:
659 666 # TODO: mikhail: do more special chars replacements
660 667 ref_name = 'refs/test-refs/{}'.format(
661 668 message.replace(' ', ''))
662 669 refs[ref_name] = self._commit_ids[message]
663 670 self._create_refs(repo, refs)
664 671
665 672 def _create_refs(self, repo, refs):
666 673 for ref_name in refs:
667 674 repo.set_refs(ref_name, refs[ref_name])
668 675
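# Illustrative sketch (not part of the original file): `Backend.create_repo`
# accepts the commit dictionaries understood by `_add_commits_to_repo` below
# (keys such as 'message', 'added', 'changed', 'removed', 'parents', 'branch').
def test_example_backend_create_repo(backend):
    repo = backend.create_repo(commits=[
        {'message': 'Initial commit',
         'added': [FileNode('README.rst', content='example\n')]},
    ])
    assert repo.repo_name == backend.repo_name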
669 676
670 677 @pytest.fixture
671 678 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
672 679 """
673 680 Parametrized fixture which represents a single vcs backend implementation.
674 681
675 682 See the fixture `backend` for more details. This one implements the same
676 683 concept, but on vcs level. So it does not provide model instances etc.
677 684
678 685 Parameters are generated dynamically, see :func:`pytest_generate_tests`
679 686 for how this works.
680 687 """
681 688 if backend_alias not in request.config.getoption('--backends'):
682 689 pytest.skip("Backend %s not selected." % (backend_alias, ))
683 690
684 691 utils.check_xfail_backends(request.node, backend_alias)
685 692 utils.check_skip_backends(request.node, backend_alias)
686 693
687 694 repo_name = 'vcs_test_%s' % (backend_alias, )
688 695 repo_path = os.path.join(tests_tmp_path, repo_name)
689 696 backend = VcsBackend(
690 697 alias=backend_alias,
691 698 repo_path=repo_path,
692 699 test_name=request.node.name,
693 700 test_repo_container=test_repo)
694 701 request.addfinalizer(backend.cleanup)
695 702 return backend
696 703
697 704
698 705 @pytest.fixture
699 706 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
700 707 return vcsbackend(request, 'git', tests_tmp_path, baseapp, test_repo)
701 708
702 709
703 710 @pytest.fixture
704 711 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
705 712 return vcsbackend(request, 'hg', tests_tmp_path, baseapp, test_repo)
706 713
707 714
708 715 @pytest.fixture
709 716 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
710 717 return vcsbackend(request, 'svn', tests_tmp_path, baseapp, test_repo)
711 718
712 719
713 720 @pytest.fixture
714 721 def vcsbackend_random(vcsbackend_git):
715 722 """
716 723 Use this to express that your tests need "a vcsbackend".
717 724
718 725 The fixture `vcsbackend` would run the test multiple times for each
719 726 available vcs backend which is a pure waste of time if the test is
720 727 independent of the vcs backend type.
721 728 """
722 729 # TODO: johbo: Change this to pick a random backend
723 730 return vcsbackend_git
724 731
725 732
726 733 @pytest.fixture
727 734 def vcsbackend_stub(vcsbackend_git):
728 735 """
729 736 Use this to express that your test just needs a stub of a vcsbackend.
730 737
731 738 Plan is to eventually implement an in-memory stub to speed tests up.
732 739 """
733 740 return vcsbackend_git
734 741
735 742
736 743 class VcsBackend(object):
737 744 """
738 745 Represents the test configuration for one supported vcs backend.
739 746 """
740 747
741 748 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
742 749
743 750 def __init__(self, alias, repo_path, test_name, test_repo_container):
744 751 self.alias = alias
745 752 self._repo_path = repo_path
746 753 self._cleanup_repos = []
747 754 self._test_name = test_name
748 755 self._test_repo_container = test_repo_container
749 756
750 757 def __getitem__(self, key):
751 758 return self._test_repo_container(key, self.alias).scm_instance()
752 759
753 760 @property
754 761 def repo(self):
755 762 """
756 763 Returns the "current" repository. This is the vcs_test repo of the last
757 764 repo which has been created.
758 765 """
759 766 Repository = get_backend(self.alias)
760 767 return Repository(self._repo_path)
761 768
762 769 @property
763 770 def backend(self):
764 771 """
765 772 Returns the backend implementation class.
766 773 """
767 774 return get_backend(self.alias)
768 775
769 776 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
770 777 repo_name = self._next_repo_name()
771 778 self._repo_path = get_new_dir(repo_name)
772 779 repo_class = get_backend(self.alias)
773 780 src_url = None
774 781 if _clone_repo:
775 782 src_url = _clone_repo.path
776 783 repo = repo_class(self._repo_path, create=True, src_url=src_url)
777 784 self._cleanup_repos.append(repo)
778 785
779 786 commits = commits or [
780 787 {'message': 'Commit %s of %s' % (x, repo_name)}
781 788 for x in xrange(number_of_commits)]
782 789 _add_commits_to_repo(repo, commits)
783 790 return repo
784 791
785 792 def clone_repo(self, repo):
786 793 return self.create_repo(_clone_repo=repo)
787 794
788 795 def cleanup(self):
789 796 for repo in self._cleanup_repos:
790 797 shutil.rmtree(repo.path)
791 798
792 799 def new_repo_path(self):
793 800 repo_name = self._next_repo_name()
794 801 self._repo_path = get_new_dir(repo_name)
795 802 return self._repo_path
796 803
797 804 def _next_repo_name(self):
798 805 return "%s_%s" % (
799 806 self.invalid_repo_name.sub('_', self._test_name),
800 807 len(self._cleanup_repos))
801 808
802 809 def add_file(self, repo, filename, content='Test content\n'):
803 810 imc = repo.in_memory_commit
804 811 imc.add(FileNode(filename, content=content))
805 812 imc.commit(
806 813 message=u'Automatic commit from vcsbackend fixture',
807 814 author=u'Automatic')
808 815
809 816 def ensure_file(self, filename, content='Test content\n'):
810 817 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
811 818 self.add_file(self.repo, filename, content)
812 819
813 820
814 821 def _add_commits_to_repo(vcs_repo, commits):
815 822 commit_ids = {}
816 823 if not commits:
817 824 return commit_ids
818 825
819 826 imc = vcs_repo.in_memory_commit
820 827 commit = None
821 828
822 829 for idx, commit in enumerate(commits):
823 830 message = unicode(commit.get('message', 'Commit %s' % idx))
824 831
825 832 for node in commit.get('added', []):
826 833 imc.add(FileNode(node.path, content=node.content))
827 834 for node in commit.get('changed', []):
828 835 imc.change(FileNode(node.path, content=node.content))
829 836 for node in commit.get('removed', []):
830 837 imc.remove(FileNode(node.path))
831 838
832 839 parents = [
833 840 vcs_repo.get_commit(commit_id=commit_ids[p])
834 841 for p in commit.get('parents', [])]
835 842
836 843 operations = ('added', 'changed', 'removed')
837 844 if not any((commit.get(o) for o in operations)):
838 845 imc.add(FileNode('file_%s' % idx, content=message))
839 846
840 847 commit = imc.commit(
841 848 message=message,
842 849 author=unicode(commit.get('author', 'Automatic')),
843 850 date=commit.get('date'),
844 851 branch=commit.get('branch'),
845 852 parents=parents)
846 853
847 854 commit_ids[commit.message] = commit.raw_id
848 855
849 856 return commit_ids
850 857
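# Illustrative sketch (not part of the original file): the vcs level fixture
# uses the same commit schema; `vcsbackend.create_repo` forwards the generated
# commit dicts to `_add_commits_to_repo` above.
def test_example_vcsbackend_create_repo(vcsbackend):
    vcs_repo = vcsbackend.create_repo(number_of_commits=2)
    assert len(vcs_repo.commit_ids) == 2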
851 858
852 859 @pytest.fixture
853 860 def reposerver(request):
854 861 """
855 862 Allows serving a backend repository
856 863 """
857 864
858 865 repo_server = RepoServer()
859 866 request.addfinalizer(repo_server.cleanup)
860 867 return repo_server
861 868
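# Illustrative sketch (not part of the original file): serving a freshly created
# Subversion repository; the resulting URL can then be used as a remote, e.g.
# for import tests.
def test_example_reposerver(reposerver, vcsbackend_svn):
    vcs_repo = vcsbackend_svn.create_repo(number_of_commits=1)
    reposerver.serve(vcs_repo)
    assert reposerver.url == 'svn://localhost'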
862 869
863 870 class RepoServer(object):
864 871 """
865 872 Utility to serve a local repository for the duration of a test case.
866 873
867 874 Supports only Subversion so far.
868 875 """
869 876
870 877 url = None
871 878
872 879 def __init__(self):
873 880 self._cleanup_servers = []
874 881
875 882 def serve(self, vcsrepo):
876 883 if vcsrepo.alias != 'svn':
877 884 raise TypeError("Backend %s not supported" % vcsrepo.alias)
878 885
879 886 proc = subprocess32.Popen(
880 887 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
881 888 '--root', vcsrepo.path])
882 889 self._cleanup_servers.append(proc)
883 890 self.url = 'svn://localhost'
884 891
885 892 def cleanup(self):
886 893 for proc in self._cleanup_servers:
887 894 proc.terminate()
888 895
889 896
890 897 @pytest.fixture
891 898 def pr_util(backend, request, config_stub):
892 899 """
893 900 Utility for tests of models and for functional tests around pull requests.
894 901
895 902 It gives an instance of :class:`PRTestUtility` which provides various
896 903 utility methods around one pull request.
897 904
898 905 This fixture uses `backend` and inherits its parameterization.
899 906 """
900 907
901 908 util = PRTestUtility(backend)
902 909 request.addfinalizer(util.cleanup)
903 910
904 911 return util
905 912
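# Illustrative sketch (not part of the original file): a functional test builds
# a pull request through the utility and then exercises model code against it;
# cleanup happens in the finalizer registered above.
def test_example_pr_util(pr_util):
    pull_request = pr_util.create_pull_request(mergeable=True)
    assert pull_request.pull_request_id == pr_util.pull_request_id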
906 913
907 914 class PRTestUtility(object):
908 915
909 916 pull_request = None
910 917 pull_request_id = None
911 918 mergeable_patcher = None
912 919 mergeable_mock = None
913 920 notification_patcher = None
914 921
915 922 def __init__(self, backend):
916 923 self.backend = backend
917 924
918 925 def create_pull_request(
919 926 self, commits=None, target_head=None, source_head=None,
920 927 revisions=None, approved=False, author=None, mergeable=False,
921 928 enable_notifications=True, name_suffix=u'', reviewers=None,
922 929 title=u"Test", description=u"Description"):
923 930 self.set_mergeable(mergeable)
924 931 if not enable_notifications:
925 932 # mock notification side effect
926 933 self.notification_patcher = mock.patch(
927 934 'rhodecode.model.notification.NotificationModel.create')
928 935 self.notification_patcher.start()
929 936
930 937 if not self.pull_request:
931 938 if not commits:
932 939 commits = [
933 940 {'message': 'c1'},
934 941 {'message': 'c2'},
935 942 {'message': 'c3'},
936 943 ]
937 944 target_head = 'c1'
938 945 source_head = 'c2'
939 946 revisions = ['c2']
940 947
941 948 self.commit_ids = self.backend.create_master_repo(commits)
942 949 self.target_repository = self.backend.create_repo(
943 950 heads=[target_head], name_suffix=name_suffix)
944 951 self.source_repository = self.backend.create_repo(
945 952 heads=[source_head], name_suffix=name_suffix)
946 953 self.author = author or UserModel().get_by_username(
947 954 TEST_USER_ADMIN_LOGIN)
948 955
949 956 model = PullRequestModel()
950 957 self.create_parameters = {
951 958 'created_by': self.author,
952 959 'source_repo': self.source_repository.repo_name,
953 960 'source_ref': self._default_branch_reference(source_head),
954 961 'target_repo': self.target_repository.repo_name,
955 962 'target_ref': self._default_branch_reference(target_head),
956 963 'revisions': [self.commit_ids[r] for r in revisions],
957 964 'reviewers': reviewers or self._get_reviewers(),
958 965 'title': title,
959 966 'description': description,
960 967 }
961 968 self.pull_request = model.create(**self.create_parameters)
962 969 assert model.get_versions(self.pull_request) == []
963 970
964 971 self.pull_request_id = self.pull_request.pull_request_id
965 972
966 973 if approved:
967 974 self.approve()
968 975
969 976 Session().add(self.pull_request)
970 977 Session().commit()
971 978
972 979 return self.pull_request
973 980
974 981 def approve(self):
975 982 self.create_status_votes(
976 983 ChangesetStatus.STATUS_APPROVED,
977 984 *self.pull_request.reviewers)
978 985
979 986 def close(self):
980 987 PullRequestModel().close_pull_request(self.pull_request, self.author)
981 988
982 989 def _default_branch_reference(self, commit_message):
983 990 reference = '%s:%s:%s' % (
984 991 'branch',
985 992 self.backend.default_branch_name,
986 993 self.commit_ids[commit_message])
987 994 return reference
988 995
989 996 def _get_reviewers(self):
990 997 return [
991 998 (TEST_USER_REGULAR_LOGIN, ['default1'], False),
992 999 (TEST_USER_REGULAR2_LOGIN, ['default2'], False),
993 1000 ]
994 1001
995 1002 def update_source_repository(self, head=None):
996 1003 heads = [head or 'c3']
997 1004 self.backend.pull_heads(self.source_repository, heads=heads)
998 1005
999 1006 def add_one_commit(self, head=None):
1000 1007 self.update_source_repository(head=head)
1001 1008 old_commit_ids = set(self.pull_request.revisions)
1002 1009 PullRequestModel().update_commits(self.pull_request)
1003 1010 commit_ids = set(self.pull_request.revisions)
1004 1011 new_commit_ids = commit_ids - old_commit_ids
1005 1012 assert len(new_commit_ids) == 1
1006 1013 return new_commit_ids.pop()
1007 1014
1008 1015 def remove_one_commit(self):
1009 1016 assert len(self.pull_request.revisions) == 2
1010 1017 source_vcs = self.source_repository.scm_instance()
1011 1018 removed_commit_id = source_vcs.commit_ids[-1]
1012 1019
1013 1020 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1014 1021 # remove the if once that's sorted out.
1015 1022 if self.backend.alias == "git":
1016 1023 kwargs = {'branch_name': self.backend.default_branch_name}
1017 1024 else:
1018 1025 kwargs = {}
1019 1026 source_vcs.strip(removed_commit_id, **kwargs)
1020 1027
1021 1028 PullRequestModel().update_commits(self.pull_request)
1022 1029 assert len(self.pull_request.revisions) == 1
1023 1030 return removed_commit_id
1024 1031
1025 1032 def create_comment(self, linked_to=None):
1026 1033 comment = CommentsModel().create(
1027 1034 text=u"Test comment",
1028 1035 repo=self.target_repository.repo_name,
1029 1036 user=self.author,
1030 1037 pull_request=self.pull_request)
1031 1038 assert comment.pull_request_version_id is None
1032 1039
1033 1040 if linked_to:
1034 1041 PullRequestModel()._link_comments_to_version(linked_to)
1035 1042
1036 1043 return comment
1037 1044
1038 1045 def create_inline_comment(
1039 1046 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1040 1047 comment = CommentsModel().create(
1041 1048 text=u"Test comment",
1042 1049 repo=self.target_repository.repo_name,
1043 1050 user=self.author,
1044 1051 line_no=line_no,
1045 1052 f_path=file_path,
1046 1053 pull_request=self.pull_request)
1047 1054 assert comment.pull_request_version_id is None
1048 1055
1049 1056 if linked_to:
1050 1057 PullRequestModel()._link_comments_to_version(linked_to)
1051 1058
1052 1059 return comment
1053 1060
1054 1061 def create_version_of_pull_request(self):
1055 1062 pull_request = self.create_pull_request()
1056 1063 version = PullRequestModel()._create_version_from_snapshot(
1057 1064 pull_request)
1058 1065 return version
1059 1066
1060 1067 def create_status_votes(self, status, *reviewers):
1061 1068 for reviewer in reviewers:
1062 1069 ChangesetStatusModel().set_status(
1063 1070 repo=self.pull_request.target_repo,
1064 1071 status=status,
1065 1072 user=reviewer.user_id,
1066 1073 pull_request=self.pull_request)
1067 1074
1068 1075 def set_mergeable(self, value):
1069 1076 if not self.mergeable_patcher:
1070 1077 self.mergeable_patcher = mock.patch.object(
1071 1078 VcsSettingsModel, 'get_general_settings')
1072 1079 self.mergeable_mock = self.mergeable_patcher.start()
1073 1080 self.mergeable_mock.return_value = {
1074 1081 'rhodecode_pr_merge_enabled': value}
1075 1082
1076 1083 def cleanup(self):
1077 1084 # In case the source repository is already cleaned up, the pull
1078 1085 # request will already be deleted.
1079 1086 pull_request = PullRequest().get(self.pull_request_id)
1080 1087 if pull_request:
1081 1088 PullRequestModel().delete(pull_request, pull_request.author)
1082 1089 Session().commit()
1083 1090
1084 1091 if self.notification_patcher:
1085 1092 self.notification_patcher.stop()
1086 1093
1087 1094 if self.mergeable_patcher:
1088 1095 self.mergeable_patcher.stop()
1089 1096
1090 1097
1091 1098 @pytest.fixture
1092 1099 def user_admin(baseapp):
1093 1100 """
1094 1101 Provides the default admin test user as an instance of `db.User`.
1095 1102 """
1096 1103 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1097 1104 return user
1098 1105
1099 1106
1100 1107 @pytest.fixture
1101 1108 def user_regular(baseapp):
1102 1109 """
1103 1110 Provides the default regular test user as an instance of `db.User`.
1104 1111 """
1105 1112 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1106 1113 return user
1107 1114
1108 1115
1109 1116 @pytest.fixture
1110 1117 def user_util(request, baseapp):
1111 1118 """
1112 1119 Provides a wired instance of `UserUtility` with integrated cleanup.
1113 1120 """
1114 1121 utility = UserUtility(test_name=request.node.name)
1115 1122 request.addfinalizer(utility.cleanup)
1116 1123 return utility
1117 1124
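# Illustrative sketch (not part of the original file): combining the UserUtility
# helpers; every object created here is tracked and destroyed again by the
# fixture finalizer.
def test_example_user_util(user_util):
    user = user_util.create_user()
    repo_group = user_util.create_repo_group()
    user_util.grant_user_permission_to_repo_group(
        repo_group, user, 'group.read')
    assert repo_group.group_id in user_util.repo_group_ids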
1118 1125
1119 1126 # TODO: johbo: Split this up into utilities per domain or something similar
1120 1127 class UserUtility(object):
1121 1128
1122 1129 def __init__(self, test_name="test"):
1123 1130 self._test_name = self._sanitize_name(test_name)
1124 1131 self.fixture = Fixture()
1125 1132 self.repo_group_ids = []
1126 1133 self.repos_ids = []
1127 1134 self.user_ids = []
1128 1135 self.user_group_ids = []
1129 1136 self.user_repo_permission_ids = []
1130 1137 self.user_group_repo_permission_ids = []
1131 1138 self.user_repo_group_permission_ids = []
1132 1139 self.user_group_repo_group_permission_ids = []
1133 1140 self.user_user_group_permission_ids = []
1134 1141 self.user_group_user_group_permission_ids = []
1135 1142 self.user_permissions = []
1136 1143
1137 1144 def _sanitize_name(self, name):
1138 1145 for char in ['[', ']']:
1139 1146 name = name.replace(char, '_')
1140 1147 return name
1141 1148
1142 1149 def create_repo_group(
1143 1150 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1144 1151 group_name = "{prefix}_repogroup_{count}".format(
1145 1152 prefix=self._test_name,
1146 1153 count=len(self.repo_group_ids))
1147 1154 repo_group = self.fixture.create_repo_group(
1148 1155 group_name, cur_user=owner)
1149 1156 if auto_cleanup:
1150 1157 self.repo_group_ids.append(repo_group.group_id)
1151 1158 return repo_group
1152 1159
1153 1160 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1154 1161 auto_cleanup=True, repo_type='hg'):
1155 1162 repo_name = "{prefix}_repository_{count}".format(
1156 1163 prefix=self._test_name,
1157 1164 count=len(self.repos_ids))
1158 1165
1159 1166 repository = self.fixture.create_repo(
1160 1167 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type)
1161 1168 if auto_cleanup:
1162 1169 self.repos_ids.append(repository.repo_id)
1163 1170 return repository
1164 1171
1165 1172 def create_user(self, auto_cleanup=True, **kwargs):
1166 1173 user_name = "{prefix}_user_{count}".format(
1167 1174 prefix=self._test_name,
1168 1175 count=len(self.user_ids))
1169 1176 user = self.fixture.create_user(user_name, **kwargs)
1170 1177 if auto_cleanup:
1171 1178 self.user_ids.append(user.user_id)
1172 1179 return user
1173 1180
1174 1181 def create_user_with_group(self):
1175 1182 user = self.create_user()
1176 1183 user_group = self.create_user_group(members=[user])
1177 1184 return user, user_group
1178 1185
1179 1186 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1180 1187 auto_cleanup=True, **kwargs):
1181 1188 group_name = "{prefix}_usergroup_{count}".format(
1182 1189 prefix=self._test_name,
1183 1190 count=len(self.user_group_ids))
1184 1191 user_group = self.fixture.create_user_group(
1185 1192 group_name, cur_user=owner, **kwargs)
1186 1193
1187 1194 if auto_cleanup:
1188 1195 self.user_group_ids.append(user_group.users_group_id)
1189 1196 if members:
1190 1197 for user in members:
1191 1198 UserGroupModel().add_user_to_group(user_group, user)
1192 1199 return user_group
1193 1200
1194 1201 def grant_user_permission(self, user_name, permission_name):
1195 1202 self._inherit_default_user_permissions(user_name, False)
1196 1203 self.user_permissions.append((user_name, permission_name))
1197 1204
1198 1205 def grant_user_permission_to_repo_group(
1199 1206 self, repo_group, user, permission_name):
1200 1207 permission = RepoGroupModel().grant_user_permission(
1201 1208 repo_group, user, permission_name)
1202 1209 self.user_repo_group_permission_ids.append(
1203 1210 (repo_group.group_id, user.user_id))
1204 1211 return permission
1205 1212
1206 1213 def grant_user_group_permission_to_repo_group(
1207 1214 self, repo_group, user_group, permission_name):
1208 1215 permission = RepoGroupModel().grant_user_group_permission(
1209 1216 repo_group, user_group, permission_name)
1210 1217 self.user_group_repo_group_permission_ids.append(
1211 1218 (repo_group.group_id, user_group.users_group_id))
1212 1219 return permission
1213 1220
1214 1221 def grant_user_permission_to_repo(
1215 1222 self, repo, user, permission_name):
1216 1223 permission = RepoModel().grant_user_permission(
1217 1224 repo, user, permission_name)
1218 1225 self.user_repo_permission_ids.append(
1219 1226 (repo.repo_id, user.user_id))
1220 1227 return permission
1221 1228
1222 1229 def grant_user_group_permission_to_repo(
1223 1230 self, repo, user_group, permission_name):
1224 1231 permission = RepoModel().grant_user_group_permission(
1225 1232 repo, user_group, permission_name)
1226 1233 self.user_group_repo_permission_ids.append(
1227 1234 (repo.repo_id, user_group.users_group_id))
1228 1235 return permission
1229 1236
1230 1237 def grant_user_permission_to_user_group(
1231 1238 self, target_user_group, user, permission_name):
1232 1239 permission = UserGroupModel().grant_user_permission(
1233 1240 target_user_group, user, permission_name)
1234 1241 self.user_user_group_permission_ids.append(
1235 1242 (target_user_group.users_group_id, user.user_id))
1236 1243 return permission
1237 1244
1238 1245 def grant_user_group_permission_to_user_group(
1239 1246 self, target_user_group, user_group, permission_name):
1240 1247 permission = UserGroupModel().grant_user_group_permission(
1241 1248 target_user_group, user_group, permission_name)
1242 1249 self.user_group_user_group_permission_ids.append(
1243 1250 (target_user_group.users_group_id, user_group.users_group_id))
1244 1251 return permission
1245 1252
1246 1253 def revoke_user_permission(self, user_name, permission_name):
1247 1254 self._inherit_default_user_permissions(user_name, True)
1248 1255 UserModel().revoke_perm(user_name, permission_name)
1249 1256
1250 1257 def _inherit_default_user_permissions(self, user_name, value):
1251 1258 user = UserModel().get_by_username(user_name)
1252 1259 user.inherit_default_permissions = value
1253 1260 Session().add(user)
1254 1261 Session().commit()
1255 1262
1256 1263 def cleanup(self):
1257 1264 self._cleanup_permissions()
1258 1265 self._cleanup_repos()
1259 1266 self._cleanup_repo_groups()
1260 1267 self._cleanup_user_groups()
1261 1268 self._cleanup_users()
1262 1269
1263 1270 def _cleanup_permissions(self):
1264 1271 if self.user_permissions:
1265 1272 for user_name, permission_name in self.user_permissions:
1266 1273 self.revoke_user_permission(user_name, permission_name)
1267 1274
1268 1275 for permission in self.user_repo_permission_ids:
1269 1276 RepoModel().revoke_user_permission(*permission)
1270 1277
1271 1278 for permission in self.user_group_repo_permission_ids:
1272 1279 RepoModel().revoke_user_group_permission(*permission)
1273 1280
1274 1281 for permission in self.user_repo_group_permission_ids:
1275 1282 RepoGroupModel().revoke_user_permission(*permission)
1276 1283
1277 1284 for permission in self.user_group_repo_group_permission_ids:
1278 1285 RepoGroupModel().revoke_user_group_permission(*permission)
1279 1286
1280 1287 for permission in self.user_user_group_permission_ids:
1281 1288 UserGroupModel().revoke_user_permission(*permission)
1282 1289
1283 1290 for permission in self.user_group_user_group_permission_ids:
1284 1291 UserGroupModel().revoke_user_group_permission(*permission)
1285 1292
1286 1293 def _cleanup_repo_groups(self):
1287 1294 def _repo_group_compare(first_group_id, second_group_id):
1288 1295 """
1289 1296 Gives higher priority to the groups with the most complex paths
1290 1297 """
1291 1298 first_group = RepoGroup.get(first_group_id)
1292 1299 second_group = RepoGroup.get(second_group_id)
1293 1300 first_group_parts = (
1294 1301 len(first_group.group_name.split('/')) if first_group else 0)
1295 1302 second_group_parts = (
1296 1303 len(second_group.group_name.split('/')) if second_group else 0)
1297 1304 return cmp(second_group_parts, first_group_parts)
1298 1305
1299 1306 sorted_repo_group_ids = sorted(
1300 1307 self.repo_group_ids, cmp=_repo_group_compare)
1301 1308 for repo_group_id in sorted_repo_group_ids:
1302 1309 self.fixture.destroy_repo_group(repo_group_id)
1303 1310
1304 1311 def _cleanup_repos(self):
1305 1312 sorted_repos_ids = sorted(self.repos_ids)
1306 1313 for repo_id in sorted_repos_ids:
1307 1314 self.fixture.destroy_repo(repo_id)
1308 1315
1309 1316 def _cleanup_user_groups(self):
1310 1317 def _user_group_compare(first_group_id, second_group_id):
1311 1318 """
1312 1319 Gives higher priority to the groups with the most complex paths
1313 1320 """
1314 1321 first_group = UserGroup.get(first_group_id)
1315 1322 second_group = UserGroup.get(second_group_id)
1316 1323 first_group_parts = (
1317 1324 len(first_group.users_group_name.split('/'))
1318 1325 if first_group else 0)
1319 1326 second_group_parts = (
1320 1327 len(second_group.users_group_name.split('/'))
1321 1328 if second_group else 0)
1322 1329 return cmp(second_group_parts, first_group_parts)
1323 1330
1324 1331 sorted_user_group_ids = sorted(
1325 1332 self.user_group_ids, cmp=_user_group_compare)
1326 1333 for user_group_id in sorted_user_group_ids:
1327 1334 self.fixture.destroy_user_group(user_group_id)
1328 1335
1329 1336 def _cleanup_users(self):
1330 1337 for user_id in self.user_ids:
1331 1338 self.fixture.destroy_user(user_id)
1332 1339
1333 1340
1334 1341 # TODO: Think about moving this into a pytest-pyro package and make it a
1335 1342 # pytest plugin
1336 1343 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1337 1344 def pytest_runtest_makereport(item, call):
1338 1345 """
1339 1346 Adding the remote traceback if the exception has this information.
1340 1347
1341 1348 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1342 1349 to the exception instance.
1343 1350 """
1344 1351 outcome = yield
1345 1352 report = outcome.get_result()
1346 1353 if call.excinfo:
1347 1354 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1348 1355
1349 1356
1350 1357 def _add_vcsserver_remote_traceback(report, exc):
1351 1358 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1352 1359
1353 1360 if vcsserver_traceback:
1354 1361 section = 'VCSServer remote traceback ' + report.when
1355 1362 report.sections.append((section, vcsserver_traceback))
1356 1363
1357 1364
1358 1365 @pytest.fixture(scope='session')
1359 1366 def testrun():
1360 1367 return {
1361 1368 'uuid': uuid.uuid4(),
1362 1369 'start': datetime.datetime.utcnow().isoformat(),
1363 1370 'timestamp': int(time.time()),
1364 1371 }
1365 1372
1366 1373
1367 1374 @pytest.fixture(autouse=True)
1368 1375 def collect_appenlight_stats(request, testrun):
1369 1376 """
1370 1377 This fixture reports the memory consumption of single tests.
1371 1378
1372 1379 It gathers data based on `psutil` and sends them to Appenlight. The option
1373 1380 ``--ae`` has to be used to enable this fixture and the API key for your
1374 1381 application has to be provided in ``--ae-key``.
1375 1382 """
1376 1383 try:
1377 1384 # cygwin does not have psutil support yet.
1378 1385 import psutil
1379 1386 except ImportError:
1380 1387 return
1381 1388
1382 1389 if not request.config.getoption('--appenlight'):
1383 1390 return
1384 1391 else:
1385 1392 # Only request the baseapp fixture if appenlight tracking is
1386 1393 # enabled. This will speed up a test run of unit tests by 2 to 3
1387 1394 # seconds if appenlight is not enabled.
1388 1395 baseapp = request.getfuncargvalue("baseapp")
1389 1396 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1390 1397 client = AppenlightClient(
1391 1398 url=url,
1392 1399 api_key=request.config.getoption('--appenlight-api-key'),
1393 1400 namespace=request.node.nodeid,
1394 1401 request=str(testrun['uuid']),
1395 1402 testrun=testrun)
1396 1403
1397 1404 client.collect({
1398 1405 'message': "Starting",
1399 1406 })
1400 1407
1401 1408 server_and_port = baseapp.config.get_settings()['vcs.server']
1402 1409 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1403 1410 server = create_vcsserver_proxy(server_and_port, protocol)
1404 1411 with server:
1405 1412 vcs_pid = server.get_pid()
1406 1413 server.run_gc()
1407 1414 vcs_process = psutil.Process(vcs_pid)
1408 1415 mem = vcs_process.memory_info()
1409 1416 client.tag_before('vcsserver.rss', mem.rss)
1410 1417 client.tag_before('vcsserver.vms', mem.vms)
1411 1418
1412 1419 test_process = psutil.Process()
1413 1420 mem = test_process.memory_info()
1414 1421 client.tag_before('test.rss', mem.rss)
1415 1422 client.tag_before('test.vms', mem.vms)
1416 1423
1417 1424 client.tag_before('time', time.time())
1418 1425
1419 1426 @request.addfinalizer
1420 1427 def send_stats():
1421 1428 client.tag_after('time', time.time())
1422 1429 with server:
1423 1430 gc_stats = server.run_gc()
1424 1431 for tag, value in gc_stats.items():
1425 1432 client.tag_after(tag, value)
1426 1433 mem = vcs_process.memory_info()
1427 1434 client.tag_after('vcsserver.rss', mem.rss)
1428 1435 client.tag_after('vcsserver.vms', mem.vms)
1429 1436
1430 1437 mem = test_process.memory_info()
1431 1438 client.tag_after('test.rss', mem.rss)
1432 1439 client.tag_after('test.vms', mem.vms)
1433 1440
1434 1441 client.collect({
1435 1442 'message': "Finished",
1436 1443 })
1437 1444 client.send_stats()
1438 1445
1439 1446 return client
1440 1447
1441 1448
1442 1449 class AppenlightClient():
1443 1450
1444 1451 url_template = '{url}?protocol_version=0.5'
1445 1452
1446 1453 def __init__(
1447 1454 self, url, api_key, add_server=True, add_timestamp=True,
1448 1455 namespace=None, request=None, testrun=None):
1449 1456 self.url = self.url_template.format(url=url)
1450 1457 self.api_key = api_key
1451 1458 self.add_server = add_server
1452 1459 self.add_timestamp = add_timestamp
1453 1460 self.namespace = namespace
1454 1461 self.request = request
1455 1462 self.server = socket.getfqdn(socket.gethostname())
1456 1463 self.tags_before = {}
1457 1464 self.tags_after = {}
1458 1465 self.stats = []
1459 1466 self.testrun = testrun or {}
1460 1467
1461 1468 def tag_before(self, tag, value):
1462 1469 self.tags_before[tag] = value
1463 1470
1464 1471 def tag_after(self, tag, value):
1465 1472 self.tags_after[tag] = value
1466 1473
1467 1474 def collect(self, data):
1468 1475 if self.add_server:
1469 1476 data.setdefault('server', self.server)
1470 1477 if self.add_timestamp:
1471 1478 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1472 1479 if self.namespace:
1473 1480 data.setdefault('namespace', self.namespace)
1474 1481 if self.request:
1475 1482 data.setdefault('request', self.request)
1476 1483 self.stats.append(data)
1477 1484
1478 1485 def send_stats(self):
1479 1486 tags = [
1480 1487 ('testrun', self.request),
1481 1488 ('testrun.start', self.testrun['start']),
1482 1489 ('testrun.timestamp', self.testrun['timestamp']),
1483 1490 ('test', self.namespace),
1484 1491 ]
1485 1492 for key, value in self.tags_before.items():
1486 1493 tags.append((key + '.before', value))
1487 1494 try:
1488 1495 delta = self.tags_after[key] - value
1489 1496 tags.append((key + '.delta', delta))
1490 1497 except Exception:
1491 1498 pass
1492 1499 for key, value in self.tags_after.items():
1493 1500 tags.append((key + '.after', value))
1494 1501 self.collect({
1495 1502 'message': "Collected tags",
1496 1503 'tags': tags,
1497 1504 })
1498 1505
1499 1506 response = requests.post(
1500 1507 self.url,
1501 1508 headers={
1502 1509 'X-appenlight-api-key': self.api_key},
1503 1510 json=self.stats,
1504 1511 )
1505 1512
1506 1513 if not response.status_code == 200:
1507 1514 pprint.pprint(self.stats)
1508 1515 print(response.headers)
1509 1516 print(response.text)
1510 1517 raise Exception('Sending to appenlight failed')
1511 1518
1512 1519
1513 1520 @pytest.fixture
1514 1521 def gist_util(request, baseapp):
1515 1522 """
1516 1523 Provides a wired instance of `GistUtility` with integrated cleanup.
1517 1524 """
1518 1525 utility = GistUtility()
1519 1526 request.addfinalizer(utility.cleanup)
1520 1527 return utility
1521 1528
1522 1529
1523 1530 class GistUtility(object):
1524 1531 def __init__(self):
1525 1532 self.fixture = Fixture()
1526 1533 self.gist_ids = []
1527 1534
1528 1535 def create_gist(self, **kwargs):
1529 1536 gist = self.fixture.create_gist(**kwargs)
1530 1537 self.gist_ids.append(gist.gist_id)
1531 1538 return gist
1532 1539
1533 1540 def cleanup(self):
1534 1541 for id_ in self.gist_ids:
1535 1542 self.fixture.destroy_gists(str(id_))
1536 1543
1537 1544
1538 1545 @pytest.fixture
1539 1546 def enabled_backends(request):
1540 1547 backends = request.config.option.backends
1541 1548 return backends[:]
1542 1549
1543 1550
1544 1551 @pytest.fixture
1545 1552 def settings_util(request):
1546 1553 """
1547 1554 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1548 1555 """
1549 1556 utility = SettingsUtility()
1550 1557 request.addfinalizer(utility.cleanup)
1551 1558 return utility
1552 1559
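# Illustrative sketch (not part of the original file): creating a temporary
# per-repository ui entry; the section and value are arbitrary example strings,
# cleanup is performed automatically by the fixture finalizer.
def test_example_settings_util(settings_util, repo_stub):
    setting = settings_util.create_repo_rhodecode_ui(
        repo_stub, 'example_section', 'example_value')
    assert setting.ui_id in settings_util.repo_rhodecode_ui_ids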
1553 1560
1554 1561 class SettingsUtility(object):
1555 1562 def __init__(self):
1556 1563 self.rhodecode_ui_ids = []
1557 1564 self.rhodecode_setting_ids = []
1558 1565 self.repo_rhodecode_ui_ids = []
1559 1566 self.repo_rhodecode_setting_ids = []
1560 1567
1561 1568 def create_repo_rhodecode_ui(
1562 1569 self, repo, section, value, key=None, active=True, cleanup=True):
1563 1570 key = key or hashlib.sha1(
1564 1571 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1565 1572
1566 1573 setting = RepoRhodeCodeUi()
1567 1574 setting.repository_id = repo.repo_id
1568 1575 setting.ui_section = section
1569 1576 setting.ui_value = value
1570 1577 setting.ui_key = key
1571 1578 setting.ui_active = active
1572 1579 Session().add(setting)
1573 1580 Session().commit()
1574 1581
1575 1582 if cleanup:
1576 1583 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1577 1584 return setting
1578 1585
1579 1586 def create_rhodecode_ui(
1580 1587 self, section, value, key=None, active=True, cleanup=True):
1581 1588 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1582 1589
1583 1590 setting = RhodeCodeUi()
1584 1591 setting.ui_section = section
1585 1592 setting.ui_value = value
1586 1593 setting.ui_key = key
1587 1594 setting.ui_active = active
1588 1595 Session().add(setting)
1589 1596 Session().commit()
1590 1597
1591 1598 if cleanup:
1592 1599 self.rhodecode_ui_ids.append(setting.ui_id)
1593 1600 return setting
1594 1601
1595 1602 def create_repo_rhodecode_setting(
1596 1603 self, repo, name, value, type_, cleanup=True):
1597 1604 setting = RepoRhodeCodeSetting(
1598 1605 repo.repo_id, key=name, val=value, type=type_)
1599 1606 Session().add(setting)
1600 1607 Session().commit()
1601 1608
1602 1609 if cleanup:
1603 1610 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1604 1611 return setting
1605 1612
1606 1613 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1607 1614 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1608 1615 Session().add(setting)
1609 1616 Session().commit()
1610 1617
1611 1618 if cleanup:
1612 1619 self.rhodecode_setting_ids.append(setting.app_settings_id)
1613 1620
1614 1621 return setting
1615 1622
1616 1623 def cleanup(self):
1617 1624 for id_ in self.rhodecode_ui_ids:
1618 1625 setting = RhodeCodeUi.get(id_)
1619 1626 Session().delete(setting)
1620 1627
1621 1628 for id_ in self.rhodecode_setting_ids:
1622 1629 setting = RhodeCodeSetting.get(id_)
1623 1630 Session().delete(setting)
1624 1631
1625 1632 for id_ in self.repo_rhodecode_ui_ids:
1626 1633 setting = RepoRhodeCodeUi.get(id_)
1627 1634 Session().delete(setting)
1628 1635
1629 1636 for id_ in self.repo_rhodecode_setting_ids:
1630 1637 setting = RepoRhodeCodeSetting.get(id_)
1631 1638 Session().delete(setting)
1632 1639
1633 1640 Session().commit()
1634 1641
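For illustration, a test could drive settings_util along the following lines; the section and value strings are invented for the example, while the attribute names match the RhodeCodeUi columns used above.

def test_custom_ui_entry(settings_util):
    # the row is deleted again by settings_util.cleanup() after the test
    setting = settings_util.create_rhodecode_ui(
        section='hooks', value='python:example.hook.function')
    assert setting.ui_section == 'hooks'
    assert setting.ui_active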
1635 1642
1636 1643 @pytest.fixture
1637 1644 def no_notifications(request):
1638 1645 notification_patcher = mock.patch(
1639 1646 'rhodecode.model.notification.NotificationModel.create')
1640 1647 notification_patcher.start()
1641 1648 request.addfinalizer(notification_patcher.stop)
1642 1649
1643 1650
1644 1651 @pytest.fixture(scope='session')
1645 1652 def repeat(request):
1646 1653 """
1647 1654 Provides the number of repetitions which tests should use for repeated calls.
1648 1655
1649 1656 Slower calls may divide it by 10 or 100. The value is chosen so that the
1650 1657 tests do not become too slow in our default test suite.
1651 1658 """
1652 1659 return request.config.getoption('--repeat')
1653 1660
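A hedged sketch of a test scaling its workload by the session-wide value; dividing by 10 mirrors the hint in the docstring and is not a fixed rule.

def test_expensive_operation(repeat):
    # slower calls shrink the loop so the default suite stays reasonably fast
    iterations = max(repeat // 10, 1)
    for _ in range(iterations):
        pass  # exercise the expensive code path here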
1654 1661
1655 1662 @pytest.fixture
1656 1663 def rhodecode_fixtures():
1657 1664 return Fixture()
1658 1665
1659 1666
1660 1667 @pytest.fixture
1661 1668 def context_stub():
1662 1669 """
1663 1670 Stub context object.
1664 1671 """
1665 1672 context = pyramid.testing.DummyResource()
1666 1673 return context
1667 1674
1668 1675
1669 1676 @pytest.fixture
1670 1677 def request_stub():
1671 1678 """
1672 1679 Stub request object.
1673 1680 """
1674 1681 from rhodecode.lib.base import bootstrap_request
1675 1682 request = bootstrap_request(scheme='https')
1676 1683 return request
1677 1684
1678 1685
1679 1686 @pytest.fixture
1680 1687 def config_stub(request, request_stub):
1681 1688 """
1682 1689 Set up pyramid.testing and return the Configurator.
1683 1690 """
1684 1691 from rhodecode.lib.base import bootstrap_config
1685 1692 config = bootstrap_config(request=request_stub)
1686 1693
1687 1694 @request.addfinalizer
1688 1695 def cleanup():
1689 1696 pyramid.testing.tearDown()
1690 1697
1691 1698 return config
1692 1699
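A small sketch of typical usage; the route name is made up for the example and only demonstrates that the returned object is a regular Configurator backed by pyramid.testing.

def test_with_pyramid_testing(config_stub, request_stub):
    # config_stub already ran bootstrap_config() and tears pyramid.testing
    # down in its finalizer; request_stub is a bootstrapped https request
    # that can be handed to code expecting a Pyramid request.
    config_stub.add_route('example_home', '/')  # illustrative route
    assert config_stub.registry is not None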
1693 1700
1694 1701 @pytest.fixture
1695 1702 def StubIntegrationType():
1696 1703 class _StubIntegrationType(IntegrationTypeBase):
1697 1704 """ Test integration type class """
1698 1705
1699 1706 key = 'test'
1700 1707 display_name = 'Test integration type'
1701 1708 description = 'A test integration type for testing'
1702 1709 icon = 'test_icon_html_image'
1703 1710
1704 1711 def __init__(self, settings):
1705 1712 super(_StubIntegrationType, self).__init__(settings)
1706 1713 self.sent_events = [] # for testing
1707 1714
1708 1715 def send_event(self, event):
1709 1716 self.sent_events.append(event)
1710 1717
1711 1718 def settings_schema(self):
1712 1719 class SettingsSchema(colander.Schema):
1713 1720 test_string_field = colander.SchemaNode(
1714 1721 colander.String(),
1715 1722 missing=colander.required,
1716 1723 title='test string field',
1717 1724 )
1718 1725 test_int_field = colander.SchemaNode(
1719 1726 colander.Int(),
1720 1727 title='some integer setting',
1721 1728 )
1722 1729 return SettingsSchema()
1723 1730
1724 1731
1725 1732 integration_type_registry.register_integration_type(_StubIntegrationType)
1726 1733 return _StubIntegrationType
1727 1734
1728 1735 @pytest.fixture
1729 1736 def stub_integration_settings():
1730 1737 return {
1731 1738 'test_string_field': 'some data',
1732 1739 'test_int_field': 100,
1733 1740 }
1734 1741
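The two fixtures above fit together; as a sketch, the stub settings can be validated against the stub schema like this (constructing the type with empty settings is just a convenient way to reach settings_schema).

def test_stub_settings_match_schema(
        StubIntegrationType, stub_integration_settings):
    schema = StubIntegrationType(settings={}).settings_schema()
    deserialized = schema.deserialize(stub_integration_settings)
    assert deserialized['test_int_field'] == 100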
1735 1742
1736 1743 @pytest.fixture
1737 1744 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1738 1745 stub_integration_settings):
1739 1746 integration = IntegrationModel().create(
1740 1747 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1741 1748 name='test repo integration',
1742 1749 repo=repo_stub, repo_group=None, child_repos_only=None)
1743 1750
1744 1751 @request.addfinalizer
1745 1752 def cleanup():
1746 1753 IntegrationModel().delete(integration)
1747 1754
1748 1755 return integration
1749 1756
1750 1757
1751 1758 @pytest.fixture
1752 1759 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1753 1760 stub_integration_settings):
1754 1761 integration = IntegrationModel().create(
1755 1762 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1756 1763 name='test repogroup integration',
1757 1764 repo=None, repo_group=test_repo_group, child_repos_only=True)
1758 1765
1759 1766 @request.addfinalizer
1760 1767 def cleanup():
1761 1768 IntegrationModel().delete(integration)
1762 1769
1763 1770 return integration
1764 1771
1765 1772
1766 1773 @pytest.fixture
1767 1774 def repogroup_recursive_integration_stub(request, test_repo_group,
1768 1775 StubIntegrationType, stub_integration_settings):
1769 1776 integration = IntegrationModel().create(
1770 1777 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1771 1778 name='test recursive repogroup integration',
1772 1779 repo=None, repo_group=test_repo_group, child_repos_only=False)
1773 1780
1774 1781 @request.addfinalizer
1775 1782 def cleanup():
1776 1783 IntegrationModel().delete(integration)
1777 1784
1778 1785 return integration
1779 1786
1780 1787
1781 1788 @pytest.fixture
1782 1789 def global_integration_stub(request, StubIntegrationType,
1783 1790 stub_integration_settings):
1784 1791 integration = IntegrationModel().create(
1785 1792 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1786 1793 name='test global integration',
1787 1794 repo=None, repo_group=None, child_repos_only=None)
1788 1795
1789 1796 @request.addfinalizer
1790 1797 def cleanup():
1791 1798 IntegrationModel().delete(integration)
1792 1799
1793 1800 return integration
1794 1801
1795 1802
1796 1803 @pytest.fixture
1797 1804 def root_repos_integration_stub(request, StubIntegrationType,
1798 1805 stub_integration_settings):
1799 1806 integration = IntegrationModel().create(
1800 1807 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1801 1808 name='test global integration',
1802 1809 repo=None, repo_group=None, child_repos_only=True)
1803 1810
1804 1811 @request.addfinalizer
1805 1812 def cleanup():
1806 1813 IntegrationModel().delete(integration)
1807 1814
1808 1815 return integration
1809 1816
1810 1817
1811 1818 @pytest.fixture
1812 1819 def local_dt_to_utc():
1813 1820 def _factory(dt):
1814 1821 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1815 1822 dateutil.tz.tzutc()).replace(tzinfo=None)
1816 1823 return _factory
1817 1824
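As an example of what the factory does: a naive local datetime comes back shifted to UTC with tzinfo stripped again, so the result stays naive.

import datetime

def test_local_dt_to_utc(local_dt_to_utc):
    naive_local = datetime.datetime(2017, 6, 1, 12, 0, 0)
    naive_utc = local_dt_to_utc(naive_local)
    # the factory converts to UTC and drops tzinfo before returning
    assert naive_utc.tzinfo is None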
1818 1825
1819 1826 @pytest.fixture
1820 1827 def disable_anonymous_user(request, baseapp):
1821 1828 set_anonymous_access(False)
1822 1829
1823 1830 @request.addfinalizer
1824 1831 def cleanup():
1825 1832 set_anonymous_access(True)
1826 1833
1827 1834
1828 1835 @pytest.fixture(scope='module')
1829 1836 def rc_fixture(request):
1830 1837 return Fixture()
1831 1838
1832 1839
1833 1840 @pytest.fixture
1834 1841 def repo_groups(request):
1835 1842 fixture = Fixture()
1836 1843
1837 1844 session = Session()
1838 1845 zombie_group = fixture.create_repo_group('zombie')
1839 1846 parent_group = fixture.create_repo_group('parent')
1840 1847 child_group = fixture.create_repo_group('parent/child')
1841 1848 groups_in_db = session.query(RepoGroup).all()
1842 1849 assert len(groups_in_db) == 3
1843 1850 assert child_group.group_parent_id == parent_group.group_id
1844 1851
1845 1852 @request.addfinalizer
1846 1853 def cleanup():
1847 1854 fixture.destroy_repo_group(zombie_group)
1848 1855 fixture.destroy_repo_group(child_group)
1849 1856 fixture.destroy_repo_group(parent_group)
1850 1857
1851 1858 return zombie_group, parent_group, child_group
@@ -1,372 +1,380 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import json
23 23 import platform
24 24 import socket
25 25 import subprocess32
26 26 import time
27 27 from urllib2 import urlopen, URLError
28 28
29 29 import configobj
30 30 import pytest
31 31
32 32 import pyramid.paster
33
34 from rhodecode.lib.pyramid_utils import get_app_config
33 35 from rhodecode.tests.fixture import TestINI
34 36 import rhodecode
35 37
36 38
37 39 def _parse_json(value):
38 40 return json.loads(value) if value else None
39 41
40 42
41 43 def pytest_addoption(parser):
42 44 parser.addoption(
43 45 '--test-loglevel', dest='test_loglevel',
44 46 help="Set the default logging level for tests: warn (default), info, debug")
45 47 group = parser.getgroup('pylons')
46 48 group.addoption(
47 49 '--with-pylons', dest='pyramid_config',
48 50 help="Set up a Pylons environment with the specified config file.")
49 51 group.addoption(
50 52 '--ini-config-override', action='store', type=_parse_json,
51 53 default=None, dest='pyramid_config_override', help=(
52 54 "Overrides the .ini file settings. Should be specified in JSON"
53 55 " format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
54 56 )
55 57 )
56 58 parser.addini(
57 59 'pyramid_config',
58 60 "Set up a Pyramid environment with the specified config file.")
59 61
60 62 vcsgroup = parser.getgroup('vcs')
61 63 vcsgroup.addoption(
62 64 '--without-vcsserver', dest='with_vcsserver', action='store_false',
63 65 help="Do not start the VCSServer in a background process.")
64 66 vcsgroup.addoption(
65 67 '--with-vcsserver-http', dest='vcsserver_config_http',
66 68 help="Start the HTTP VCSServer with the specified config file.")
67 69 vcsgroup.addoption(
68 70 '--vcsserver-protocol', dest='vcsserver_protocol',
69 71 help="Start the VCSServer with HTTP protocol support.")
70 72 vcsgroup.addoption(
71 73 '--vcsserver-config-override', action='store', type=_parse_json,
72 74 default=None, dest='vcsserver_config_override', help=(
73 75 "Overrides the .ini file settings for the VCSServer. "
74 76 "Should be specified in JSON "
75 77 "format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
76 78 )
77 79 )
78 80 vcsgroup.addoption(
79 81 '--vcsserver-port', action='store', type=int,
80 82 default=None, help=(
81 83 "Allows to set the port of the vcsserver. Useful when testing "
82 84 "against an already running server and random ports cause "
83 85 "trouble."))
84 86 parser.addini(
85 87 'vcsserver_config_http',
86 88 "Start the HTTP VCSServer with the specified config file.")
87 89 parser.addini(
88 90 'vcsserver_protocol',
89 91 "Start the VCSServer with HTTP protocol support.")
90 92
91 93
92 94 @pytest.fixture(scope='session')
93 95 def vcsserver(request, vcsserver_port, vcsserver_factory):
94 96 """
95 97 Session scope VCSServer.
96 98
97 99 Tests which need the VCSServer have to rely on this fixture in order
98 100 to ensure it will be running.
99 101
100 102 For specific needs, the fixture vcsserver_factory can be used. It allows
101 103 adjusting the configuration file for the test run.
102 104
103 105 Command line args:
104 106
105 107 --without-vcsserver: Allows switching this fixture off. You then have to
106 108 start the server manually.
107 109
108 110 --vcsserver-port: Will expect the VCSServer to listen on this port.
109 111 """
110 112
111 113 if not request.config.getoption('with_vcsserver'):
112 114 return None
113 115
114 116 use_http = _use_vcs_http_server(request.config)
115 117 return vcsserver_factory(
116 118 request, use_http=use_http, vcsserver_port=vcsserver_port)
117 119
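A sketch of a test depending on the running server; http_url comes from HttpVCSServer defined further below, and the None check covers the --without-vcsserver case.

import pytest

def test_against_vcsserver(vcsserver):
    if vcsserver is None:
        pytest.skip('server is started manually, see --without-vcsserver')
    assert vcsserver.http_url.startswith('http://')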
118 120
119 121 @pytest.fixture(scope='session')
120 122 def vcsserver_factory(tmpdir_factory):
121 123 """
122 124 Use this if you need a running vcsserver with a special configuration.
123 125 """
124 126
125 127 def factory(request, use_http=True, overrides=(), vcsserver_port=None):
126 128
127 129 if vcsserver_port is None:
128 130 vcsserver_port = get_available_port()
129 131
130 132 overrides = list(overrides)
131 133 if use_http:
132 134 overrides.append({'server:main': {'port': vcsserver_port}})
133 135 else:
134 136 overrides.append({'DEFAULT': {'port': vcsserver_port}})
135 137
136 138 if is_cygwin():
137 139 platform_override = {'DEFAULT': {
138 140 'beaker.cache.repo_object.type': 'nocache'}}
139 141 overrides.append(platform_override)
140 142
141 143 option_name = 'vcsserver_config_http' if use_http else ''
142 144 override_option_name = 'vcsserver_config_override'
143 145 config_file = get_config(
144 146 request.config, option_name=option_name,
145 147 override_option_name=override_option_name, overrides=overrides,
146 148 basetemp=tmpdir_factory.getbasetemp().strpath,
147 149 prefix='test_vcs_')
148 150
149 151 print("Using the VCSServer configuration: {}".format(config_file))
150 152 ServerClass = HttpVCSServer if use_http else None
151 153 server = ServerClass(config_file)
152 154 server.start()
153 155
154 156 @request.addfinalizer
155 157 def cleanup():
156 158 server.shutdown()
157 159
158 160 server.wait_until_ready()
159 161 return server
160 162
161 163 return factory
162 164
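For a special configuration, a test module could wire its own session fixture on top of the factory; the override section and option below are invented for the example, any {'section': {'option': value}} dictionary is accepted.

import pytest

@pytest.fixture(scope='session')
def custom_vcsserver(request, vcsserver_factory):
    # illustrative override; the factory adds the port override itself
    overrides = [{'server:main': {'threads': '4'}}]
    return vcsserver_factory(request, use_http=True, overrides=overrides)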
163 165
164 166 def is_cygwin():
165 167 return 'cygwin' in platform.system().lower()
166 168
167 169
168 170 def _use_vcs_http_server(config):
169 171 protocol_option = 'vcsserver_protocol'
170 172 protocol = (
171 173 config.getoption(protocol_option) or
172 174 config.getini(protocol_option) or
173 175 'http')
174 176 return protocol == 'http'
175 177
176 178
177 179 def _use_log_level(config):
178 180 level = config.getoption('test_loglevel') or 'warn'
179 181 return level.upper()
180 182
181 183
182 184 class VCSServer(object):
183 185 """
184 186 Represents a running VCSServer instance.
185 187 """
186 188
187 189 _args = []
188 190
189 191 def start(self):
190 192 print("Starting the VCSServer: {}".format(self._args))
191 193 self.process = subprocess32.Popen(self._args)
192 194
193 195 def wait_until_ready(self, timeout=30):
194 196 raise NotImplementedError()
195 197
196 198 def shutdown(self):
197 199 self.process.kill()
198 200
199 201
200 202 class HttpVCSServer(VCSServer):
201 203 """
202 204 Represents a running VCSServer instance.
203 205 """
204 206 def __init__(self, config_file):
205 207 config_data = configobj.ConfigObj(config_file)
206 208 self._config = config_data['server:main']
207 209
208 210 args = ['pserve', config_file]
209 211 self._args = args
210 212
211 213 @property
212 214 def http_url(self):
213 215 template = 'http://{host}:{port}/'
214 216 return template.format(**self._config)
215 217
216 218 def start(self):
217 219 self.process = subprocess32.Popen(self._args)
218 220
219 221 def wait_until_ready(self, timeout=30):
220 222 host = self._config['host']
221 223 port = self._config['port']
222 224 status_url = 'http://{host}:{port}/status'.format(host=host, port=port)
223 225 start = time.time()
224 226
225 227 while time.time() - start < timeout:
226 228 try:
227 229 urlopen(status_url)
228 230 break
229 231 except URLError:
230 232 time.sleep(0.2)
231 233 else:
232 234 pytest.exit(
233 235 "Starting the VCSServer failed or took more than {} "
234 236 "seconds. cmd: `{}`".format(timeout, ' '.join(self._args)))
235 237
236 238 def shutdown(self):
237 239 self.process.kill()
238 240
239 241
240 242 @pytest.fixture(scope='session')
241 243 def ini_config(request, tmpdir_factory, rcserver_port, vcsserver_port):
242 244 option_name = 'pyramid_config'
243 245 log_level = _use_log_level(request.config)
244 246
245 247 overrides = [
246 248 {'server:main': {'port': rcserver_port}},
247 249 {'app:main': {
248 250 'vcs.server': 'localhost:%s' % vcsserver_port,
249 251 # johbo: We will always start the VCSServer on our own based on the
250 252 # fixtures of the test cases. For the test run it must always be
251 253 # off in the INI file.
252 254 'vcs.start_server': 'false',
253 255 }},
254 256
255 257 {'handler_console': {
256 258 'class ': 'StreamHandler',
257 259 'args ': '(sys.stderr,)',
258 260 'level': log_level,
259 261 }},
260 262
261 263 ]
262 264 if _use_vcs_http_server(request.config):
263 265 overrides.append({
264 266 'app:main': {
265 267 'vcs.server.protocol': 'http',
266 268 'vcs.scm_app_implementation': 'http',
267 269 'vcs.hooks.protocol': 'http',
268 270 }
269 271 })
270 272
271 273 filename = get_config(
272 274 request.config, option_name=option_name,
273 275 override_option_name='{}_override'.format(option_name),
274 276 overrides=overrides,
275 277 basetemp=tmpdir_factory.getbasetemp().strpath,
276 278 prefix='test_rce_')
277 279 return filename
278 280
279 281
280 282 @pytest.fixture(scope='session')
283 def ini_settings(ini_config):
284 ini_path = ini_config
285 return get_app_config(ini_path)
286
287
288 @pytest.fixture(scope='session')
281 289 def rcserver_port(request):
282 290 port = get_available_port()
283 291 print('Using rcserver port {}'.format(port))
284 292 return port
285 293
286 294
287 295 @pytest.fixture(scope='session')
288 296 def vcsserver_port(request):
289 297 port = request.config.getoption('--vcsserver-port')
290 298 if port is None:
291 299 port = get_available_port()
292 300 print('Using vcsserver port {}'.format(port))
293 301 return port
294 302
295 303
296 304 def get_available_port():
297 305 family = socket.AF_INET
298 306 socktype = socket.SOCK_STREAM
299 307 host = '127.0.0.1'
300 308
301 309 mysocket = socket.socket(family, socktype)
302 310 mysocket.bind((host, 0))
303 311 port = mysocket.getsockname()[1]
304 312 mysocket.close()
305 313 del mysocket
306 314 return port
307 315
308 316
309 317 @pytest.fixture(scope='session')
310 318 def available_port_factory():
311 319 """
312 320 Returns a callable which returns free port numbers.
313 321 """
314 322 return get_available_port
315 323
316 324
317 325 @pytest.fixture
318 326 def available_port(available_port_factory):
319 327 """
320 328 Gives you one free port for the current test.
321 329
322 330 Uses "available_port_factory" to retrieve the port.
323 331 """
324 332 return available_port_factory()
325 333
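A usage sketch; the port is only known to have been free at the moment it was probed, so there is a small race window if another process grabs it first.

import socket

def test_can_bind_to_available_port(available_port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('127.0.0.1', available_port))  # port was free when probed
    sock.close()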
326 334
327 335 @pytest.fixture(scope='session')
328 336 def testini_factory(tmpdir_factory, ini_config):
329 337 """
330 338 Factory to create an INI file based on TestINI.
331 339
332 340 It will make sure to place the INI file in the correct directory.
333 341 """
334 342 basetemp = tmpdir_factory.getbasetemp().strpath
335 343 return TestIniFactory(basetemp, ini_config)
336 344
337 345
338 346 class TestIniFactory(object):
339 347
340 348 def __init__(self, basetemp, template_ini):
341 349 self._basetemp = basetemp
342 350 self._template_ini = template_ini
343 351
344 352 def __call__(self, ini_params, new_file_prefix='test'):
345 353 ini_file = TestINI(
346 354 self._template_ini, ini_params=ini_params,
347 355 new_file_prefix=new_file_prefix, dir=self._basetemp)
348 356 result = ini_file.create()
349 357 return result
350 358
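A hedged sketch of calling the factory from a test; the section and option are invented, and the assertion assumes that TestINI.create() returns the path of the generated file.

import os

def test_generated_ini(testini_factory):
    ini_path = testini_factory([{'app:main': {'use_celery': 'false'}}])
    assert os.path.isfile(ini_path)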
351 359
352 360 def get_config(
353 361 config, option_name, override_option_name, overrides=None,
354 362 basetemp=None, prefix='test'):
355 363 """
356 364 Find a configuration file, apply the overrides and write a temporary INI file using the given `prefix`.
357 365 """
358 366 config_file = (
359 367 config.getoption(option_name) or config.getini(option_name))
360 368 if not config_file:
361 369 pytest.exit(
362 370 "Configuration error, could not extract {}.".format(option_name))
363 371
364 372 overrides = overrides or []
365 373 config_override = config.getoption(override_option_name)
366 374 if config_override:
367 375 overrides.append(config_override)
368 376 temp_ini_file = TestINI(
369 377 config_file, ini_params=overrides, new_file_prefix=prefix,
370 378 dir=basetemp)
371 379
372 380 return temp_ini_file.create()