tests: Add vcsbackend_stub as a new fixture...
johbo -
r771:b63d1121 default
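
The new `vcsbackend_stub` fixture simply aliases `vcsbackend_git` for now (see the diff below). As a minimal sketch, assuming only the `VcsBackend` API defined in this conftest, a test could consume it like this (test name and assertion are illustrative):

def test_stub_repo_gets_requested_commits(vcsbackend_stub):
    # vcsbackend_stub currently resolves to the git VcsBackend; the fixture
    # exists so tests can state that they only need "some" vcs backend.
    repo = vcsbackend_stub.create_repo(number_of_commits=2)
    assert len(repo.commit_ids) == 2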
@@ -1,1750 +1,1760 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess
30 30 import time
31 31 import uuid
32 32
33 33 import mock
34 34 import pyramid.testing
35 35 import pytest
36 36 import colander
37 37 import requests
38 38 from webtest.app import TestApp
39 39
40 40 import rhodecode
41 41 from rhodecode.model.changeset_status import ChangesetStatusModel
42 42 from rhodecode.model.comment import ChangesetCommentsModel
43 43 from rhodecode.model.db import (
44 44 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
45 45 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi, Integration)
46 46 from rhodecode.model.meta import Session
47 47 from rhodecode.model.pull_request import PullRequestModel
48 48 from rhodecode.model.repo import RepoModel
49 49 from rhodecode.model.repo_group import RepoGroupModel
50 50 from rhodecode.model.user import UserModel
51 51 from rhodecode.model.settings import VcsSettingsModel
52 52 from rhodecode.model.user_group import UserGroupModel
53 53 from rhodecode.model.integration import IntegrationModel
54 54 from rhodecode.integrations import integration_type_registry
55 55 from rhodecode.integrations.types.base import IntegrationTypeBase
56 56 from rhodecode.lib.utils import repo2db_mapper
57 57 from rhodecode.lib.vcs import create_vcsserver_proxy
58 58 from rhodecode.lib.vcs.backends import get_backend
59 59 from rhodecode.lib.vcs.nodes import FileNode
60 60 from rhodecode.tests import (
61 61 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
62 62 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
63 63 TEST_USER_REGULAR_PASS)
64 64 from rhodecode.tests.fixture import Fixture
65 65
66 66
67 67 def _split_comma(value):
68 68 return value.split(',')
69 69
70 70
71 71 def pytest_addoption(parser):
72 72 parser.addoption(
73 73 '--keep-tmp-path', action='store_true',
74 74 help="Keep the test temporary directories")
75 75 parser.addoption(
76 76 '--backends', action='store', type=_split_comma,
77 77 default=['git', 'hg', 'svn'],
78 78 help="Select which backends to test for backend specific tests.")
79 79 parser.addoption(
80 80 '--dbs', action='store', type=_split_comma,
81 81 default=['sqlite'],
82 82 help="Select which database to test for database specific tests. "
83 83 "Possible options are sqlite,postgres,mysql")
84 84 parser.addoption(
85 85 '--appenlight', '--ae', action='store_true',
86 86 help="Track statistics in appenlight.")
87 87 parser.addoption(
88 88 '--appenlight-api-key', '--ae-key',
89 89 help="API key for Appenlight.")
90 90 parser.addoption(
91 91 '--appenlight-url', '--ae-url',
92 92 default="https://ae.rhodecode.com",
93 93 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
94 94 parser.addoption(
95 95 '--sqlite-connection-string', action='store',
96 96 default='', help="Connection string for the dbs tests with SQLite")
97 97 parser.addoption(
98 98 '--postgres-connection-string', action='store',
99 99 default='', help="Connection string for the dbs tests with Postgres")
100 100 parser.addoption(
101 101 '--mysql-connection-string', action='store',
102 102 default='', help="Connection string for the dbs tests with MySQL")
103 103 parser.addoption(
104 104 '--repeat', type=int, default=100,
105 105 help="Number of repetitions in performance tests.")
106 106
107 107
108 108 def pytest_configure(config):
109 109 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
110 110 from rhodecode.config import patches
111 111 patches.kombu_1_5_1_python_2_7_11()
112 112
113 113
114 114 def pytest_collection_modifyitems(session, config, items):
115 115 # Skip items marked as non-tests (cf. nose's nottest); used during the transition from nose to pytest
116 116 remaining = [
117 117 i for i in items if getattr(i.obj, '__test__', True)]
118 118 items[:] = remaining
119 119
120 120
121 121 def pytest_generate_tests(metafunc):
122 122 # Support test generation based on --backend parameter
123 123 if 'backend_alias' in metafunc.fixturenames:
124 124 backends = get_backends_from_metafunc(metafunc)
125 125 scope = None
126 126 if not backends:
127 127 pytest.skip("Not enabled for any of selected backends")
128 128 metafunc.parametrize('backend_alias', backends, scope=scope)
129 129 elif hasattr(metafunc.function, 'backends'):
130 130 backends = get_backends_from_metafunc(metafunc)
131 131 if not backends:
132 132 pytest.skip("Not enabled for any of selected backends")
133 133
134 134
135 135 def get_backends_from_metafunc(metafunc):
136 136 requested_backends = set(metafunc.config.getoption('--backends'))
137 137 if hasattr(metafunc.function, 'backends'):
138 138 # Supported backends by this test function, created from
139 139 # pytest.mark.backends
140 140 backends = metafunc.function.backends.args
141 141 elif hasattr(metafunc.cls, 'backend_alias'):
142 142 # Support class attribute "backend_alias", this is mainly
143 143 # for legacy reasons for tests not yet using pytest.mark.backends
144 144 backends = [metafunc.cls.backend_alias]
145 145 else:
146 146 backends = metafunc.config.getoption('--backends')
147 147 return requested_backends.intersection(backends)
148 148
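
For illustration, a hypothetical test module showing how the marker interacts with the selection logic above; the marker name `backends` and the `backend_alias` parameter come from this conftest, the test itself is made up:

import pytest

@pytest.mark.backends('git', 'hg')
def test_runs_once_per_selected_backend(backend_alias):
    # Parametrized with the intersection of the marker arguments and the
    # aliases passed via --backends on the command line.
    assert backend_alias in ('git', 'hg')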
149 149
150 150 @pytest.fixture(scope='session', autouse=True)
151 151 def activate_example_rcextensions(request):
152 152 """
153 153 Patch in an example rcextensions module which verifies passed in kwargs.
154 154 """
155 155 from rhodecode.tests.other import example_rcextensions
156 156
157 157 old_extensions = rhodecode.EXTENSIONS
158 158 rhodecode.EXTENSIONS = example_rcextensions
159 159
160 160 @request.addfinalizer
161 161 def cleanup():
162 162 rhodecode.EXTENSIONS = old_extensions
163 163
164 164
165 165 @pytest.fixture
166 166 def capture_rcextensions():
167 167 """
168 168 Returns the recorded calls to entry points in rcextensions.
169 169 """
170 170 calls = rhodecode.EXTENSIONS.calls
171 171 calls.clear()
172 172 # Note: At this moment it is still an empty dict, but it will be
173 173 # filled during the test run, and since it is a reference, returning
174 174 # it here is enough to make it work.
175 175 return calls
176 176
177 177
178 178 @pytest.fixture(scope='session')
179 179 def http_environ_session():
180 180 """
181 181 Allows using "http_environ" in session scope.
182 182 """
183 183 return http_environ(
184 184 http_host_stub=http_host_stub())
185 185
186 186
187 187 @pytest.fixture
188 188 def http_host_stub():
189 189 """
190 190 Value of HTTP_HOST in the test run.
191 191 """
192 192 return 'test.example.com:80'
193 193
194 194
195 195 @pytest.fixture
196 196 def http_environ(http_host_stub):
197 197 """
198 198 HTTP extra environ keys.
199 199
200 200 Used by the test application as well as for setting up the pylons
201 201 environment. In the case of the fixture "app" it should be possible
202 202 to override this for a specific test case.
203 203 """
204 204 return {
205 205 'SERVER_NAME': http_host_stub.split(':')[0],
206 206 'SERVER_PORT': http_host_stub.split(':')[1],
207 207 'HTTP_HOST': http_host_stub,
208 208 }
209 209
210 210
211 211 @pytest.fixture(scope='function')
212 212 def app(request, pylonsapp, http_environ):
213 213 app = TestApp(
214 214 pylonsapp,
215 215 extra_environ=http_environ)
216 216 if request.cls:
217 217 request.cls.app = app
218 218 return app
219 219
220 220
221 221 @pytest.fixture()
222 222 def app_settings(pylonsapp, pylons_config):
223 223 """
224 224 Settings dictionary used to create the app.
225 225
226 226 Parses the ini file and passes the result through the sanitize and apply
227 227 defaults mechanism in `rhodecode.config.middleware`.
228 228 """
229 229 from paste.deploy.loadwsgi import loadcontext, APP
230 230 from rhodecode.config.middleware import (
231 231 sanitize_settings_and_apply_defaults)
232 232 context = loadcontext(APP, 'config:' + pylons_config)
233 233 settings = sanitize_settings_and_apply_defaults(context.config())
234 234 return settings
235 235
236 236
237 237 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
238 238
239 239
240 240 def _autologin_user(app, *args):
241 241 session = login_user_session(app, *args)
242 242 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
243 243 return LoginData(csrf_token, session['rhodecode_user'])
244 244
245 245
246 246 @pytest.fixture
247 247 def autologin_user(app):
248 248 """
249 249 Utility fixture which makes sure that the admin user is logged in
250 250 """
251 251 return _autologin_user(app)
252 252
253 253
254 254 @pytest.fixture
255 255 def autologin_regular_user(app):
256 256 """
257 257 Utility fixture which makes sure that the regular user is logged in
258 258 """
259 259 return _autologin_user(
260 260 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
261 261
262 262
263 263 @pytest.fixture(scope='function')
264 264 def csrf_token(request, autologin_user):
265 265 return autologin_user.csrf_token
266 266
267 267
268 268 @pytest.fixture(scope='function')
269 269 def xhr_header(request):
270 270 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
271 271
272 272
273 273 @pytest.fixture
274 274 def real_crypto_backend(monkeypatch):
275 275 """
276 276 Switch the production crypto backend on for this test.
277 277
278 278 During the test run the crypto backend is replaced with a faster
279 279 implementation based on the MD5 algorithm.
280 280 """
281 281 monkeypatch.setattr(rhodecode, 'is_test', False)
282 282
283 283
284 284 @pytest.fixture(scope='class')
285 285 def index_location(request, pylonsapp):
286 286 index_location = pylonsapp.config['app_conf']['search.location']
287 287 if request.cls:
288 288 request.cls.index_location = index_location
289 289 return index_location
290 290
291 291
292 292 @pytest.fixture(scope='session', autouse=True)
293 293 def tests_tmp_path(request):
294 294 """
295 295 Create temporary directory to be used during the test session.
296 296 """
297 297 if not os.path.exists(TESTS_TMP_PATH):
298 298 os.makedirs(TESTS_TMP_PATH)
299 299
300 300 if not request.config.getoption('--keep-tmp-path'):
301 301 @request.addfinalizer
302 302 def remove_tmp_path():
303 303 shutil.rmtree(TESTS_TMP_PATH)
304 304
305 305 return TESTS_TMP_PATH
306 306
307 307
308 308 @pytest.fixture(scope='session', autouse=True)
309 309 def patch_pyro_request_scope_proxy_factory(request):
310 310 """
311 311 Patch the pyro proxy factory to always use the same dummy request object
312 312 when under test. This will return the same pyro proxy on every call.
313 313 """
314 314 dummy_request = pyramid.testing.DummyRequest()
315 315
316 316 def mocked_call(self, request=None):
317 317 return self.getProxy(request=dummy_request)
318 318
319 319 patcher = mock.patch(
320 320 'rhodecode.lib.vcs.client.RequestScopeProxyFactory.__call__',
321 321 new=mocked_call)
322 322 patcher.start()
323 323
324 324 @request.addfinalizer
325 325 def undo_patching():
326 326 patcher.stop()
327 327
328 328
329 329 @pytest.fixture
330 330 def test_repo_group(request):
331 331 """
332 332 Create a temporary repository group, and destroy it
333 333 automatically after use
334 334 """
335 335 fixture = Fixture()
336 336 repogroupid = 'test_repo_group_%s' % int(time.time())
337 337 repo_group = fixture.create_repo_group(repogroupid)
338 338
339 339 def _cleanup():
340 340 fixture.destroy_repo_group(repogroupid)
341 341
342 342 request.addfinalizer(_cleanup)
343 343 return repo_group
344 344
345 345
346 346 @pytest.fixture
347 347 def test_user_group(request):
348 348 """
349 349 Create a temporary user group, and destroy it
350 350 automatically after use
351 351 """
352 352 fixture = Fixture()
353 353 usergroupid = 'test_user_group_%s' % int(time.time())
354 354 user_group = fixture.create_user_group(usergroupid)
355 355
356 356 def _cleanup():
357 357 fixture.destroy_user_group(user_group)
358 358
359 359 request.addfinalizer(_cleanup)
360 360 return user_group
361 361
362 362
363 363 @pytest.fixture(scope='session')
364 364 def test_repo(request):
365 365 container = TestRepoContainer()
366 366 request.addfinalizer(container._cleanup)
367 367 return container
368 368
369 369
370 370 class TestRepoContainer(object):
371 371 """
372 372 Container for test repositories which are used read only.
373 373
374 374 Repositories will be created on demand and re-used during the lifetime
375 375 of this object.
376 376
377 377 Usage to get the svn test repository "minimal"::
378 378
379 379 test_repo = TestRepoContainer()
380 380 repo = test_repo('minimal', 'svn')
381 381
382 382 """
383 383
384 384 dump_extractors = {
385 385 'git': utils.extract_git_repo_from_dump,
386 386 'hg': utils.extract_hg_repo_from_dump,
387 387 'svn': utils.extract_svn_repo_from_dump,
388 388 }
389 389
390 390 def __init__(self):
391 391 self._cleanup_repos = []
392 392 self._fixture = Fixture()
393 393 self._repos = {}
394 394
395 395 def __call__(self, dump_name, backend_alias):
396 396 key = (dump_name, backend_alias)
397 397 if key not in self._repos:
398 398 repo = self._create_repo(dump_name, backend_alias)
399 399 self._repos[key] = repo.repo_id
400 400 return Repository.get(self._repos[key])
401 401
402 402 def _create_repo(self, dump_name, backend_alias):
403 403 repo_name = '%s-%s' % (backend_alias, dump_name)
404 404 backend_class = get_backend(backend_alias)
405 405 dump_extractor = self.dump_extractors[backend_alias]
406 406 repo_path = dump_extractor(dump_name, repo_name)
407 407 vcs_repo = backend_class(repo_path)
408 408 repo2db_mapper({repo_name: vcs_repo})
409 409 repo = RepoModel().get_by_repo_name(repo_name)
410 410 self._cleanup_repos.append(repo_name)
411 411 return repo
412 412
413 413 def _cleanup(self):
414 414 for repo_name in reversed(self._cleanup_repos):
415 415 self._fixture.destroy_repo(repo_name)
416 416
417 417
418 418 @pytest.fixture
419 419 def backend(request, backend_alias, pylonsapp, test_repo):
420 420 """
421 421 Parametrized fixture which represents a single backend implementation.
422 422
423 423 It respects the option `--backends` to focus the test run on specific
424 424 backend implementations.
425 425
426 426 It also supports `pytest.mark.xfail_backends` to mark tests as failing
427 427 for specific backends. This is intended as a utility for incremental
428 428 development of a new backend implementation.
429 429 """
430 430 if backend_alias not in request.config.getoption('--backends'):
431 431 pytest.skip("Backend %s not selected." % (backend_alias, ))
432 432
433 433 utils.check_xfail_backends(request.node, backend_alias)
434 434 utils.check_skip_backends(request.node, backend_alias)
435 435
436 436 repo_name = 'vcs_test_%s' % (backend_alias, )
437 437 backend = Backend(
438 438 alias=backend_alias,
439 439 repo_name=repo_name,
440 440 test_name=request.node.name,
441 441 test_repo_container=test_repo)
442 442 request.addfinalizer(backend.cleanup)
443 443 return backend
444 444
445 445
446 446 @pytest.fixture
447 447 def backend_git(request, pylonsapp, test_repo):
448 448 return backend(request, 'git', pylonsapp, test_repo)
449 449
450 450
451 451 @pytest.fixture
452 452 def backend_hg(request, pylonsapp, test_repo):
453 453 return backend(request, 'hg', pylonsapp, test_repo)
454 454
455 455
456 456 @pytest.fixture
457 457 def backend_svn(request, pylonsapp, test_repo):
458 458 return backend(request, 'svn', pylonsapp, test_repo)
459 459
460 460
461 461 @pytest.fixture
462 462 def backend_random(backend_git):
463 463 """
464 464 Use this to express that your tests need "a backend".
465 465
466 466 A few of our tests need a backend, so that we can run the code. This
467 467 fixture is intended to be used for such cases. It will pick one of the
468 468 backends and run the tests.
469 469
470 470 The fixture `backend` would run the test multiple times for each
471 471 available backend which is a pure waste of time if the test is
472 472 independent of the backend type.
473 473 """
474 474 # TODO: johbo: Change this to pick a random backend
475 475 return backend_git
476 476
477 477
478 478 @pytest.fixture
479 479 def backend_stub(backend_git):
480 480 """
481 481 Use this to express that your tests need a backend stub
482 482
483 483 TODO: mikhail: Implement a real stub logic instead of returning
484 484 a git backend
485 485 """
486 486 return backend_git
487 487
488 488
489 489 @pytest.fixture
490 490 def repo_stub(backend_stub):
491 491 """
492 492 Use this to express that your tests need a repository stub
493 493 """
494 494 return backend_stub.create_repo()
495 495
496 496
497 497 class Backend(object):
498 498 """
499 499 Represents the test configuration for one supported backend
500 500
501 501 Provides easy access to different test repositories based on
502 502 `__getitem__`. Such repositories will only be created once per test
503 503 session.
504 504 """
505 505
506 506 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
507 507 _master_repo = None
508 508 _commit_ids = {}
509 509
510 510 def __init__(self, alias, repo_name, test_name, test_repo_container):
511 511 self.alias = alias
512 512 self.repo_name = repo_name
513 513 self._cleanup_repos = []
514 514 self._test_name = test_name
515 515 self._test_repo_container = test_repo_container
516 516 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
517 517 # Fixture will survive in the end.
518 518 self._fixture = Fixture()
519 519
520 520 def __getitem__(self, key):
521 521 return self._test_repo_container(key, self.alias)
522 522
523 523 @property
524 524 def repo(self):
525 525 """
526 526 Returns the "current" repository. This is the vcs_test repo or the
527 527 last repo which has been created with `create_repo`.
528 528 """
529 529 from rhodecode.model.db import Repository
530 530 return Repository.get_by_repo_name(self.repo_name)
531 531
532 532 @property
533 533 def default_branch_name(self):
534 534 VcsRepository = get_backend(self.alias)
535 535 return VcsRepository.DEFAULT_BRANCH_NAME
536 536
537 537 @property
538 538 def default_head_id(self):
539 539 """
540 540 Returns the default head id of the underlying backend.
541 541
542 542 This will be the default branch name in case the backend does have a
543 543 default branch. In the other cases it will point to a valid head
544 544 which can serve as the base to create a new commit on top of it.
545 545 """
546 546 vcsrepo = self.repo.scm_instance()
547 547 head_id = (
548 548 vcsrepo.DEFAULT_BRANCH_NAME or
549 549 vcsrepo.commit_ids[-1])
550 550 return head_id
551 551
552 552 @property
553 553 def commit_ids(self):
554 554 """
555 555 Returns the list of commits for the last created repository
556 556 """
557 557 return self._commit_ids
558 558
559 559 def create_master_repo(self, commits):
560 560 """
561 561 Create a repository and remember it as a template.
562 562
563 563 This makes it easy to create derived repositories to construct
564 564 more complex scenarios for diff, compare and pull requests.
565 565
566 566 Returns a commit map which maps from commit message to raw_id.
567 567 """
568 568 self._master_repo = self.create_repo(commits=commits)
569 569 return self._commit_ids
570 570
571 571 def create_repo(
572 572 self, commits=None, number_of_commits=0, heads=None,
573 573 name_suffix=u'', **kwargs):
574 574 """
575 575 Create a repository and record it for later cleanup.
576 576
577 577 :param commits: Optional. A sequence of dict instances.
578 578 Will add a commit per entry to the new repository.
579 579 :param number_of_commits: Optional. If set to a number, this number of
580 580 commits will be added to the new repository.
581 581 :param heads: Optional. Can be set to a sequence of commit
582 582 names which shall be pulled in from the master repository.
583 583
584 584 """
585 585 self.repo_name = self._next_repo_name() + name_suffix
586 586 repo = self._fixture.create_repo(
587 587 self.repo_name, repo_type=self.alias, **kwargs)
588 588 self._cleanup_repos.append(repo.repo_name)
589 589
590 590 commits = commits or [
591 591 {'message': 'Commit %s of %s' % (x, self.repo_name)}
592 592 for x in xrange(number_of_commits)]
593 593 self._add_commits_to_repo(repo.scm_instance(), commits)
594 594 if heads:
595 595 self.pull_heads(repo, heads)
596 596
597 597 return repo
598 598
599 599 def pull_heads(self, repo, heads):
600 600 """
601 601 Make sure that repo contains all commits mentioned in `heads`
602 602 """
603 603 vcsmaster = self._master_repo.scm_instance()
604 604 vcsrepo = repo.scm_instance()
605 605 vcsrepo.config.clear_section('hooks')
606 606 commit_ids = [self._commit_ids[h] for h in heads]
607 607 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
608 608
609 609 def create_fork(self):
610 610 repo_to_fork = self.repo_name
611 611 self.repo_name = self._next_repo_name()
612 612 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
613 613 self._cleanup_repos.append(self.repo_name)
614 614 return repo
615 615
616 616 def new_repo_name(self, suffix=u''):
617 617 self.repo_name = self._next_repo_name() + suffix
618 618 self._cleanup_repos.append(self.repo_name)
619 619 return self.repo_name
620 620
621 621 def _next_repo_name(self):
622 622 return u"%s_%s" % (
623 623 self.invalid_repo_name.sub(u'_', self._test_name),
624 624 len(self._cleanup_repos))
625 625
626 626 def ensure_file(self, filename, content='Test content\n'):
627 627 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
628 628 commits = [
629 629 {'added': [
630 630 FileNode(filename, content=content),
631 631 ]},
632 632 ]
633 633 self._add_commits_to_repo(self.repo.scm_instance(), commits)
634 634
635 635 def enable_downloads(self):
636 636 repo = self.repo
637 637 repo.enable_downloads = True
638 638 Session().add(repo)
639 639 Session().commit()
640 640
641 641 def cleanup(self):
642 642 for repo_name in reversed(self._cleanup_repos):
643 643 self._fixture.destroy_repo(repo_name)
644 644
645 645 def _add_commits_to_repo(self, repo, commits):
646 646 commit_ids = _add_commits_to_repo(repo, commits)
647 647 if not commit_ids:
648 648 return
649 649 self._commit_ids = commit_ids
650 650
651 651 # Creating refs for Git to allow fetching them from remote repository
652 652 if self.alias == 'git':
653 653 refs = {}
654 654 for message in self._commit_ids:
655 655 # TODO: mikhail: do more special chars replacements
656 656 ref_name = 'refs/test-refs/{}'.format(
657 657 message.replace(' ', ''))
658 658 refs[ref_name] = self._commit_ids[message]
659 659 self._create_refs(repo, refs)
660 660
661 661 def _create_refs(self, repo, refs):
662 662 for ref_name in refs:
663 663 repo.set_refs(ref_name, refs[ref_name])
664 664
665 665
666 666 @pytest.fixture
667 667 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
668 668 """
669 669 Parametrized fixture which represents a single vcs backend implementation.
670 670
671 671 See the fixture `backend` for more details. This one implements the same
672 672 concept, but on vcs level. So it does not provide model instances etc.
673 673
674 674 Parameters are generated dynamically, see :func:`pytest_generate_tests`
675 675 for how this works.
676 676 """
677 677 if backend_alias not in request.config.getoption('--backends'):
678 678 pytest.skip("Backend %s not selected." % (backend_alias, ))
679 679
680 680 utils.check_xfail_backends(request.node, backend_alias)
681 681 utils.check_skip_backends(request.node, backend_alias)
682 682
683 683 repo_name = 'vcs_test_%s' % (backend_alias, )
684 684 repo_path = os.path.join(tests_tmp_path, repo_name)
685 685 backend = VcsBackend(
686 686 alias=backend_alias,
687 687 repo_path=repo_path,
688 688 test_name=request.node.name,
689 689 test_repo_container=test_repo)
690 690 request.addfinalizer(backend.cleanup)
691 691 return backend
692 692
693 693
694 694 @pytest.fixture
695 695 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
696 696 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
697 697
698 698
699 699 @pytest.fixture
700 700 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
701 701 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
702 702
703 703
704 704 @pytest.fixture
705 705 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
706 706 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
707 707
708 708
709 709 @pytest.fixture
710 710 def vcsbackend_random(vcsbackend_git):
711 711 """
712 712 Use this to express that your tests need "a vcsbackend".
713 713
714 714 The fixture `vcsbackend` would run the test multiple times for each
715 715 available vcs backend which is a pure waste of time if the test is
716 716 independent of the vcs backend type.
717 717 """
718 718 # TODO: johbo: Change this to pick a random backend
719 719 return vcsbackend_git
720 720
721 721
722 @pytest.fixture
723 def vcsbackend_stub(vcsbackend_git):
724 """
725 Use this to express that your test just needs a stub of a vcsbackend.
726
727 Plan is to eventually implement an in-memory stub to speed tests up.
728 """
729 return vcsbackend_git
730
731
722 732 class VcsBackend(object):
723 733 """
724 734 Represents the test configuration for one supported vcs backend.
725 735 """
726 736
727 737 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
728 738
729 739 def __init__(self, alias, repo_path, test_name, test_repo_container):
730 740 self.alias = alias
731 741 self._repo_path = repo_path
732 742 self._cleanup_repos = []
733 743 self._test_name = test_name
734 744 self._test_repo_container = test_repo_container
735 745
736 746 def __getitem__(self, key):
737 747 return self._test_repo_container(key, self.alias).scm_instance()
738 748
739 749 @property
740 750 def repo(self):
741 751 """
742 752 Returns the "current" repository. This is the vcs_test repo or the last
743 753 repo which has been created.
744 754 """
745 755 Repository = get_backend(self.alias)
746 756 return Repository(self._repo_path)
747 757
748 758 @property
749 759 def backend(self):
750 760 """
751 761 Returns the backend implementation class.
752 762 """
753 763 return get_backend(self.alias)
754 764
755 765 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
756 766 repo_name = self._next_repo_name()
757 767 self._repo_path = get_new_dir(repo_name)
758 768 repo_class = get_backend(self.alias)
759 769 src_url = None
760 770 if _clone_repo:
761 771 src_url = _clone_repo.path
762 772 repo = repo_class(self._repo_path, create=True, src_url=src_url)
763 773 self._cleanup_repos.append(repo)
764 774
765 775 commits = commits or [
766 776 {'message': 'Commit %s of %s' % (x, repo_name)}
767 777 for x in xrange(number_of_commits)]
768 778 _add_commits_to_repo(repo, commits)
769 779 return repo
770 780
771 781 def clone_repo(self, repo):
772 782 return self.create_repo(_clone_repo=repo)
773 783
774 784 def cleanup(self):
775 785 for repo in self._cleanup_repos:
776 786 shutil.rmtree(repo.path)
777 787
778 788 def new_repo_path(self):
779 789 repo_name = self._next_repo_name()
780 790 self._repo_path = get_new_dir(repo_name)
781 791 return self._repo_path
782 792
783 793 def _next_repo_name(self):
784 794 return "%s_%s" % (
785 795 self.invalid_repo_name.sub('_', self._test_name),
786 796 len(self._cleanup_repos))
787 797
788 798 def add_file(self, repo, filename, content='Test content\n'):
789 799 imc = repo.in_memory_commit
790 800 imc.add(FileNode(filename, content=content))
791 801 imc.commit(
792 802 message=u'Automatic commit from vcsbackend fixture',
793 803 author=u'Automatic')
794 804
795 805 def ensure_file(self, filename, content='Test content\n'):
796 806 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
797 807 self.add_file(self.repo, filename, content)
798 808
799 809
800 810 def _add_commits_to_repo(vcs_repo, commits):
801 811 commit_ids = {}
802 812 if not commits:
803 813 return commit_ids
804 814
805 815 imc = vcs_repo.in_memory_commit
806 816 commit = None
807 817
808 818 for idx, commit in enumerate(commits):
809 819 message = unicode(commit.get('message', 'Commit %s' % idx))
810 820
811 821 for node in commit.get('added', []):
812 822 imc.add(FileNode(node.path, content=node.content))
813 823 for node in commit.get('changed', []):
814 824 imc.change(FileNode(node.path, content=node.content))
815 825 for node in commit.get('removed', []):
816 826 imc.remove(FileNode(node.path))
817 827
818 828 parents = [
819 829 vcs_repo.get_commit(commit_id=commit_ids[p])
820 830 for p in commit.get('parents', [])]
821 831
822 832 operations = ('added', 'changed', 'removed')
823 833 if not any((commit.get(o) for o in operations)):
824 834 imc.add(FileNode('file_%s' % idx, content=message))
825 835
826 836 commit = imc.commit(
827 837 message=message,
828 838 author=unicode(commit.get('author', 'Automatic')),
829 839 date=commit.get('date'),
830 840 branch=commit.get('branch'),
831 841 parents=parents)
832 842
833 843 commit_ids[commit.message] = commit.raw_id
834 844
835 845 return commit_ids
836 846
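
As a reference for the commit dictionaries interpreted by `_add_commits_to_repo` above (and accepted by `Backend.create_repo`, `VcsBackend.create_repo` and `PRTestUtility.create_pull_request`), a hedged sketch; file names and contents are placeholders:

from rhodecode.lib.vcs.nodes import FileNode

commits = [
    {'message': 'add readme',
     'added': [FileNode('README.rst', content='docs\n')]},
    {'message': 'update readme',
     'changed': [FileNode('README.rst', content='more docs\n')],
     # Parents are referenced by commit message and resolved via commit_ids.
     'parents': ['add readme']},
]
# A commit without 'added'/'changed'/'removed' gets an auto-generated file,
# and the helper returns a mapping from commit message to raw_id.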
837 847
838 848 @pytest.fixture
839 849 def reposerver(request):
840 850 """
841 851 Allows serving a backend repository
842 852 """
843 853
844 854 repo_server = RepoServer()
845 855 request.addfinalizer(repo_server.cleanup)
846 856 return repo_server
847 857
848 858
849 859 class RepoServer(object):
850 860 """
851 861 Utility to serve a local repository for the duration of a test case.
852 862
853 863 Supports only Subversion so far.
854 864 """
855 865
856 866 url = None
857 867
858 868 def __init__(self):
859 869 self._cleanup_servers = []
860 870
861 871 def serve(self, vcsrepo):
862 872 if vcsrepo.alias != 'svn':
863 873 raise TypeError("Backend %s not supported" % vcsrepo.alias)
864 874
865 875 proc = subprocess.Popen(
866 876 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
867 877 '--root', vcsrepo.path])
868 878 self._cleanup_servers.append(proc)
869 879 self.url = 'svn://localhost'
870 880
871 881 def cleanup(self):
872 882 for proc in self._cleanup_servers:
873 883 proc.terminate()
874 884
875 885
876 886 @pytest.fixture
877 887 def pr_util(backend, request):
878 888 """
879 889 Utility for tests of models and for functional tests around pull requests.
880 890
881 891 It gives an instance of :class:`PRTestUtility` which provides various
882 892 utility methods around one pull request.
883 893
884 894 This fixture uses `backend` and inherits its parameterization.
885 895 """
886 896
887 897 util = PRTestUtility(backend)
888 898
889 899 @request.addfinalizer
890 900 def cleanup():
891 901 util.cleanup()
892 902
893 903 return util
894 904
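
A short, hypothetical usage sketch for `pr_util`; only the `create_pull_request` call and the tracked `pull_request_id` are taken from `PRTestUtility` below, the rest is illustrative:

def test_pull_request_util(pr_util):
    pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
    # The utility remembers what it created and deletes it in cleanup().
    assert pull_request.pull_request_id == pr_util.pull_request_id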
895 905
896 906 class PRTestUtility(object):
897 907
898 908 pull_request = None
899 909 pull_request_id = None
900 910 mergeable_patcher = None
901 911 mergeable_mock = None
902 912 notification_patcher = None
903 913
904 914 def __init__(self, backend):
905 915 self.backend = backend
906 916
907 917 def create_pull_request(
908 918 self, commits=None, target_head=None, source_head=None,
909 919 revisions=None, approved=False, author=None, mergeable=False,
910 920 enable_notifications=True, name_suffix=u'', reviewers=None,
911 921 title=u"Test", description=u"Description"):
912 922 self.set_mergeable(mergeable)
913 923 if not enable_notifications:
914 924 # mock notification side effect
915 925 self.notification_patcher = mock.patch(
916 926 'rhodecode.model.notification.NotificationModel.create')
917 927 self.notification_patcher.start()
918 928
919 929 if not self.pull_request:
920 930 if not commits:
921 931 commits = [
922 932 {'message': 'c1'},
923 933 {'message': 'c2'},
924 934 {'message': 'c3'},
925 935 ]
926 936 target_head = 'c1'
927 937 source_head = 'c2'
928 938 revisions = ['c2']
929 939
930 940 self.commit_ids = self.backend.create_master_repo(commits)
931 941 self.target_repository = self.backend.create_repo(
932 942 heads=[target_head], name_suffix=name_suffix)
933 943 self.source_repository = self.backend.create_repo(
934 944 heads=[source_head], name_suffix=name_suffix)
935 945 self.author = author or UserModel().get_by_username(
936 946 TEST_USER_ADMIN_LOGIN)
937 947
938 948 model = PullRequestModel()
939 949 self.create_parameters = {
940 950 'created_by': self.author,
941 951 'source_repo': self.source_repository.repo_name,
942 952 'source_ref': self._default_branch_reference(source_head),
943 953 'target_repo': self.target_repository.repo_name,
944 954 'target_ref': self._default_branch_reference(target_head),
945 955 'revisions': [self.commit_ids[r] for r in revisions],
946 956 'reviewers': reviewers or self._get_reviewers(),
947 957 'title': title,
948 958 'description': description,
949 959 }
950 960 self.pull_request = model.create(**self.create_parameters)
951 961 assert model.get_versions(self.pull_request) == []
952 962
953 963 self.pull_request_id = self.pull_request.pull_request_id
954 964
955 965 if approved:
956 966 self.approve()
957 967
958 968 Session().add(self.pull_request)
959 969 Session().commit()
960 970
961 971 return self.pull_request
962 972
963 973 def approve(self):
964 974 self.create_status_votes(
965 975 ChangesetStatus.STATUS_APPROVED,
966 976 *self.pull_request.reviewers)
967 977
968 978 def close(self):
969 979 PullRequestModel().close_pull_request(self.pull_request, self.author)
970 980
971 981 def _default_branch_reference(self, commit_message):
972 982 reference = '%s:%s:%s' % (
973 983 'branch',
974 984 self.backend.default_branch_name,
975 985 self.commit_ids[commit_message])
976 986 return reference
977 987
978 988 def _get_reviewers(self):
979 989 model = UserModel()
980 990 return [
981 991 model.get_by_username(TEST_USER_REGULAR_LOGIN),
982 992 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
983 993 ]
984 994
985 995 def update_source_repository(self, head=None):
986 996 heads = [head or 'c3']
987 997 self.backend.pull_heads(self.source_repository, heads=heads)
988 998
989 999 def add_one_commit(self, head=None):
990 1000 self.update_source_repository(head=head)
991 1001 old_commit_ids = set(self.pull_request.revisions)
992 1002 PullRequestModel().update_commits(self.pull_request)
993 1003 commit_ids = set(self.pull_request.revisions)
994 1004 new_commit_ids = commit_ids - old_commit_ids
995 1005 assert len(new_commit_ids) == 1
996 1006 return new_commit_ids.pop()
997 1007
998 1008 def remove_one_commit(self):
999 1009 assert len(self.pull_request.revisions) == 2
1000 1010 source_vcs = self.source_repository.scm_instance()
1001 1011 removed_commit_id = source_vcs.commit_ids[-1]
1002 1012
1003 1013 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1004 1014 # remove the if once that's sorted out.
1005 1015 if self.backend.alias == "git":
1006 1016 kwargs = {'branch_name': self.backend.default_branch_name}
1007 1017 else:
1008 1018 kwargs = {}
1009 1019 source_vcs.strip(removed_commit_id, **kwargs)
1010 1020
1011 1021 PullRequestModel().update_commits(self.pull_request)
1012 1022 assert len(self.pull_request.revisions) == 1
1013 1023 return removed_commit_id
1014 1024
1015 1025 def create_comment(self, linked_to=None):
1016 1026 comment = ChangesetCommentsModel().create(
1017 1027 text=u"Test comment",
1018 1028 repo=self.target_repository.repo_name,
1019 1029 user=self.author,
1020 1030 pull_request=self.pull_request)
1021 1031 assert comment.pull_request_version_id is None
1022 1032
1023 1033 if linked_to:
1024 1034 PullRequestModel()._link_comments_to_version(linked_to)
1025 1035
1026 1036 return comment
1027 1037
1028 1038 def create_inline_comment(
1029 1039 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1030 1040 comment = ChangesetCommentsModel().create(
1031 1041 text=u"Test comment",
1032 1042 repo=self.target_repository.repo_name,
1033 1043 user=self.author,
1034 1044 line_no=line_no,
1035 1045 f_path=file_path,
1036 1046 pull_request=self.pull_request)
1037 1047 assert comment.pull_request_version_id is None
1038 1048
1039 1049 if linked_to:
1040 1050 PullRequestModel()._link_comments_to_version(linked_to)
1041 1051
1042 1052 return comment
1043 1053
1044 1054 def create_version_of_pull_request(self):
1045 1055 pull_request = self.create_pull_request()
1046 1056 version = PullRequestModel()._create_version_from_snapshot(
1047 1057 pull_request)
1048 1058 return version
1049 1059
1050 1060 def create_status_votes(self, status, *reviewers):
1051 1061 for reviewer in reviewers:
1052 1062 ChangesetStatusModel().set_status(
1053 1063 repo=self.pull_request.target_repo,
1054 1064 status=status,
1055 1065 user=reviewer.user_id,
1056 1066 pull_request=self.pull_request)
1057 1067
1058 1068 def set_mergeable(self, value):
1059 1069 if not self.mergeable_patcher:
1060 1070 self.mergeable_patcher = mock.patch.object(
1061 1071 VcsSettingsModel, 'get_general_settings')
1062 1072 self.mergeable_mock = self.mergeable_patcher.start()
1063 1073 self.mergeable_mock.return_value = {
1064 1074 'rhodecode_pr_merge_enabled': value}
1065 1075
1066 1076 def cleanup(self):
1067 1077 # In case the source repository is already cleaned up, the pull
1068 1078 # request will already be deleted.
1069 1079 pull_request = PullRequest().get(self.pull_request_id)
1070 1080 if pull_request:
1071 1081 PullRequestModel().delete(pull_request)
1072 1082 Session().commit()
1073 1083
1074 1084 if self.notification_patcher:
1075 1085 self.notification_patcher.stop()
1076 1086
1077 1087 if self.mergeable_patcher:
1078 1088 self.mergeable_patcher.stop()
1079 1089
1080 1090
1081 1091 @pytest.fixture
1082 1092 def user_admin(pylonsapp):
1083 1093 """
1084 1094 Provides the default admin test user as an instance of `db.User`.
1085 1095 """
1086 1096 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1087 1097 return user
1088 1098
1089 1099
1090 1100 @pytest.fixture
1091 1101 def user_regular(pylonsapp):
1092 1102 """
1093 1103 Provides the default regular test user as an instance of `db.User`.
1094 1104 """
1095 1105 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1096 1106 return user
1097 1107
1098 1108
1099 1109 @pytest.fixture
1100 1110 def user_util(request, pylonsapp):
1101 1111 """
1102 1112 Provides a wired instance of `UserUtility` with integrated cleanup.
1103 1113 """
1104 1114 utility = UserUtility(test_name=request.node.name)
1105 1115 request.addfinalizer(utility.cleanup)
1106 1116 return utility
1107 1117
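
An illustrative sketch of combining `user_util` with the `backend` fixture; the permission name 'repository.read' is assumed to be one of RhodeCode's standard repository permissions:

def test_user_gets_read_access(user_util, backend):
    user = user_util.create_user()
    # Both the user and the granted permission are revoked again by the
    # fixture's cleanup() finalizer.
    user_util.grant_user_permission_to_repo(
        backend.repo, user, 'repository.read')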
1108 1118
1109 1119 # TODO: johbo: Split this up into utilities per domain or something similar
1110 1120 class UserUtility(object):
1111 1121
1112 1122 def __init__(self, test_name="test"):
1113 1123 self._test_name = test_name
1114 1124 self.fixture = Fixture()
1115 1125 self.repo_group_ids = []
1116 1126 self.user_ids = []
1117 1127 self.user_group_ids = []
1118 1128 self.user_repo_permission_ids = []
1119 1129 self.user_group_repo_permission_ids = []
1120 1130 self.user_repo_group_permission_ids = []
1121 1131 self.user_group_repo_group_permission_ids = []
1122 1132 self.user_user_group_permission_ids = []
1123 1133 self.user_group_user_group_permission_ids = []
1124 1134 self.user_permissions = []
1125 1135
1126 1136 def create_repo_group(
1127 1137 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1128 1138 group_name = "{prefix}_repogroup_{count}".format(
1129 1139 prefix=self._test_name,
1130 1140 count=len(self.repo_group_ids))
1131 1141 repo_group = self.fixture.create_repo_group(
1132 1142 group_name, cur_user=owner)
1133 1143 if auto_cleanup:
1134 1144 self.repo_group_ids.append(repo_group.group_id)
1135 1145 return repo_group
1136 1146
1137 1147 def create_user(self, auto_cleanup=True, **kwargs):
1138 1148 user_name = "{prefix}_user_{count}".format(
1139 1149 prefix=self._test_name,
1140 1150 count=len(self.user_ids))
1141 1151 user = self.fixture.create_user(user_name, **kwargs)
1142 1152 if auto_cleanup:
1143 1153 self.user_ids.append(user.user_id)
1144 1154 return user
1145 1155
1146 1156 def create_user_with_group(self):
1147 1157 user = self.create_user()
1148 1158 user_group = self.create_user_group(members=[user])
1149 1159 return user, user_group
1150 1160
1151 1161 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1152 1162 group_name = "{prefix}_usergroup_{count}".format(
1153 1163 prefix=self._test_name,
1154 1164 count=len(self.user_group_ids))
1155 1165 user_group = self.fixture.create_user_group(group_name, **kwargs)
1156 1166 if auto_cleanup:
1157 1167 self.user_group_ids.append(user_group.users_group_id)
1158 1168 if members:
1159 1169 for user in members:
1160 1170 UserGroupModel().add_user_to_group(user_group, user)
1161 1171 return user_group
1162 1172
1163 1173 def grant_user_permission(self, user_name, permission_name):
1164 1174 self._inherit_default_user_permissions(user_name, False)
1165 1175 self.user_permissions.append((user_name, permission_name))
1166 1176
1167 1177 def grant_user_permission_to_repo_group(
1168 1178 self, repo_group, user, permission_name):
1169 1179 permission = RepoGroupModel().grant_user_permission(
1170 1180 repo_group, user, permission_name)
1171 1181 self.user_repo_group_permission_ids.append(
1172 1182 (repo_group.group_id, user.user_id))
1173 1183 return permission
1174 1184
1175 1185 def grant_user_group_permission_to_repo_group(
1176 1186 self, repo_group, user_group, permission_name):
1177 1187 permission = RepoGroupModel().grant_user_group_permission(
1178 1188 repo_group, user_group, permission_name)
1179 1189 self.user_group_repo_group_permission_ids.append(
1180 1190 (repo_group.group_id, user_group.users_group_id))
1181 1191 return permission
1182 1192
1183 1193 def grant_user_permission_to_repo(
1184 1194 self, repo, user, permission_name):
1185 1195 permission = RepoModel().grant_user_permission(
1186 1196 repo, user, permission_name)
1187 1197 self.user_repo_permission_ids.append(
1188 1198 (repo.repo_id, user.user_id))
1189 1199 return permission
1190 1200
1191 1201 def grant_user_group_permission_to_repo(
1192 1202 self, repo, user_group, permission_name):
1193 1203 permission = RepoModel().grant_user_group_permission(
1194 1204 repo, user_group, permission_name)
1195 1205 self.user_group_repo_permission_ids.append(
1196 1206 (repo.repo_id, user_group.users_group_id))
1197 1207 return permission
1198 1208
1199 1209 def grant_user_permission_to_user_group(
1200 1210 self, target_user_group, user, permission_name):
1201 1211 permission = UserGroupModel().grant_user_permission(
1202 1212 target_user_group, user, permission_name)
1203 1213 self.user_user_group_permission_ids.append(
1204 1214 (target_user_group.users_group_id, user.user_id))
1205 1215 return permission
1206 1216
1207 1217 def grant_user_group_permission_to_user_group(
1208 1218 self, target_user_group, user_group, permission_name):
1209 1219 permission = UserGroupModel().grant_user_group_permission(
1210 1220 target_user_group, user_group, permission_name)
1211 1221 self.user_group_user_group_permission_ids.append(
1212 1222 (target_user_group.users_group_id, user_group.users_group_id))
1213 1223 return permission
1214 1224
1215 1225 def revoke_user_permission(self, user_name, permission_name):
1216 1226 self._inherit_default_user_permissions(user_name, True)
1217 1227 UserModel().revoke_perm(user_name, permission_name)
1218 1228
1219 1229 def _inherit_default_user_permissions(self, user_name, value):
1220 1230 user = UserModel().get_by_username(user_name)
1221 1231 user.inherit_default_permissions = value
1222 1232 Session().add(user)
1223 1233 Session().commit()
1224 1234
1225 1235 def cleanup(self):
1226 1236 self._cleanup_permissions()
1227 1237 self._cleanup_repo_groups()
1228 1238 self._cleanup_user_groups()
1229 1239 self._cleanup_users()
1230 1240
1231 1241 def _cleanup_permissions(self):
1232 1242 if self.user_permissions:
1233 1243 for user_name, permission_name in self.user_permissions:
1234 1244 self.revoke_user_permission(user_name, permission_name)
1235 1245
1236 1246 for permission in self.user_repo_permission_ids:
1237 1247 RepoModel().revoke_user_permission(*permission)
1238 1248
1239 1249 for permission in self.user_group_repo_permission_ids:
1240 1250 RepoModel().revoke_user_group_permission(*permission)
1241 1251
1242 1252 for permission in self.user_repo_group_permission_ids:
1243 1253 RepoGroupModel().revoke_user_permission(*permission)
1244 1254
1245 1255 for permission in self.user_group_repo_group_permission_ids:
1246 1256 RepoGroupModel().revoke_user_group_permission(*permission)
1247 1257
1248 1258 for permission in self.user_user_group_permission_ids:
1249 1259 UserGroupModel().revoke_user_permission(*permission)
1250 1260
1251 1261 for permission in self.user_group_user_group_permission_ids:
1252 1262 UserGroupModel().revoke_user_group_permission(*permission)
1253 1263
1254 1264 def _cleanup_repo_groups(self):
1255 1265 def _repo_group_compare(first_group_id, second_group_id):
1256 1266 """
1257 1267 Gives higher priority to the groups with the most complex paths
1258 1268 """
1259 1269 first_group = RepoGroup.get(first_group_id)
1260 1270 second_group = RepoGroup.get(second_group_id)
1261 1271 first_group_parts = (
1262 1272 len(first_group.group_name.split('/')) if first_group else 0)
1263 1273 second_group_parts = (
1264 1274 len(second_group.group_name.split('/')) if second_group else 0)
1265 1275 return cmp(second_group_parts, first_group_parts)
1266 1276
1267 1277 sorted_repo_group_ids = sorted(
1268 1278 self.repo_group_ids, cmp=_repo_group_compare)
1269 1279 for repo_group_id in sorted_repo_group_ids:
1270 1280 self.fixture.destroy_repo_group(repo_group_id)
1271 1281
1272 1282 def _cleanup_user_groups(self):
1273 1283 def _user_group_compare(first_group_id, second_group_id):
1274 1284 """
1275 1285 Gives higher priority to the groups with the most complex paths
1276 1286 """
1277 1287 first_group = UserGroup.get(first_group_id)
1278 1288 second_group = UserGroup.get(second_group_id)
1279 1289 first_group_parts = (
1280 1290 len(first_group.users_group_name.split('/'))
1281 1291 if first_group else 0)
1282 1292 second_group_parts = (
1283 1293 len(second_group.users_group_name.split('/'))
1284 1294 if second_group else 0)
1285 1295 return cmp(second_group_parts, first_group_parts)
1286 1296
1287 1297 sorted_user_group_ids = sorted(
1288 1298 self.user_group_ids, cmp=_user_group_compare)
1289 1299 for user_group_id in sorted_user_group_ids:
1290 1300 self.fixture.destroy_user_group(user_group_id)
1291 1301
1292 1302 def _cleanup_users(self):
1293 1303 for user_id in self.user_ids:
1294 1304 self.fixture.destroy_user(user_id)
1295 1305
1296 1306
1297 1307 # TODO: Think about moving this into a pytest-pyro package and make it a
1298 1308 # pytest plugin
1299 1309 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1300 1310 def pytest_runtest_makereport(item, call):
1301 1311 """
1302 1312 Adds the remote traceback to the report if the exception carries this information.
1303 1313
1304 1314 Pyro4 attaches this information as the attribute `_pyroTraceback`
1305 1315 to the exception instance.
1306 1316 """
1307 1317 outcome = yield
1308 1318 report = outcome.get_result()
1309 1319 if call.excinfo:
1310 1320 _add_pyro_remote_traceback(report, call.excinfo.value)
1311 1321
1312 1322
1313 1323 def _add_pyro_remote_traceback(report, exc):
1314 1324 pyro_traceback = getattr(exc, '_pyroTraceback', None)
1315 1325
1316 1326 if pyro_traceback:
1317 1327 traceback = ''.join(pyro_traceback)
1318 1328 section = 'Pyro4 remote traceback ' + report.when
1319 1329 report.sections.append((section, traceback))
1320 1330
1321 1331
1322 1332 @pytest.fixture(scope='session')
1323 1333 def testrun():
1324 1334 return {
1325 1335 'uuid': uuid.uuid4(),
1326 1336 'start': datetime.datetime.utcnow().isoformat(),
1327 1337 'timestamp': int(time.time()),
1328 1338 }
1329 1339
1330 1340
1331 1341 @pytest.fixture(autouse=True)
1332 1342 def collect_appenlight_stats(request, testrun):
1333 1343 """
1334 1344 This fixture reports memory consumption of single tests.
1335 1345
1336 1346 It gathers data based on `psutil` and sends them to Appenlight. The option
1337 1347 ``--ae`` has to be used to enable this fixture and the API key for your
1338 1348 application has to be provided in ``--ae-key``.
1339 1349 """
1340 1350 try:
1341 1351 # cygwin does not have psutil support yet.
1342 1352 import psutil
1343 1353 except ImportError:
1344 1354 return
1345 1355
1346 1356 if not request.config.getoption('--appenlight'):
1347 1357 return
1348 1358 else:
1349 1359 # Only request the pylonsapp fixture if appenlight tracking is
1350 1360 # enabled. This will speed up a test run of unit tests by 2 to 3
1351 1361 # seconds if appenlight is not enabled.
1352 1362 pylonsapp = request.getfuncargvalue("pylonsapp")
1353 1363 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1354 1364 client = AppenlightClient(
1355 1365 url=url,
1356 1366 api_key=request.config.getoption('--appenlight-api-key'),
1357 1367 namespace=request.node.nodeid,
1358 1368 request=str(testrun['uuid']),
1359 1369 testrun=testrun)
1360 1370
1361 1371 client.collect({
1362 1372 'message': "Starting",
1363 1373 })
1364 1374
1365 1375 server_and_port = pylonsapp.config['vcs.server']
1366 1376 server = create_vcsserver_proxy(server_and_port)
1367 1377 with server:
1368 1378 vcs_pid = server.get_pid()
1369 1379 server.run_gc()
1370 1380 vcs_process = psutil.Process(vcs_pid)
1371 1381 mem = vcs_process.memory_info()
1372 1382 client.tag_before('vcsserver.rss', mem.rss)
1373 1383 client.tag_before('vcsserver.vms', mem.vms)
1374 1384
1375 1385 test_process = psutil.Process()
1376 1386 mem = test_process.memory_info()
1377 1387 client.tag_before('test.rss', mem.rss)
1378 1388 client.tag_before('test.vms', mem.vms)
1379 1389
1380 1390 client.tag_before('time', time.time())
1381 1391
1382 1392 @request.addfinalizer
1383 1393 def send_stats():
1384 1394 client.tag_after('time', time.time())
1385 1395 with server:
1386 1396 gc_stats = server.run_gc()
1387 1397 for tag, value in gc_stats.items():
1388 1398 client.tag_after(tag, value)
1389 1399 mem = vcs_process.memory_info()
1390 1400 client.tag_after('vcsserver.rss', mem.rss)
1391 1401 client.tag_after('vcsserver.vms', mem.vms)
1392 1402
1393 1403 mem = test_process.memory_info()
1394 1404 client.tag_after('test.rss', mem.rss)
1395 1405 client.tag_after('test.vms', mem.vms)
1396 1406
1397 1407 client.collect({
1398 1408 'message': "Finished",
1399 1409 })
1400 1410 client.send_stats()
1401 1411
1402 1412 return client
1403 1413
1404 1414
1405 1415 class AppenlightClient():
1406 1416
1407 1417 url_template = '{url}?protocol_version=0.5'
1408 1418
1409 1419 def __init__(
1410 1420 self, url, api_key, add_server=True, add_timestamp=True,
1411 1421 namespace=None, request=None, testrun=None):
1412 1422 self.url = self.url_template.format(url=url)
1413 1423 self.api_key = api_key
1414 1424 self.add_server = add_server
1415 1425 self.add_timestamp = add_timestamp
1416 1426 self.namespace = namespace
1417 1427 self.request = request
1418 1428 self.server = socket.getfqdn(socket.gethostname())
1419 1429 self.tags_before = {}
1420 1430 self.tags_after = {}
1421 1431 self.stats = []
1422 1432 self.testrun = testrun or {}
1423 1433
1424 1434 def tag_before(self, tag, value):
1425 1435 self.tags_before[tag] = value
1426 1436
1427 1437 def tag_after(self, tag, value):
1428 1438 self.tags_after[tag] = value
1429 1439
1430 1440 def collect(self, data):
1431 1441 if self.add_server:
1432 1442 data.setdefault('server', self.server)
1433 1443 if self.add_timestamp:
1434 1444 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1435 1445 if self.namespace:
1436 1446 data.setdefault('namespace', self.namespace)
1437 1447 if self.request:
1438 1448 data.setdefault('request', self.request)
1439 1449 self.stats.append(data)
1440 1450
1441 1451 def send_stats(self):
1442 1452 tags = [
1443 1453 ('testrun', self.request),
1444 1454 ('testrun.start', self.testrun['start']),
1445 1455 ('testrun.timestamp', self.testrun['timestamp']),
1446 1456 ('test', self.namespace),
1447 1457 ]
1448 1458 for key, value in self.tags_before.items():
1449 1459 tags.append((key + '.before', value))
1450 1460 try:
1451 1461 delta = self.tags_after[key] - value
1452 1462 tags.append((key + '.delta', delta))
1453 1463 except Exception:
1454 1464 pass
1455 1465 for key, value in self.tags_after.items():
1456 1466 tags.append((key + '.after', value))
1457 1467 self.collect({
1458 1468 'message': "Collected tags",
1459 1469 'tags': tags,
1460 1470 })
1461 1471
1462 1472 response = requests.post(
1463 1473 self.url,
1464 1474 headers={
1465 1475 'X-appenlight-api-key': self.api_key},
1466 1476 json=self.stats,
1467 1477 )
1468 1478
1469 1479 if not response.status_code == 200:
1470 1480 pprint.pprint(self.stats)
1471 1481 print response.headers
1472 1482 print response.text
1473 1483 raise Exception('Sending to appenlight failed')
1474 1484
1475 1485
1476 1486 @pytest.fixture
1477 1487 def gist_util(request, pylonsapp):
1478 1488 """
1479 1489 Provides a wired instance of `GistUtility` with integrated cleanup.
1480 1490 """
1481 1491 utility = GistUtility()
1482 1492 request.addfinalizer(utility.cleanup)
1483 1493 return utility
1484 1494
1485 1495
1486 1496 class GistUtility(object):
1487 1497 def __init__(self):
1488 1498 self.fixture = Fixture()
1489 1499 self.gist_ids = []
1490 1500
1491 1501 def create_gist(self, **kwargs):
1492 1502 gist = self.fixture.create_gist(**kwargs)
1493 1503 self.gist_ids.append(gist.gist_id)
1494 1504 return gist
1495 1505
1496 1506 def cleanup(self):
1497 1507 for id_ in self.gist_ids:
1498 1508 self.fixture.destroy_gists(str(id_))
1499 1509
1500 1510
1501 1511 @pytest.fixture
1502 1512 def enabled_backends(request):
1503 1513 backends = request.config.option.backends
1504 1514 return backends[:]
1505 1515
1506 1516
1507 1517 @pytest.fixture
1508 1518 def settings_util(request):
1509 1519 """
1510 1520 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1511 1521 """
1512 1522 utility = SettingsUtility()
1513 1523 request.addfinalizer(utility.cleanup)
1514 1524 return utility
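
A minimal, hypothetical example of `settings_util`; the section and value below are placeholders, only the `create_rhodecode_ui` signature is taken from `SettingsUtility`:

def test_custom_ui_setting(settings_util):
    # The created row is deleted again in the fixture's cleanup().
    setting = settings_util.create_rhodecode_ui(
        section='example_section', value='example_value')
    assert setting.ui_active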
1515 1525
1516 1526
1517 1527 class SettingsUtility(object):
1518 1528 def __init__(self):
1519 1529 self.rhodecode_ui_ids = []
1520 1530 self.rhodecode_setting_ids = []
1521 1531 self.repo_rhodecode_ui_ids = []
1522 1532 self.repo_rhodecode_setting_ids = []
1523 1533
1524 1534 def create_repo_rhodecode_ui(
1525 1535 self, repo, section, value, key=None, active=True, cleanup=True):
1526 1536 key = key or hashlib.sha1(
1527 1537 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1528 1538
1529 1539 setting = RepoRhodeCodeUi()
1530 1540 setting.repository_id = repo.repo_id
1531 1541 setting.ui_section = section
1532 1542 setting.ui_value = value
1533 1543 setting.ui_key = key
1534 1544 setting.ui_active = active
1535 1545 Session().add(setting)
1536 1546 Session().commit()
1537 1547
1538 1548 if cleanup:
1539 1549 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1540 1550 return setting
1541 1551
1542 1552 def create_rhodecode_ui(
1543 1553 self, section, value, key=None, active=True, cleanup=True):
1544 1554 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1545 1555
1546 1556 setting = RhodeCodeUi()
1547 1557 setting.ui_section = section
1548 1558 setting.ui_value = value
1549 1559 setting.ui_key = key
1550 1560 setting.ui_active = active
1551 1561 Session().add(setting)
1552 1562 Session().commit()
1553 1563
1554 1564 if cleanup:
1555 1565 self.rhodecode_ui_ids.append(setting.ui_id)
1556 1566 return setting
1557 1567
1558 1568 def create_repo_rhodecode_setting(
1559 1569 self, repo, name, value, type_, cleanup=True):
1560 1570 setting = RepoRhodeCodeSetting(
1561 1571 repo.repo_id, key=name, val=value, type=type_)
1562 1572 Session().add(setting)
1563 1573 Session().commit()
1564 1574
1565 1575 if cleanup:
1566 1576 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1567 1577 return setting
1568 1578
1569 1579 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1570 1580 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1571 1581 Session().add(setting)
1572 1582 Session().commit()
1573 1583
1574 1584 if cleanup:
1575 1585 self.rhodecode_setting_ids.append(setting.app_settings_id)
1576 1586
1577 1587 return setting
1578 1588
1579 1589 def cleanup(self):
1580 1590 for id_ in self.rhodecode_ui_ids:
1581 1591 setting = RhodeCodeUi.get(id_)
1582 1592 Session().delete(setting)
1583 1593
1584 1594 for id_ in self.rhodecode_setting_ids:
1585 1595 setting = RhodeCodeSetting.get(id_)
1586 1596 Session().delete(setting)
1587 1597
1588 1598 for id_ in self.repo_rhodecode_ui_ids:
1589 1599 setting = RepoRhodeCodeUi.get(id_)
1590 1600 Session().delete(setting)
1591 1601
1592 1602 for id_ in self.repo_rhodecode_setting_ids:
1593 1603 setting = RepoRhodeCodeSetting.get(id_)
1594 1604 Session().delete(setting)
1595 1605
1596 1606 Session().commit()
1597 1607
1598 1608
1599 1609 @pytest.fixture
1600 1610 def no_notifications(request):
1601 1611 notification_patcher = mock.patch(
1602 1612 'rhodecode.model.notification.NotificationModel.create')
1603 1613 notification_patcher.start()
1604 1614 request.addfinalizer(notification_patcher.stop)
1605 1615
1606 1616
1607 1617 @pytest.fixture
1608 1618 def silence_action_logger(request):
1609 1619 notification_patcher = mock.patch(
1610 1620 'rhodecode.lib.utils.action_logger')
1611 1621 notification_patcher.start()
1612 1622 request.addfinalizer(notification_patcher.stop)
1613 1623
1614 1624
1615 1625 @pytest.fixture(scope='session')
1616 1626 def repeat(request):
1617 1627 """
1618 1628 The number of repetitions is based on this fixture.
1619 1629
1620 1630 Slower calls may divide it by 10 or 100. It is chosen so that the
1621 1631 tests are not too slow in our default test suite.
1622 1632 """
1623 1633 return request.config.getoption('--repeat')
1624 1634
1625 1635
1626 1636 @pytest.fixture
1627 1637 def rhodecode_fixtures():
1628 1638 return Fixture()
1629 1639
1630 1640
1631 1641 @pytest.fixture
1632 1642 def request_stub():
1633 1643 """
1634 1644 Stub request object.
1635 1645 """
1636 1646 request = pyramid.testing.DummyRequest()
1637 1647 request.scheme = 'https'
1638 1648 return request
1639 1649
1640 1650
1641 1651 @pytest.fixture
1642 1652 def config_stub(request, request_stub):
1643 1653 """
1644 1654 Set up pyramid.testing and return the Configurator.
1645 1655 """
1646 1656 config = pyramid.testing.setUp(request=request_stub)
1647 1657
1648 1658 @request.addfinalizer
1649 1659 def cleanup():
1650 1660 pyramid.testing.tearDown()
1651 1661
1652 1662 return config
1653 1663
1654 1664
1655 1665 @pytest.fixture
1656 1666 def StubIntegrationType():
1657 1667 class _StubIntegrationType(IntegrationTypeBase):
1658 1668 """ Test integration type class """
1659 1669
1660 1670 key = 'test'
1661 1671 display_name = 'Test integration type'
1662 1672 description = 'A test integration type for testing'
1663 1673 icon = 'test_icon_html_image'
1664 1674
1665 1675 def __init__(self, settings):
1666 1676 super(_StubIntegrationType, self).__init__(settings)
1667 1677 self.sent_events = [] # for testing
1668 1678
1669 1679 def send_event(self, event):
1670 1680 self.sent_events.append(event)
1671 1681
1672 1682 def settings_schema(self):
1673 1683 class SettingsSchema(colander.Schema):
1674 1684 test_string_field = colander.SchemaNode(
1675 1685 colander.String(),
1676 1686 missing=colander.required,
1677 1687 title='test string field',
1678 1688 )
1679 1689 test_int_field = colander.SchemaNode(
1680 1690 colander.Int(),
1681 1691 title='some integer setting',
1682 1692 )
1683 1693 return SettingsSchema()
1684 1694
1685 1695
1686 1696 integration_type_registry.register_integration_type(_StubIntegrationType)
1687 1697 return _StubIntegrationType
1688 1698
1689 1699 @pytest.fixture
1690 1700 def stub_integration_settings():
1691 1701 return {
1692 1702 'test_string_field': 'some data',
1693 1703 'test_int_field': 100,
1694 1704 }
1695 1705
1696 1706
1697 1707 @pytest.fixture
1698 1708 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1699 1709 stub_integration_settings):
1700 1710 integration = IntegrationModel().create(
1701 1711 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1702 1712 name='test repo integration', scope=repo_stub)
1703 1713
1704 1714 @request.addfinalizer
1705 1715 def cleanup():
1706 1716 IntegrationModel().delete(integration)
1707 1717
1708 1718 return integration
1709 1719
1710 1720
1711 1721 @pytest.fixture
1712 1722 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1713 1723 stub_integration_settings):
1714 1724 integration = IntegrationModel().create(
1715 1725 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1716 1726 name='test repogroup integration', scope=test_repo_group)
1717 1727
1718 1728 @request.addfinalizer
1719 1729 def cleanup():
1720 1730 IntegrationModel().delete(integration)
1721 1731
1722 1732 return integration
1723 1733
1724 1734
1725 1735 @pytest.fixture
1726 1736 def global_integration_stub(request, StubIntegrationType,
1727 1737 stub_integration_settings):
1728 1738 integration = IntegrationModel().create(
1729 1739 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1730 1740 name='test global integration', scope='global')
1731 1741
1732 1742 @request.addfinalizer
1733 1743 def cleanup():
1734 1744 IntegrationModel().delete(integration)
1735 1745
1736 1746 return integration
1737 1747
1738 1748
1739 1749 @pytest.fixture
1740 1750 def root_repos_integration_stub(request, StubIntegrationType,
1741 1751 stub_integration_settings):
1742 1752 integration = IntegrationModel().create(
1743 1753 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1744 1754 name='test global integration', scope='root_repos')
1745 1755
1746 1756 @request.addfinalizer
1747 1757 def cleanup():
1748 1758 IntegrationModel().delete(integration)
1749 1759
1750 1760 return integration