tests: improved test utils....
marcink - r1442:0bf8c0ea default
@@ -1,1805 +1,1808 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.utils import CustomTestApp
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
73 73 def pytest_addoption(parser):
74 74 parser.addoption(
75 75 '--keep-tmp-path', action='store_true',
76 76 help="Keep the test temporary directories")
77 77 parser.addoption(
78 78 '--backends', action='store', type=_split_comma,
79 79 default=['git', 'hg', 'svn'],
80 80 help="Select which backends to test for backend specific tests.")
81 81 parser.addoption(
82 82 '--dbs', action='store', type=_split_comma,
83 83 default=['sqlite'],
84 84 help="Select which database to test for database specific tests. "
85 85 "Possible options are sqlite,postgres,mysql")
86 86 parser.addoption(
87 87 '--appenlight', '--ae', action='store_true',
88 88 help="Track statistics in appenlight.")
89 89 parser.addoption(
90 90 '--appenlight-api-key', '--ae-key',
91 91 help="API key for Appenlight.")
92 92 parser.addoption(
93 93 '--appenlight-url', '--ae-url',
94 94 default="https://ae.rhodecode.com",
95 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
96 96 parser.addoption(
97 97 '--sqlite-connection-string', action='store',
98 98 default='', help="Connection string for the dbs tests with SQLite")
99 99 parser.addoption(
100 100 '--postgres-connection-string', action='store',
101 101 default='', help="Connection string for the dbs tests with Postgres")
102 102 parser.addoption(
103 103 '--mysql-connection-string', action='store',
104 104 default='', help="Connection string for the dbs tests with MySQL")
105 105 parser.addoption(
106 106 '--repeat', type=int, default=100,
107 107 help="Number of repetitions in performance tests.")
108 108
109 109
110 110 def pytest_configure(config):
111 111 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
112 112 from rhodecode.config import patches
113 113 patches.kombu_1_5_1_python_2_7_11()
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # Skip items marked as non-tests (__test__ = False), like nose's nottest; used during the transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Backends supported by this test function, declared via
141 141 # pytest.mark.backends
142 142 backends = metafunc.function.backends.args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support the class attribute "backend_alias"; this exists mainly
145 145 # for legacy reasons, for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
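# Illustrative sketch (not part of the original file): a test in a regular
# test module can opt into specific backends via the pytest.mark.backends
# marker consumed above; the test name and assertion are made up.
#
#   @pytest.mark.backends('git', 'hg')
#   def test_needs_git_or_hg(backend):
#       assert backend.alias in ('git', 'hg')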
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this point it is still an empty dict, but it will be
175 175 # filled during the test run; since it is a reference, returning it
176 176 # here is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allows using "http_environ" in session scope.
184 184 """
185 185 return http_environ(
186 186 http_host_stub=http_host_stub())
187 187
188 188
189 189 @pytest.fixture
190 190 def http_host_stub():
191 191 """
192 192 Value of HTTP_HOST in the test run.
193 193 """
194 194 return 'test.example.com:80'
195 195
196 196
197 197 @pytest.fixture
198 198 def http_environ(http_host_stub):
199 199 """
200 200 HTTP extra environ keys.
201 201
202 202 Used by the test application as well as for setting up the pylons
203 203 environment. In the case of the fixture "app" it should be possible
204 204 to override this for a specific test case.
205 205 """
206 206 return {
207 207 'SERVER_NAME': http_host_stub.split(':')[0],
208 208 'SERVER_PORT': http_host_stub.split(':')[1],
209 209 'HTTP_HOST': http_host_stub,
210 210 }
211 211
212 212
213 213 @pytest.fixture(scope='function')
214 214 def app(request, pylonsapp, http_environ):
215 215
216 216
217 217 app = CustomTestApp(
218 218 pylonsapp,
219 219 extra_environ=http_environ)
220 220 if request.cls:
221 221 request.cls.app = app
222 222 return app
223 223
224 224
225 225 @pytest.fixture(scope='session')
226 226 def app_settings(pylonsapp, pylons_config):
227 227 """
228 228 Settings dictionary used to create the app.
229 229
230 230 Parses the ini file and passes the result through the sanitize and apply
231 231 defaults mechanism in `rhodecode.config.middleware`.
232 232 """
233 233 from paste.deploy.loadwsgi import loadcontext, APP
234 234 from rhodecode.config.middleware import (
235 235 sanitize_settings_and_apply_defaults)
236 236 context = loadcontext(APP, 'config:' + pylons_config)
237 237 settings = sanitize_settings_and_apply_defaults(context.config())
238 238 return settings
239 239
240 240
241 241 @pytest.fixture(scope='session')
242 242 def db(app_settings):
243 243 """
244 244 Initializes the database connection.
245 245
246 246 It uses the same settings which are used to create the ``pylonsapp`` or
247 247 ``app`` fixtures.
248 248 """
249 249 from rhodecode.config.utils import initialize_database
250 250 initialize_database(app_settings)
251 251
252 252
253 253 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
254 254
255 255
256 256 def _autologin_user(app, *args):
257 257 session = login_user_session(app, *args)
258 258 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
259 259 return LoginData(csrf_token, session['rhodecode_user'])
260 260
261 261
262 262 @pytest.fixture
263 263 def autologin_user(app):
264 264 """
265 265 Utility fixture which makes sure that the admin user is logged in
266 266 """
267 267 return _autologin_user(app)
268 268
269 269
270 270 @pytest.fixture
271 271 def autologin_regular_user(app):
272 272 """
273 273 Utility fixture which makes sure that the regular user is logged in
274 274 """
275 275 return _autologin_user(
276 276 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
277 277
278 278
279 279 @pytest.fixture(scope='function')
280 280 def csrf_token(request, autologin_user):
281 281 return autologin_user.csrf_token
282 282
283 283
284 284 @pytest.fixture(scope='function')
285 285 def xhr_header(request):
286 286 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
287 287
288 288
289 289 @pytest.fixture
290 290 def real_crypto_backend(monkeypatch):
291 291 """
292 292 Switch the production crypto backend on for this test.
293 293
294 294 During the test run the crypto backend is replaced with a faster
295 295 implementation based on the MD5 algorithm.
296 296 """
297 297 monkeypatch.setattr(rhodecode, 'is_test', False)
298 298
299 299
300 300 @pytest.fixture(scope='class')
301 301 def index_location(request, pylonsapp):
302 302 index_location = pylonsapp.config['app_conf']['search.location']
303 303 if request.cls:
304 304 request.cls.index_location = index_location
305 305 return index_location
306 306
307 307
308 308 @pytest.fixture(scope='session', autouse=True)
309 309 def tests_tmp_path(request):
310 310 """
311 311 Create temporary directory to be used during the test session.
312 312 """
313 313 if not os.path.exists(TESTS_TMP_PATH):
314 314 os.makedirs(TESTS_TMP_PATH)
315 315
316 316 if not request.config.getoption('--keep-tmp-path'):
317 317 @request.addfinalizer
318 318 def remove_tmp_path():
319 319 shutil.rmtree(TESTS_TMP_PATH)
320 320
321 321 return TESTS_TMP_PATH
322 322
323 323
324 324 @pytest.fixture
325 325 def test_repo_group(request):
326 326 """
327 327 Create a temporary repository group and destroy it automatically
328 328 after use
329 329 """
330 330 fixture = Fixture()
331 331 repogroupid = 'test_repo_group_%s' % int(time.time())
332 332 repo_group = fixture.create_repo_group(repogroupid)
333 333
334 334 def _cleanup():
335 335 fixture.destroy_repo_group(repogroupid)
336 336
337 337 request.addfinalizer(_cleanup)
338 338 return repo_group
339 339
340 340
341 341 @pytest.fixture
342 342 def test_user_group(request):
343 343 """
344 344 Create a temporary user group and destroy it automatically
345 345 after use
346 346 """
347 347 fixture = Fixture()
348 348 usergroupid = 'test_user_group_%s' % int(time.time())
349 349 user_group = fixture.create_user_group(usergroupid)
350 350
351 351 def _cleanup():
352 352 fixture.destroy_user_group(user_group)
353 353
354 354 request.addfinalizer(_cleanup)
355 355 return user_group
356 356
357 357
358 358 @pytest.fixture(scope='session')
359 359 def test_repo(request):
360 360 container = TestRepoContainer()
361 361 request.addfinalizer(container._cleanup)
362 362 return container
363 363
364 364
365 365 class TestRepoContainer(object):
366 366 """
367 367 Container for test repositories which are used read-only.
368 368
369 369 Repositories will be created on demand and re-used during the lifetime
370 370 of this object.
371 371
372 372 Usage to get the svn test repository "minimal"::
373 373
374 374 test_repo = TestRepoContainer()
375 375 repo = test_repo('minimal', 'svn')
376 376
377 377 """
378 378
379 379 dump_extractors = {
380 380 'git': utils.extract_git_repo_from_dump,
381 381 'hg': utils.extract_hg_repo_from_dump,
382 382 'svn': utils.extract_svn_repo_from_dump,
383 383 }
384 384
385 385 def __init__(self):
386 386 self._cleanup_repos = []
387 387 self._fixture = Fixture()
388 388 self._repos = {}
389 389
390 390 def __call__(self, dump_name, backend_alias):
391 391 key = (dump_name, backend_alias)
392 392 if key not in self._repos:
393 393 repo = self._create_repo(dump_name, backend_alias)
394 394 self._repos[key] = repo.repo_id
395 395 return Repository.get(self._repos[key])
396 396
397 397 def _create_repo(self, dump_name, backend_alias):
398 398 repo_name = '%s-%s' % (backend_alias, dump_name)
399 399 backend_class = get_backend(backend_alias)
400 400 dump_extractor = self.dump_extractors[backend_alias]
401 401 repo_path = dump_extractor(dump_name, repo_name)
402 402 vcs_repo = backend_class(repo_path)
403 403 repo2db_mapper({repo_name: vcs_repo})
404 404 repo = RepoModel().get_by_repo_name(repo_name)
405 405 self._cleanup_repos.append(repo_name)
406 406 return repo
407 407
408 408 def _cleanup(self):
409 409 for repo_name in reversed(self._cleanup_repos):
410 410 self._fixture.destroy_repo(repo_name)
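# Illustrative sketch (not part of the original file) of using the session
# scoped `test_repo` container from a test; the dump name mirrors the
# docstring example above, and the repository name follows _create_repo().
#
#   def test_reads_minimal_svn_dump(test_repo):
#       repo = test_repo('minimal', 'svn')
#       assert repo.repo_name == 'svn-minimal'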
411 411
412 412
413 413 @pytest.fixture
414 414 def backend(request, backend_alias, pylonsapp, test_repo):
415 415 """
416 416 Parametrized fixture which represents a single backend implementation.
417 417
418 418 It respects the option `--backends` to focus the test run on specific
419 419 backend implementations.
420 420
421 421 It also supports `pytest.mark.xfail_backends` to mark tests as failing
422 422 for specific backends. This is intended as a utility for incremental
423 423 development of a new backend implementation.
424 424 """
425 425 if backend_alias not in request.config.getoption('--backends'):
426 426 pytest.skip("Backend %s not selected." % (backend_alias, ))
427 427
428 428 utils.check_xfail_backends(request.node, backend_alias)
429 429 utils.check_skip_backends(request.node, backend_alias)
430 430
431 431 repo_name = 'vcs_test_%s' % (backend_alias, )
432 432 backend = Backend(
433 433 alias=backend_alias,
434 434 repo_name=repo_name,
435 435 test_name=request.node.name,
436 436 test_repo_container=test_repo)
437 437 request.addfinalizer(backend.cleanup)
438 438 return backend
439 439
440 440
441 441 @pytest.fixture
442 442 def backend_git(request, pylonsapp, test_repo):
443 443 return backend(request, 'git', pylonsapp, test_repo)
444 444
445 445
446 446 @pytest.fixture
447 447 def backend_hg(request, pylonsapp, test_repo):
448 448 return backend(request, 'hg', pylonsapp, test_repo)
449 449
450 450
451 451 @pytest.fixture
452 452 def backend_svn(request, pylonsapp, test_repo):
453 453 return backend(request, 'svn', pylonsapp, test_repo)
454 454
455 455
456 456 @pytest.fixture
457 457 def backend_random(backend_git):
458 458 """
459 459 Use this to express that your tests need "a backend".
460 460
461 461 A few of our tests need a backend, so that we can run the code. This
462 462 fixture is intended to be used for such cases. It will pick one of the
463 463 backends and run the tests.
464 464
465 465 The fixture `backend` would run the test multiple times for each
466 466 available backend, which is a pure waste of time if the test is
467 467 independent of the backend type.
468 468 """
469 469 # TODO: johbo: Change this to pick a random backend
470 470 return backend_git
471 471
472 472
473 473 @pytest.fixture
474 474 def backend_stub(backend_git):
475 475 """
476 476 Use this to express that your tests need a backend stub
477 477
478 478 TODO: mikhail: Implement a real stub logic instead of returning
479 479 a git backend
480 480 """
481 481 return backend_git
482 482
483 483
484 484 @pytest.fixture
485 485 def repo_stub(backend_stub):
486 486 """
487 487 Use this to express that your tests need a repository stub
488 488 """
489 489 return backend_stub.create_repo()
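# Illustrative sketch (not part of the original file): a test which only
# needs "some" repository uses the stub fixtures above instead of the
# parametrized `backend`; the test name is made up.
#
#   def test_repo_stub_has_a_name(repo_stub):
#       assert repo_stub.repo_name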
490 490
491 491
492 492 class Backend(object):
493 493 """
494 494 Represents the test configuration for one supported backend
495 495
496 496 Provides easy access to different test repositories based on
497 497 `__getitem__`. Such repositories will only be created once per test
498 498 session.
499 499 """
500 500
501 501 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
502 502 _master_repo = None
503 503 _commit_ids = {}
504 504
505 505 def __init__(self, alias, repo_name, test_name, test_repo_container):
506 506 self.alias = alias
507 507 self.repo_name = repo_name
508 508 self._cleanup_repos = []
509 509 self._test_name = test_name
510 510 self._test_repo_container = test_repo_container
511 511 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
512 512 # Fixture will survive in the end.
513 513 self._fixture = Fixture()
514 514
515 515 def __getitem__(self, key):
516 516 return self._test_repo_container(key, self.alias)
517 517
518 518 @property
519 519 def repo(self):
520 520 """
521 521 Returns the "current" repository. This is the vcs_test repo or the
522 522 last repo which has been created with `create_repo`.
523 523 """
524 524 from rhodecode.model.db import Repository
525 525 return Repository.get_by_repo_name(self.repo_name)
526 526
527 527 @property
528 528 def default_branch_name(self):
529 529 VcsRepository = get_backend(self.alias)
530 530 return VcsRepository.DEFAULT_BRANCH_NAME
531 531
532 532 @property
533 533 def default_head_id(self):
534 534 """
535 535 Returns the default head id of the underlying backend.
536 536
537 537 This will be the default branch name in case the backend does have a
538 538 default branch. In the other cases it will point to a valid head
539 539 which can serve as the base to create a new commit on top of it.
540 540 """
541 541 vcsrepo = self.repo.scm_instance()
542 542 head_id = (
543 543 vcsrepo.DEFAULT_BRANCH_NAME or
544 544 vcsrepo.commit_ids[-1])
545 545 return head_id
546 546
547 547 @property
548 548 def commit_ids(self):
549 549 """
550 550 Returns the list of commits for the last created repository
551 551 """
552 552 return self._commit_ids
553 553
554 554 def create_master_repo(self, commits):
555 555 """
556 556 Create a repository and remember it as a template.
557 557
558 558 This makes it easy to create derived repositories to construct
559 559 more complex scenarios for diff, compare and pull requests.
560 560
561 561 Returns a commit map which maps from commit message to raw_id.
562 562 """
563 563 self._master_repo = self.create_repo(commits=commits)
564 564 return self._commit_ids
565 565
566 566 def create_repo(
567 567 self, commits=None, number_of_commits=0, heads=None,
568 568 name_suffix=u'', **kwargs):
569 569 """
570 570 Create a repository and record it for later cleanup.
571 571
572 572 :param commits: Optional. A sequence of dict instances.
573 573 Will add a commit per entry to the new repository.
574 574 :param number_of_commits: Optional. If set to a number, this number of
575 575 commits will be added to the new repository.
576 576 :param heads: Optional. Can be set to a sequence of commit
577 577 names which shall be pulled in from the master repository.
578 578
579 579 """
580 580 self.repo_name = self._next_repo_name() + name_suffix
581 581 repo = self._fixture.create_repo(
582 582 self.repo_name, repo_type=self.alias, **kwargs)
583 583 self._cleanup_repos.append(repo.repo_name)
584 584
585 585 commits = commits or [
586 586 {'message': 'Commit %s of %s' % (x, self.repo_name)}
587 587 for x in xrange(number_of_commits)]
588 588 self._add_commits_to_repo(repo.scm_instance(), commits)
589 589 if heads:
590 590 self.pull_heads(repo, heads)
591 591
592 592 return repo
593 593
594 594 def pull_heads(self, repo, heads):
595 595 """
596 596 Make sure that repo contains all commits mentioned in `heads`
597 597 """
598 598 vcsmaster = self._master_repo.scm_instance()
599 599 vcsrepo = repo.scm_instance()
600 600 vcsrepo.config.clear_section('hooks')
601 601 commit_ids = [self._commit_ids[h] for h in heads]
602 602 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
603 603
604 604 def create_fork(self):
605 605 repo_to_fork = self.repo_name
606 606 self.repo_name = self._next_repo_name()
607 607 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
608 608 self._cleanup_repos.append(self.repo_name)
609 609 return repo
610 610
611 611 def new_repo_name(self, suffix=u''):
612 612 self.repo_name = self._next_repo_name() + suffix
613 613 self._cleanup_repos.append(self.repo_name)
614 614 return self.repo_name
615 615
616 616 def _next_repo_name(self):
617 617 return u"%s_%s" % (
618 618 self.invalid_repo_name.sub(u'_', self._test_name),
619 619 len(self._cleanup_repos))
620 620
621 621 def ensure_file(self, filename, content='Test content\n'):
622 622 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
623 623 commits = [
624 624 {'added': [
625 625 FileNode(filename, content=content),
626 626 ]},
627 627 ]
628 628 self._add_commits_to_repo(self.repo.scm_instance(), commits)
629 629
630 630 def enable_downloads(self):
631 631 repo = self.repo
632 632 repo.enable_downloads = True
633 633 Session().add(repo)
634 634 Session().commit()
635 635
636 636 def cleanup(self):
637 637 for repo_name in reversed(self._cleanup_repos):
638 638 self._fixture.destroy_repo(repo_name)
639 639
640 640 def _add_commits_to_repo(self, repo, commits):
641 641 commit_ids = _add_commits_to_repo(repo, commits)
642 642 if not commit_ids:
643 643 return
644 644 self._commit_ids = commit_ids
645 645
646 646 # Creating refs for Git to allow fetching them from remote repository
647 647 if self.alias == 'git':
648 648 refs = {}
649 649 for message in self._commit_ids:
650 650 # TODO: mikhail: do more special chars replacements
651 651 ref_name = 'refs/test-refs/{}'.format(
652 652 message.replace(' ', ''))
653 653 refs[ref_name] = self._commit_ids[message]
654 654 self._create_refs(repo, refs)
655 655
656 656 def _create_refs(self, repo, refs):
657 657 for ref_name in refs:
658 658 repo.set_refs(ref_name, refs[ref_name])
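# Illustrative sketch (not part of the original file) of driving the
# `Backend` helper from a test; the commit count is arbitrary.
#
#   def test_create_repo_records_commits(backend):
#       backend.create_repo(number_of_commits=2)
#       assert len(backend.commit_ids) == 2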
659 659
660 660
661 661 @pytest.fixture
662 662 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
663 663 """
664 664 Parametrized fixture which represents a single vcs backend implementation.
665 665
666 666 See the fixture `backend` for more details. This one implements the same
667 667 concept, but on vcs level. So it does not provide model instances etc.
668 668
669 669 Parameters are generated dynamically, see :func:`pytest_generate_tests`
670 670 for how this works.
671 671 """
672 672 if backend_alias not in request.config.getoption('--backends'):
673 673 pytest.skip("Backend %s not selected." % (backend_alias, ))
674 674
675 675 utils.check_xfail_backends(request.node, backend_alias)
676 676 utils.check_skip_backends(request.node, backend_alias)
677 677
678 678 repo_name = 'vcs_test_%s' % (backend_alias, )
679 679 repo_path = os.path.join(tests_tmp_path, repo_name)
680 680 backend = VcsBackend(
681 681 alias=backend_alias,
682 682 repo_path=repo_path,
683 683 test_name=request.node.name,
684 684 test_repo_container=test_repo)
685 685 request.addfinalizer(backend.cleanup)
686 686 return backend
687 687
688 688
689 689 @pytest.fixture
690 690 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
691 691 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
692 692
693 693
694 694 @pytest.fixture
695 695 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
696 696 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
697 697
698 698
699 699 @pytest.fixture
700 700 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
701 701 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
702 702
703 703
704 704 @pytest.fixture
705 705 def vcsbackend_random(vcsbackend_git):
706 706 """
707 707 Use this to express that your tests need "a vcsbackend".
708 708
709 709 The fixture `vcsbackend` would run the test multiple times for each
710 710 available vcs backend, which is a pure waste of time if the test is
711 711 independent of the vcs backend type.
712 712 """
713 713 # TODO: johbo: Change this to pick a random backend
714 714 return vcsbackend_git
715 715
716 716
717 717 @pytest.fixture
718 718 def vcsbackend_stub(vcsbackend_git):
719 719 """
720 720 Use this to express that your test just needs a stub of a vcsbackend.
721 721
722 722 Plan is to eventually implement an in-memory stub to speed tests up.
723 723 """
724 724 return vcsbackend_git
725 725
726 726
727 727 class VcsBackend(object):
728 728 """
729 729 Represents the test configuration for one supported vcs backend.
730 730 """
731 731
732 732 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
733 733
734 734 def __init__(self, alias, repo_path, test_name, test_repo_container):
735 735 self.alias = alias
736 736 self._repo_path = repo_path
737 737 self._cleanup_repos = []
738 738 self._test_name = test_name
739 739 self._test_repo_container = test_repo_container
740 740
741 741 def __getitem__(self, key):
742 742 return self._test_repo_container(key, self.alias).scm_instance()
743 743
744 744 @property
745 745 def repo(self):
746 746 """
747 747 Returns the "current" repository. This is the vcs_test repo of the last
748 748 repo which has been created.
749 749 """
750 750 Repository = get_backend(self.alias)
751 751 return Repository(self._repo_path)
752 752
753 753 @property
754 754 def backend(self):
755 755 """
756 756 Returns the backend implementation class.
757 757 """
758 758 return get_backend(self.alias)
759 759
760 760 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
761 761 repo_name = self._next_repo_name()
762 762 self._repo_path = get_new_dir(repo_name)
763 763 repo_class = get_backend(self.alias)
764 764 src_url = None
765 765 if _clone_repo:
766 766 src_url = _clone_repo.path
767 767 repo = repo_class(self._repo_path, create=True, src_url=src_url)
768 768 self._cleanup_repos.append(repo)
769 769
770 770 commits = commits or [
771 771 {'message': 'Commit %s of %s' % (x, repo_name)}
772 772 for x in xrange(number_of_commits)]
773 773 _add_commits_to_repo(repo, commits)
774 774 return repo
775 775
776 776 def clone_repo(self, repo):
777 777 return self.create_repo(_clone_repo=repo)
778 778
779 779 def cleanup(self):
780 780 for repo in self._cleanup_repos:
781 781 shutil.rmtree(repo.path)
782 782
783 783 def new_repo_path(self):
784 784 repo_name = self._next_repo_name()
785 785 self._repo_path = get_new_dir(repo_name)
786 786 return self._repo_path
787 787
788 788 def _next_repo_name(self):
789 789 return "%s_%s" % (
790 790 self.invalid_repo_name.sub('_', self._test_name),
791 791 len(self._cleanup_repos))
792 792
793 793 def add_file(self, repo, filename, content='Test content\n'):
794 794 imc = repo.in_memory_commit
795 795 imc.add(FileNode(filename, content=content))
796 796 imc.commit(
797 797 message=u'Automatic commit from vcsbackend fixture',
798 798 author=u'Automatic')
799 799
800 800 def ensure_file(self, filename, content='Test content\n'):
801 801 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
802 802 self.add_file(self.repo, filename, content)
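# Illustrative sketch (not part of the original file) of the low-level
# `vcsbackend` fixture; it assumes the selected backend supports in-memory
# commits.
#
#   def test_vcsbackend_creates_commits(vcsbackend):
#       vcs_repo = vcsbackend.create_repo(number_of_commits=3)
#       assert len(vcs_repo.commit_ids) == 3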
803 803
804 804
805 805 def _add_commits_to_repo(vcs_repo, commits):
806 806 commit_ids = {}
807 807 if not commits:
808 808 return commit_ids
809 809
810 810 imc = vcs_repo.in_memory_commit
811 811 commit = None
812 812
813 813 for idx, commit in enumerate(commits):
814 814 message = unicode(commit.get('message', 'Commit %s' % idx))
815 815
816 816 for node in commit.get('added', []):
817 817 imc.add(FileNode(node.path, content=node.content))
818 818 for node in commit.get('changed', []):
819 819 imc.change(FileNode(node.path, content=node.content))
820 820 for node in commit.get('removed', []):
821 821 imc.remove(FileNode(node.path))
822 822
823 823 parents = [
824 824 vcs_repo.get_commit(commit_id=commit_ids[p])
825 825 for p in commit.get('parents', [])]
826 826
827 827 operations = ('added', 'changed', 'removed')
828 828 if not any((commit.get(o) for o in operations)):
829 829 imc.add(FileNode('file_%s' % idx, content=message))
830 830
831 831 commit = imc.commit(
832 832 message=message,
833 833 author=unicode(commit.get('author', 'Automatic')),
834 834 date=commit.get('date'),
835 835 branch=commit.get('branch'),
836 836 parents=parents)
837 837
838 838 commit_ids[commit.message] = commit.raw_id
839 839
840 840 return commit_ids
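# Illustrative shape of the `commits` argument consumed above; file names,
# messages and contents are made up. Entries in 'parents' refer back to
# earlier commit messages, matching the commit_ids mapping built in the loop.
#
#   commits = [
#       {'message': 'Add docs',
#        'added': [FileNode('docs/index.rst', content='Docs\n')]},
#       {'message': 'Update docs',
#        'changed': [FileNode('docs/index.rst', content='Docs v2\n')],
#        'parents': ['Add docs']},
#   ]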
841 841
842 842
843 843 @pytest.fixture
844 844 def reposerver(request):
845 845 """
846 846 Allows serving a backend repository
847 847 """
848 848
849 849 repo_server = RepoServer()
850 850 request.addfinalizer(repo_server.cleanup)
851 851 return repo_server
852 852
853 853
854 854 class RepoServer(object):
855 855 """
856 856 Utility to serve a local repository for the duration of a test case.
857 857
858 858 Supports only Subversion so far.
859 859 """
860 860
861 861 url = None
862 862
863 863 def __init__(self):
864 864 self._cleanup_servers = []
865 865
866 866 def serve(self, vcsrepo):
867 867 if vcsrepo.alias != 'svn':
868 868 raise TypeError("Backend %s not supported" % vcsrepo.alias)
869 869
870 870 proc = subprocess32.Popen(
871 871 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
872 872 '--root', vcsrepo.path])
873 873 self._cleanup_servers.append(proc)
874 874 self.url = 'svn://localhost'
875 875
876 876 def cleanup(self):
877 877 for proc in self._cleanup_servers:
878 878 proc.terminate()
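# Illustrative sketch (not part of the original file): serving a freshly
# created Subversion repository during a test; the test name is made up and
# svnserve is assumed to be available in the test environment.
#
#   def test_serves_svn_repo(reposerver, vcsbackend_svn):
#       vcs_repo = vcsbackend_svn.create_repo()
#       reposerver.serve(vcs_repo)
#       assert reposerver.url == 'svn://localhost'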
879 879
880 880
881 881 @pytest.fixture
882 882 def pr_util(backend, request):
883 883 """
884 884 Utility for tests of models and for functional tests around pull requests.
885 885
886 886 It gives an instance of :class:`PRTestUtility` which provides various
887 887 utility methods around one pull request.
888 888
889 889 This fixture uses `backend` and inherits its parameterization.
890 890 """
891 891
892 892 util = PRTestUtility(backend)
893 893
894 894 @request.addfinalizer
895 895 def cleanup():
896 896 util.cleanup()
897 897
898 898 return util
899 899
900 900
901 901 class PRTestUtility(object):
902 902
903 903 pull_request = None
904 904 pull_request_id = None
905 905 mergeable_patcher = None
906 906 mergeable_mock = None
907 907 notification_patcher = None
908 908
909 909 def __init__(self, backend):
910 910 self.backend = backend
911 911
912 912 def create_pull_request(
913 913 self, commits=None, target_head=None, source_head=None,
914 914 revisions=None, approved=False, author=None, mergeable=False,
915 915 enable_notifications=True, name_suffix=u'', reviewers=None,
916 916 title=u"Test", description=u"Description"):
917 917 self.set_mergeable(mergeable)
918 918 if not enable_notifications:
919 919 # mock notification side effect
920 920 self.notification_patcher = mock.patch(
921 921 'rhodecode.model.notification.NotificationModel.create')
922 922 self.notification_patcher.start()
923 923
924 924 if not self.pull_request:
925 925 if not commits:
926 926 commits = [
927 927 {'message': 'c1'},
928 928 {'message': 'c2'},
929 929 {'message': 'c3'},
930 930 ]
931 931 target_head = 'c1'
932 932 source_head = 'c2'
933 933 revisions = ['c2']
934 934
935 935 self.commit_ids = self.backend.create_master_repo(commits)
936 936 self.target_repository = self.backend.create_repo(
937 937 heads=[target_head], name_suffix=name_suffix)
938 938 self.source_repository = self.backend.create_repo(
939 939 heads=[source_head], name_suffix=name_suffix)
940 940 self.author = author or UserModel().get_by_username(
941 941 TEST_USER_ADMIN_LOGIN)
942 942
943 943 model = PullRequestModel()
944 944 self.create_parameters = {
945 945 'created_by': self.author,
946 946 'source_repo': self.source_repository.repo_name,
947 947 'source_ref': self._default_branch_reference(source_head),
948 948 'target_repo': self.target_repository.repo_name,
949 949 'target_ref': self._default_branch_reference(target_head),
950 950 'revisions': [self.commit_ids[r] for r in revisions],
951 951 'reviewers': reviewers or self._get_reviewers(),
952 952 'title': title,
953 953 'description': description,
954 954 }
955 955 self.pull_request = model.create(**self.create_parameters)
956 956 assert model.get_versions(self.pull_request) == []
957 957
958 958 self.pull_request_id = self.pull_request.pull_request_id
959 959
960 960 if approved:
961 961 self.approve()
962 962
963 963 Session().add(self.pull_request)
964 964 Session().commit()
965 965
966 966 return self.pull_request
967 967
968 968 def approve(self):
969 969 self.create_status_votes(
970 970 ChangesetStatus.STATUS_APPROVED,
971 971 *self.pull_request.reviewers)
972 972
973 973 def close(self):
974 974 PullRequestModel().close_pull_request(self.pull_request, self.author)
975 975
976 976 def _default_branch_reference(self, commit_message):
977 977 reference = '%s:%s:%s' % (
978 978 'branch',
979 979 self.backend.default_branch_name,
980 980 self.commit_ids[commit_message])
981 981 return reference
982 982
983 983 def _get_reviewers(self):
984 984 model = UserModel()
985 985 return [
986 986 model.get_by_username(TEST_USER_REGULAR_LOGIN),
987 987 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
988 988 ]
989 989
990 990 def update_source_repository(self, head=None):
991 991 heads = [head or 'c3']
992 992 self.backend.pull_heads(self.source_repository, heads=heads)
993 993
994 994 def add_one_commit(self, head=None):
995 995 self.update_source_repository(head=head)
996 996 old_commit_ids = set(self.pull_request.revisions)
997 997 PullRequestModel().update_commits(self.pull_request)
998 998 commit_ids = set(self.pull_request.revisions)
999 999 new_commit_ids = commit_ids - old_commit_ids
1000 1000 assert len(new_commit_ids) == 1
1001 1001 return new_commit_ids.pop()
1002 1002
1003 1003 def remove_one_commit(self):
1004 1004 assert len(self.pull_request.revisions) == 2
1005 1005 source_vcs = self.source_repository.scm_instance()
1006 1006 removed_commit_id = source_vcs.commit_ids[-1]
1007 1007
1008 1008 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1009 1009 # remove the if once that's sorted out.
1010 1010 if self.backend.alias == "git":
1011 1011 kwargs = {'branch_name': self.backend.default_branch_name}
1012 1012 else:
1013 1013 kwargs = {}
1014 1014 source_vcs.strip(removed_commit_id, **kwargs)
1015 1015
1016 1016 PullRequestModel().update_commits(self.pull_request)
1017 1017 assert len(self.pull_request.revisions) == 1
1018 1018 return removed_commit_id
1019 1019
1020 1020 def create_comment(self, linked_to=None):
1021 1021 comment = CommentsModel().create(
1022 1022 text=u"Test comment",
1023 1023 repo=self.target_repository.repo_name,
1024 1024 user=self.author,
1025 1025 pull_request=self.pull_request)
1026 1026 assert comment.pull_request_version_id is None
1027 1027
1028 1028 if linked_to:
1029 1029 PullRequestModel()._link_comments_to_version(linked_to)
1030 1030
1031 1031 return comment
1032 1032
1033 1033 def create_inline_comment(
1034 1034 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1035 1035 comment = CommentsModel().create(
1036 1036 text=u"Test comment",
1037 1037 repo=self.target_repository.repo_name,
1038 1038 user=self.author,
1039 1039 line_no=line_no,
1040 1040 f_path=file_path,
1041 1041 pull_request=self.pull_request)
1042 1042 assert comment.pull_request_version_id is None
1043 1043
1044 1044 if linked_to:
1045 1045 PullRequestModel()._link_comments_to_version(linked_to)
1046 1046
1047 1047 return comment
1048 1048
1049 1049 def create_version_of_pull_request(self):
1050 1050 pull_request = self.create_pull_request()
1051 1051 version = PullRequestModel()._create_version_from_snapshot(
1052 1052 pull_request)
1053 1053 return version
1054 1054
1055 1055 def create_status_votes(self, status, *reviewers):
1056 1056 for reviewer in reviewers:
1057 1057 ChangesetStatusModel().set_status(
1058 1058 repo=self.pull_request.target_repo,
1059 1059 status=status,
1060 1060 user=reviewer.user_id,
1061 1061 pull_request=self.pull_request)
1062 1062
1063 1063 def set_mergeable(self, value):
1064 1064 if not self.mergeable_patcher:
1065 1065 self.mergeable_patcher = mock.patch.object(
1066 1066 VcsSettingsModel, 'get_general_settings')
1067 1067 self.mergeable_mock = self.mergeable_patcher.start()
1068 1068 self.mergeable_mock.return_value = {
1069 1069 'rhodecode_pr_merge_enabled': value}
1070 1070
1071 1071 def cleanup(self):
1072 1072 # In case the source repository is already cleaned up, the pull
1073 1073 # request will already be deleted.
1074 1074 pull_request = PullRequest().get(self.pull_request_id)
1075 1075 if pull_request:
1076 1076 PullRequestModel().delete(pull_request)
1077 1077 Session().commit()
1078 1078
1079 1079 if self.notification_patcher:
1080 1080 self.notification_patcher.stop()
1081 1081
1082 1082 if self.mergeable_patcher:
1083 1083 self.mergeable_patcher.stop()
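# Illustrative sketch (not part of the original file) of the typical
# `pr_util` flow in a test; the mergeable flag is arbitrary.
#
#   def test_pull_request_gets_new_commit(pr_util):
#       pull_request = pr_util.create_pull_request(mergeable=True)
#       new_commit_id = pr_util.add_one_commit()
#       assert new_commit_id in pull_request.revisions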
1084 1084
1085 1085
1086 1086 @pytest.fixture
1087 1087 def user_admin(pylonsapp):
1088 1088 """
1089 1089 Provides the default admin test user as an instance of `db.User`.
1090 1090 """
1091 1091 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1092 1092 return user
1093 1093
1094 1094
1095 1095 @pytest.fixture
1096 1096 def user_regular(pylonsapp):
1097 1097 """
1098 1098 Provides the default regular test user as an instance of `db.User`.
1099 1099 """
1100 1100 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1101 1101 return user
1102 1102
1103 1103
1104 1104 @pytest.fixture
1105 1105 def user_util(request, pylonsapp):
1106 1106 """
1107 1107 Provides a wired instance of `UserUtility` with integrated cleanup.
1108 1108 """
1109 1109 utility = UserUtility(test_name=request.node.name)
1110 1110 request.addfinalizer(utility.cleanup)
1111 1111 return utility
1112 1112
1113 1113
1114 1114 # TODO: johbo: Split this up into utilities per domain or something similar
1115 1115 class UserUtility(object):
1116 1116
1117 1117 def __init__(self, test_name="test"):
1118 1118 self._test_name = self._sanitize_name(test_name)
1119 1119 self.fixture = Fixture()
1120 1120 self.repo_group_ids = []
1121 1121 self.repos_ids = []
1122 1122 self.user_ids = []
1123 1123 self.user_group_ids = []
1124 1124 self.user_repo_permission_ids = []
1125 1125 self.user_group_repo_permission_ids = []
1126 1126 self.user_repo_group_permission_ids = []
1127 1127 self.user_group_repo_group_permission_ids = []
1128 1128 self.user_user_group_permission_ids = []
1129 1129 self.user_group_user_group_permission_ids = []
1130 1130 self.user_permissions = []
1131 1131
1132 1132 def _sanitize_name(self, name):
1133 1133 for char in ['[', ']']:
1134 1134 name = name.replace(char, '_')
1135 1135 return name
1136 1136
1137 1137 def create_repo_group(
1138 1138 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1139 1139 group_name = "{prefix}_repogroup_{count}".format(
1140 1140 prefix=self._test_name,
1141 1141 count=len(self.repo_group_ids))
1142 1142 repo_group = self.fixture.create_repo_group(
1143 1143 group_name, cur_user=owner)
1144 1144 if auto_cleanup:
1145 1145 self.repo_group_ids.append(repo_group.group_id)
1146 1146 return repo_group
1147 1147
1148 1148 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None, auto_cleanup=True):
1149 1149 repo_name = "{prefix}_repository_{count}".format(
1150 1150 prefix=self._test_name,
1151 1151 count=len(self.repos_ids))
1152 1152
1153 1153 repository = self.fixture.create_repo(
1154 1154 repo_name, cur_user=owner, repo_group=parent)
1155 1155 if auto_cleanup:
1156 1156 self.repos_ids.append(repository.repo_id)
1157 1157 return repository
1158 1158
1159 1159 def create_user(self, auto_cleanup=True, **kwargs):
1160 1160 user_name = "{prefix}_user_{count}".format(
1161 1161 prefix=self._test_name,
1162 1162 count=len(self.user_ids))
1163 1163 user = self.fixture.create_user(user_name, **kwargs)
1164 1164 if auto_cleanup:
1165 1165 self.user_ids.append(user.user_id)
1166 1166 return user
1167 1167
1168 1168 def create_user_with_group(self):
1169 1169 user = self.create_user()
1170 1170 user_group = self.create_user_group(members=[user])
1171 1171 return user, user_group
1172 1172
1173 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1173 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1174 auto_cleanup=True, **kwargs):
1174 1175 group_name = "{prefix}_usergroup_{count}".format(
1175 1176 prefix=self._test_name,
1176 1177 count=len(self.user_group_ids))
1177 user_group = self.fixture.create_user_group(group_name, **kwargs)
1178 user_group = self.fixture.create_user_group(
1179 group_name, cur_user=owner, **kwargs)
1180
1178 1181 if auto_cleanup:
1179 1182 self.user_group_ids.append(user_group.users_group_id)
1180 1183 if members:
1181 1184 for user in members:
1182 1185 UserGroupModel().add_user_to_group(user_group, user)
1183 1186 return user_group
1184 1187
1185 1188 def grant_user_permission(self, user_name, permission_name):
1186 1189 self._inherit_default_user_permissions(user_name, False)
1187 1190 self.user_permissions.append((user_name, permission_name))
1188 1191
1189 1192 def grant_user_permission_to_repo_group(
1190 1193 self, repo_group, user, permission_name):
1191 1194 permission = RepoGroupModel().grant_user_permission(
1192 1195 repo_group, user, permission_name)
1193 1196 self.user_repo_group_permission_ids.append(
1194 1197 (repo_group.group_id, user.user_id))
1195 1198 return permission
1196 1199
1197 1200 def grant_user_group_permission_to_repo_group(
1198 1201 self, repo_group, user_group, permission_name):
1199 1202 permission = RepoGroupModel().grant_user_group_permission(
1200 1203 repo_group, user_group, permission_name)
1201 1204 self.user_group_repo_group_permission_ids.append(
1202 1205 (repo_group.group_id, user_group.users_group_id))
1203 1206 return permission
1204 1207
1205 1208 def grant_user_permission_to_repo(
1206 1209 self, repo, user, permission_name):
1207 1210 permission = RepoModel().grant_user_permission(
1208 1211 repo, user, permission_name)
1209 1212 self.user_repo_permission_ids.append(
1210 1213 (repo.repo_id, user.user_id))
1211 1214 return permission
1212 1215
1213 1216 def grant_user_group_permission_to_repo(
1214 1217 self, repo, user_group, permission_name):
1215 1218 permission = RepoModel().grant_user_group_permission(
1216 1219 repo, user_group, permission_name)
1217 1220 self.user_group_repo_permission_ids.append(
1218 1221 (repo.repo_id, user_group.users_group_id))
1219 1222 return permission
1220 1223
1221 1224 def grant_user_permission_to_user_group(
1222 1225 self, target_user_group, user, permission_name):
1223 1226 permission = UserGroupModel().grant_user_permission(
1224 1227 target_user_group, user, permission_name)
1225 1228 self.user_user_group_permission_ids.append(
1226 1229 (target_user_group.users_group_id, user.user_id))
1227 1230 return permission
1228 1231
1229 1232 def grant_user_group_permission_to_user_group(
1230 1233 self, target_user_group, user_group, permission_name):
1231 1234 permission = UserGroupModel().grant_user_group_permission(
1232 1235 target_user_group, user_group, permission_name)
1233 1236 self.user_group_user_group_permission_ids.append(
1234 1237 (target_user_group.users_group_id, user_group.users_group_id))
1235 1238 return permission
1236 1239
1237 1240 def revoke_user_permission(self, user_name, permission_name):
1238 1241 self._inherit_default_user_permissions(user_name, True)
1239 1242 UserModel().revoke_perm(user_name, permission_name)
1240 1243
1241 1244 def _inherit_default_user_permissions(self, user_name, value):
1242 1245 user = UserModel().get_by_username(user_name)
1243 1246 user.inherit_default_permissions = value
1244 1247 Session().add(user)
1245 1248 Session().commit()
1246 1249
1247 1250 def cleanup(self):
1248 1251 self._cleanup_permissions()
1249 1252 self._cleanup_repos()
1250 1253 self._cleanup_repo_groups()
1251 1254 self._cleanup_user_groups()
1252 1255 self._cleanup_users()
1253 1256
1254 1257 def _cleanup_permissions(self):
1255 1258 if self.user_permissions:
1256 1259 for user_name, permission_name in self.user_permissions:
1257 1260 self.revoke_user_permission(user_name, permission_name)
1258 1261
1259 1262 for permission in self.user_repo_permission_ids:
1260 1263 RepoModel().revoke_user_permission(*permission)
1261 1264
1262 1265 for permission in self.user_group_repo_permission_ids:
1263 1266 RepoModel().revoke_user_group_permission(*permission)
1264 1267
1265 1268 for permission in self.user_repo_group_permission_ids:
1266 1269 RepoGroupModel().revoke_user_permission(*permission)
1267 1270
1268 1271 for permission in self.user_group_repo_group_permission_ids:
1269 1272 RepoGroupModel().revoke_user_group_permission(*permission)
1270 1273
1271 1274 for permission in self.user_user_group_permission_ids:
1272 1275 UserGroupModel().revoke_user_permission(*permission)
1273 1276
1274 1277 for permission in self.user_group_user_group_permission_ids:
1275 1278 UserGroupModel().revoke_user_group_permission(*permission)
1276 1279
1277 1280 def _cleanup_repo_groups(self):
1278 1281 def _repo_group_compare(first_group_id, second_group_id):
1279 1282 """
1280 1283 Gives higher priority to the groups with the most complex paths
1281 1284 """
1282 1285 first_group = RepoGroup.get(first_group_id)
1283 1286 second_group = RepoGroup.get(second_group_id)
1284 1287 first_group_parts = (
1285 1288 len(first_group.group_name.split('/')) if first_group else 0)
1286 1289 second_group_parts = (
1287 1290 len(second_group.group_name.split('/')) if second_group else 0)
1288 1291 return cmp(second_group_parts, first_group_parts)
1289 1292
1290 1293 sorted_repo_group_ids = sorted(
1291 1294 self.repo_group_ids, cmp=_repo_group_compare)
1292 1295 for repo_group_id in sorted_repo_group_ids:
1293 1296 self.fixture.destroy_repo_group(repo_group_id)
1294 1297
1295 1298 def _cleanup_repos(self):
1296 1299 sorted_repos_ids = sorted(self.repos_ids)
1297 1300 for repo_id in sorted_repos_ids:
1298 1301 self.fixture.destroy_repo(repo_id)
1299 1302
1300 1303 def _cleanup_user_groups(self):
1301 1304 def _user_group_compare(first_group_id, second_group_id):
1302 1305 """
1303 1306 Gives higher priority to the groups with the most complex paths
1304 1307 """
1305 1308 first_group = UserGroup.get(first_group_id)
1306 1309 second_group = UserGroup.get(second_group_id)
1307 1310 first_group_parts = (
1308 1311 len(first_group.users_group_name.split('/'))
1309 1312 if first_group else 0)
1310 1313 second_group_parts = (
1311 1314 len(second_group.users_group_name.split('/'))
1312 1315 if second_group else 0)
1313 1316 return cmp(second_group_parts, first_group_parts)
1314 1317
1315 1318 sorted_user_group_ids = sorted(
1316 1319 self.user_group_ids, cmp=_user_group_compare)
1317 1320 for user_group_id in sorted_user_group_ids:
1318 1321 self.fixture.destroy_user_group(user_group_id)
1319 1322
1320 1323 def _cleanup_users(self):
1321 1324 for user_id in self.user_ids:
1322 1325 self.fixture.destroy_user(user_id)
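# Illustrative sketch (not part of the original file): granting a freshly
# created user read access on a freshly created repository; the permission
# name 'repository.read' is assumed to be one of the standard repository
# permissions.
#
#   def test_user_can_be_granted_read(user_util):
#       repo = user_util.create_repo()
#       user = user_util.create_user()
#       user_util.grant_user_permission_to_repo(repo, user, 'repository.read')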
1323 1326
1324 1327
1325 1328 # TODO: Think about moving this into a pytest-pyro package and make it a
1326 1329 # pytest plugin
1327 1330 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1328 1331 def pytest_runtest_makereport(item, call):
1329 1332 """
1330 1333 Adding the remote traceback if the exception has this information.
1331 1334
1332 1335 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1333 1336 to the exception instance.
1334 1337 """
1335 1338 outcome = yield
1336 1339 report = outcome.get_result()
1337 1340 if call.excinfo:
1338 1341 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1339 1342
1340 1343
1341 1344 def _add_vcsserver_remote_traceback(report, exc):
1342 1345 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1343 1346
1344 1347 if vcsserver_traceback:
1345 1348 section = 'VCSServer remote traceback ' + report.when
1346 1349 report.sections.append((section, vcsserver_traceback))
1347 1350
1348 1351
1349 1352 @pytest.fixture(scope='session')
1350 1353 def testrun():
1351 1354 return {
1352 1355 'uuid': uuid.uuid4(),
1353 1356 'start': datetime.datetime.utcnow().isoformat(),
1354 1357 'timestamp': int(time.time()),
1355 1358 }
1356 1359
1357 1360
1358 1361 @pytest.fixture(autouse=True)
1359 1362 def collect_appenlight_stats(request, testrun):
1360 1363 """
1361 1364 This fixture reports memory consumption of single tests.
1362 1365
1363 1366 It gathers data based on `psutil` and sends them to Appenlight. The option
1364 1367 ``--ae`` has to be used to enable this fixture and the API key for your
1365 1368 application has to be provided in ``--ae-key``.
1366 1369 """
1367 1370 try:
1368 1371 # cygwin does not have psutil support yet.
1369 1372 import psutil
1370 1373 except ImportError:
1371 1374 return
1372 1375
1373 1376 if not request.config.getoption('--appenlight'):
1374 1377 return
1375 1378 else:
1376 1379 # Only request the pylonsapp fixture if appenlight tracking is
1377 1380 # enabled. This will speed up a test run of unit tests by 2 to 3
1378 1381 # seconds if appenlight is not enabled.
1379 1382 pylonsapp = request.getfuncargvalue("pylonsapp")
1380 1383 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1381 1384 client = AppenlightClient(
1382 1385 url=url,
1383 1386 api_key=request.config.getoption('--appenlight-api-key'),
1384 1387 namespace=request.node.nodeid,
1385 1388 request=str(testrun['uuid']),
1386 1389 testrun=testrun)
1387 1390
1388 1391 client.collect({
1389 1392 'message': "Starting",
1390 1393 })
1391 1394
1392 1395 server_and_port = pylonsapp.config['vcs.server']
1393 1396 protocol = pylonsapp.config['vcs.server.protocol']
1394 1397 server = create_vcsserver_proxy(server_and_port, protocol)
1395 1398 with server:
1396 1399 vcs_pid = server.get_pid()
1397 1400 server.run_gc()
1398 1401 vcs_process = psutil.Process(vcs_pid)
1399 1402 mem = vcs_process.memory_info()
1400 1403 client.tag_before('vcsserver.rss', mem.rss)
1401 1404 client.tag_before('vcsserver.vms', mem.vms)
1402 1405
1403 1406 test_process = psutil.Process()
1404 1407 mem = test_process.memory_info()
1405 1408 client.tag_before('test.rss', mem.rss)
1406 1409 client.tag_before('test.vms', mem.vms)
1407 1410
1408 1411 client.tag_before('time', time.time())
1409 1412
1410 1413 @request.addfinalizer
1411 1414 def send_stats():
1412 1415 client.tag_after('time', time.time())
1413 1416 with server:
1414 1417 gc_stats = server.run_gc()
1415 1418 for tag, value in gc_stats.items():
1416 1419 client.tag_after(tag, value)
1417 1420 mem = vcs_process.memory_info()
1418 1421 client.tag_after('vcsserver.rss', mem.rss)
1419 1422 client.tag_after('vcsserver.vms', mem.vms)
1420 1423
1421 1424 mem = test_process.memory_info()
1422 1425 client.tag_after('test.rss', mem.rss)
1423 1426 client.tag_after('test.vms', mem.vms)
1424 1427
1425 1428 client.collect({
1426 1429 'message': "Finished",
1427 1430 })
1428 1431 client.send_stats()
1429 1432
1430 1433 return client
1431 1434
1432 1435
1433 1436 class AppenlightClient(object):
1434 1437
1435 1438 url_template = '{url}?protocol_version=0.5'
1436 1439
1437 1440 def __init__(
1438 1441 self, url, api_key, add_server=True, add_timestamp=True,
1439 1442 namespace=None, request=None, testrun=None):
1440 1443 self.url = self.url_template.format(url=url)
1441 1444 self.api_key = api_key
1442 1445 self.add_server = add_server
1443 1446 self.add_timestamp = add_timestamp
1444 1447 self.namespace = namespace
1445 1448 self.request = request
1446 1449 self.server = socket.getfqdn(socket.gethostname())
1447 1450 self.tags_before = {}
1448 1451 self.tags_after = {}
1449 1452 self.stats = []
1450 1453 self.testrun = testrun or {}
1451 1454
1452 1455 def tag_before(self, tag, value):
1453 1456 self.tags_before[tag] = value
1454 1457
1455 1458 def tag_after(self, tag, value):
1456 1459 self.tags_after[tag] = value
1457 1460
1458 1461 def collect(self, data):
1459 1462 if self.add_server:
1460 1463 data.setdefault('server', self.server)
1461 1464 if self.add_timestamp:
1462 1465 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1463 1466 if self.namespace:
1464 1467 data.setdefault('namespace', self.namespace)
1465 1468 if self.request:
1466 1469 data.setdefault('request', self.request)
1467 1470 self.stats.append(data)
1468 1471
1469 1472 def send_stats(self):
1470 1473 tags = [
1471 1474 ('testrun', self.request),
1472 1475 ('testrun.start', self.testrun['start']),
1473 1476 ('testrun.timestamp', self.testrun['timestamp']),
1474 1477 ('test', self.namespace),
1475 1478 ]
1476 1479 for key, value in self.tags_before.items():
1477 1480 tags.append((key + '.before', value))
1478 1481 try:
1479 1482 delta = self.tags_after[key] - value
1480 1483 tags.append((key + '.delta', delta))
1481 1484 except Exception:
1482 1485 pass
1483 1486 for key, value in self.tags_after.items():
1484 1487 tags.append((key + '.after', value))
1485 1488 self.collect({
1486 1489 'message': "Collected tags",
1487 1490 'tags': tags,
1488 1491 })
1489 1492
1490 1493 response = requests.post(
1491 1494 self.url,
1492 1495 headers={
1493 1496 'X-appenlight-api-key': self.api_key},
1494 1497 json=self.stats,
1495 1498 )
1496 1499
1497 1500 if response.status_code != 200:
1498 1501 pprint.pprint(self.stats)
1499 1502 print response.headers
1500 1503 print response.text
1501 1504 raise Exception('Sending to appenlight failed')
1502 1505
1503 1506
1504 1507 @pytest.fixture
1505 1508 def gist_util(request, pylonsapp):
1506 1509 """
1507 1510 Provides a wired instance of `GistUtility` with integrated cleanup.
1508 1511 """
1509 1512 utility = GistUtility()
1510 1513 request.addfinalizer(utility.cleanup)
1511 1514 return utility
1512 1515
1513 1516
1514 1517 class GistUtility(object):
1515 1518 def __init__(self):
1516 1519 self.fixture = Fixture()
1517 1520 self.gist_ids = []
1518 1521
1519 1522 def create_gist(self, **kwargs):
1520 1523 gist = self.fixture.create_gist(**kwargs)
1521 1524 self.gist_ids.append(gist.gist_id)
1522 1525 return gist
1523 1526
1524 1527 def cleanup(self):
1525 1528 for id_ in self.gist_ids:
1526 1529 self.fixture.destroy_gists(str(id_))
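# Illustrative sketch (not part of the original file) of the `gist_util`
# fixture; created gists are destroyed automatically by the finalizer.
#
#   def test_gist_is_tracked_for_cleanup(gist_util):
#       gist = gist_util.create_gist()
#       assert gist.gist_id in gist_util.gist_ids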
1527 1530
1528 1531
1529 1532 @pytest.fixture
1530 1533 def enabled_backends(request):
1531 1534 backends = request.config.option.backends
1532 1535 return backends[:]
1533 1536
1534 1537
1535 1538 @pytest.fixture
1536 1539 def settings_util(request):
1537 1540 """
1538 1541 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1539 1542 """
1540 1543 utility = SettingsUtility()
1541 1544 request.addfinalizer(utility.cleanup)
1542 1545 return utility
1543 1546
1544 1547
1545 1548 class SettingsUtility(object):
1546 1549 def __init__(self):
1547 1550 self.rhodecode_ui_ids = []
1548 1551 self.rhodecode_setting_ids = []
1549 1552 self.repo_rhodecode_ui_ids = []
1550 1553 self.repo_rhodecode_setting_ids = []
1551 1554
1552 1555 def create_repo_rhodecode_ui(
1553 1556 self, repo, section, value, key=None, active=True, cleanup=True):
1554 1557 key = key or hashlib.sha1(
1555 1558 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1556 1559
1557 1560 setting = RepoRhodeCodeUi()
1558 1561 setting.repository_id = repo.repo_id
1559 1562 setting.ui_section = section
1560 1563 setting.ui_value = value
1561 1564 setting.ui_key = key
1562 1565 setting.ui_active = active
1563 1566 Session().add(setting)
1564 1567 Session().commit()
1565 1568
1566 1569 if cleanup:
1567 1570 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1568 1571 return setting
1569 1572
1570 1573 def create_rhodecode_ui(
1571 1574 self, section, value, key=None, active=True, cleanup=True):
1572 1575 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1573 1576
1574 1577 setting = RhodeCodeUi()
1575 1578 setting.ui_section = section
1576 1579 setting.ui_value = value
1577 1580 setting.ui_key = key
1578 1581 setting.ui_active = active
1579 1582 Session().add(setting)
1580 1583 Session().commit()
1581 1584
1582 1585 if cleanup:
1583 1586 self.rhodecode_ui_ids.append(setting.ui_id)
1584 1587 return setting
1585 1588
1586 1589 def create_repo_rhodecode_setting(
1587 1590 self, repo, name, value, type_, cleanup=True):
1588 1591 setting = RepoRhodeCodeSetting(
1589 1592 repo.repo_id, key=name, val=value, type=type_)
1590 1593 Session().add(setting)
1591 1594 Session().commit()
1592 1595
1593 1596 if cleanup:
1594 1597 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1595 1598 return setting
1596 1599
1597 1600 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1598 1601 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1599 1602 Session().add(setting)
1600 1603 Session().commit()
1601 1604
1602 1605 if cleanup:
1603 1606 self.rhodecode_setting_ids.append(setting.app_settings_id)
1604 1607
1605 1608 return setting
1606 1609
1607 1610 def cleanup(self):
1608 1611 for id_ in self.rhodecode_ui_ids:
1609 1612 setting = RhodeCodeUi.get(id_)
1610 1613 Session().delete(setting)
1611 1614
1612 1615 for id_ in self.rhodecode_setting_ids:
1613 1616 setting = RhodeCodeSetting.get(id_)
1614 1617 Session().delete(setting)
1615 1618
1616 1619 for id_ in self.repo_rhodecode_ui_ids:
1617 1620 setting = RepoRhodeCodeUi.get(id_)
1618 1621 Session().delete(setting)
1619 1622
1620 1623 for id_ in self.repo_rhodecode_setting_ids:
1621 1624 setting = RepoRhodeCodeSetting.get(id_)
1622 1625 Session().delete(setting)
1623 1626
1624 1627 Session().commit()
1625 1628
1626 1629
1627 1630 @pytest.fixture
1628 1631 def no_notifications(request):
1629 1632 notification_patcher = mock.patch(
1630 1633 'rhodecode.model.notification.NotificationModel.create')
1631 1634 notification_patcher.start()
1632 1635 request.addfinalizer(notification_patcher.stop)
1633 1636
1634 1637
1635 1638 @pytest.fixture
1636 1639 def silence_action_logger(request):
1637 1640 notification_patcher = mock.patch(
1638 1641 'rhodecode.lib.utils.action_logger')
1639 1642 notification_patcher.start()
1640 1643 request.addfinalizer(notification_patcher.stop)
1641 1644
1642 1645
1643 1646 @pytest.fixture(scope='session')
1644 1647 def repeat(request):
1645 1648 """
1646 1649 The number of repetitions is based on this fixture.
1647 1650
1648 1651 Slower calls may divide it by 10 or 100. The value is chosen so that the
1649 1652 tests are not too slow in our default test suite.
1650 1653 """
1651 1654 return request.config.getoption('--repeat')
1652 1655
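To make the scaling rule concrete, a slower test divides the configured value instead of hard-coding its own loop count; this sketch uses a stand-in workload::

    def test_hashing_repeatedly(repeat):
        import hashlib
        # slower operations typically run repeat / 10 or repeat / 100 iterations
        for i in xrange(max(repeat / 10, 1)):
            assert hashlib.sha1(str(i)).hexdigest()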
1653 1656
1654 1657 @pytest.fixture
1655 1658 def rhodecode_fixtures():
1656 1659 return Fixture()
1657 1660
1658 1661
1659 1662 @pytest.fixture
1660 1663 def request_stub():
1661 1664 """
1662 1665 Stub request object.
1663 1666 """
1664 1667 request = pyramid.testing.DummyRequest()
1665 1668 request.scheme = 'https'
1666 1669 return request
1667 1670
1668 1671
1669 1672 @pytest.fixture
1670 1673 def config_stub(request, request_stub):
1671 1674 """
1672 1675 Set up pyramid.testing and return the Configurator.
1673 1676 """
1674 1677 config = pyramid.testing.setUp(request=request_stub)
1675 1678
1676 1679 @request.addfinalizer
1677 1680 def cleanup():
1678 1681 pyramid.testing.tearDown()
1679 1682
1680 1683 return config
1681 1684
1682 1685
1683 1686 @pytest.fixture
1684 1687 def StubIntegrationType():
1685 1688 class _StubIntegrationType(IntegrationTypeBase):
1686 1689 """ Test integration type class """
1687 1690
1688 1691 key = 'test'
1689 1692 display_name = 'Test integration type'
1690 1693 description = 'A test integration type for testing'
1691 1694 icon = 'test_icon_html_image'
1692 1695
1693 1696 def __init__(self, settings):
1694 1697 super(_StubIntegrationType, self).__init__(settings)
1695 1698 self.sent_events = [] # for testing
1696 1699
1697 1700 def send_event(self, event):
1698 1701 self.sent_events.append(event)
1699 1702
1700 1703 def settings_schema(self):
1701 1704 class SettingsSchema(colander.Schema):
1702 1705 test_string_field = colander.SchemaNode(
1703 1706 colander.String(),
1704 1707 missing=colander.required,
1705 1708 title='test string field',
1706 1709 )
1707 1710 test_int_field = colander.SchemaNode(
1708 1711 colander.Int(),
1709 1712 title='some integer setting',
1710 1713 )
1711 1714 return SettingsSchema()
1712 1715
1713 1716
1714 1717 integration_type_registry.register_integration_type(_StubIntegrationType)
1715 1718 return _StubIntegrationType
1716 1719
1717 1720 @pytest.fixture
1718 1721 def stub_integration_settings():
1719 1722 return {
1720 1723 'test_string_field': 'some data',
1721 1724 'test_int_field': 100,
1722 1725 }
1723 1726
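These values are shaped to pass the `SettingsSchema` declared on the stub type above; a hedged sketch of the round trip, assuming `IntegrationTypeBase` accepts a plain settings dict::

    def test_stub_settings_validate(StubIntegrationType, stub_integration_settings):
        # colander returns the deserialized values unchanged for valid input
        schema = StubIntegrationType(settings={}).settings_schema()
        assert schema.deserialize(stub_integration_settings) == stub_integration_settings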
1724 1727
1725 1728 @pytest.fixture
1726 1729 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1727 1730 stub_integration_settings):
1728 1731 integration = IntegrationModel().create(
1729 1732 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1730 1733 name='test repo integration',
1731 1734 repo=repo_stub, repo_group=None, child_repos_only=None)
1732 1735
1733 1736 @request.addfinalizer
1734 1737 def cleanup():
1735 1738 IntegrationModel().delete(integration)
1736 1739
1737 1740 return integration
1738 1741
1739 1742
1740 1743 @pytest.fixture
1741 1744 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1742 1745 stub_integration_settings):
1743 1746 integration = IntegrationModel().create(
1744 1747 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1745 1748 name='test repogroup integration',
1746 1749 repo=None, repo_group=test_repo_group, child_repos_only=True)
1747 1750
1748 1751 @request.addfinalizer
1749 1752 def cleanup():
1750 1753 IntegrationModel().delete(integration)
1751 1754
1752 1755 return integration
1753 1756
1754 1757
1755 1758 @pytest.fixture
1756 1759 def repogroup_recursive_integration_stub(request, test_repo_group,
1757 1760 StubIntegrationType, stub_integration_settings):
1758 1761 integration = IntegrationModel().create(
1759 1762 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1760 1763 name='test recursive repogroup integration',
1761 1764 repo=None, repo_group=test_repo_group, child_repos_only=False)
1762 1765
1763 1766 @request.addfinalizer
1764 1767 def cleanup():
1765 1768 IntegrationModel().delete(integration)
1766 1769
1767 1770 return integration
1768 1771
1769 1772
1770 1773 @pytest.fixture
1771 1774 def global_integration_stub(request, StubIntegrationType,
1772 1775 stub_integration_settings):
1773 1776 integration = IntegrationModel().create(
1774 1777 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1775 1778 name='test global integration',
1776 1779 repo=None, repo_group=None, child_repos_only=None)
1777 1780
1778 1781 @request.addfinalizer
1779 1782 def cleanup():
1780 1783 IntegrationModel().delete(integration)
1781 1784
1782 1785 return integration
1783 1786
1784 1787
1785 1788 @pytest.fixture
1786 1789 def root_repos_integration_stub(request, StubIntegrationType,
1787 1790 stub_integration_settings):
1788 1791 integration = IntegrationModel().create(
1789 1792 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1790 1793 name='test root repos integration',
1791 1794 repo=None, repo_group=None, child_repos_only=True)
1792 1795
1793 1796 @request.addfinalizer
1794 1797 def cleanup():
1795 1798 IntegrationModel().delete(integration)
1796 1799
1797 1800 return integration
1798 1801
1799 1802
1800 1803 @pytest.fixture
1801 1804 def local_dt_to_utc():
1802 1805 def _factory(dt):
1803 1806 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1804 1807 dateutil.tz.tzutc()).replace(tzinfo=None)
1805 1808 return _factory
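A short, self-contained illustration; the exact UTC value depends on the local timezone, so the assertion only checks that the result is naive::

    def test_local_dt_to_utc_is_naive(local_dt_to_utc):
        import datetime
        converted = local_dt_to_utc(datetime.datetime(2017, 1, 1, 12, 0, 0))
        assert converted.tzinfo is None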
@@ -1,409 +1,409 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import tempfile
27 27 import urllib2
28 28 from urlparse import urlparse, parse_qsl
29 29 from urllib import unquote_plus
30 30
31 31 from webtest.app import (
32 32 Request, TestResponse, TestApp, print_stderr, string_types)
33 33
34 34 import pytest
35 35 import rc_testdata
36 36
37 37 from rhodecode.model.db import User, Repository
38 38 from rhodecode.model.meta import Session
39 39 from rhodecode.model.scm import ScmModel
40 40 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
41 41 from rhodecode.lib.vcs.backends.base import EmptyCommit
42 42
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 class CustomTestResponse(TestResponse):
48 48 def _save_output(self, out):
49 49 f = tempfile.NamedTemporaryFile(
50 50 delete=False, prefix='rc-test-', suffix='.html')
51 51 f.write(out)
52 52 return f.name
53 53
54 54 def mustcontain(self, *strings, **kw):
55 55 """
56 56 Assert that the response contains all of the strings passed
57 57 in as arguments.
58 58
59 59 Equivalent to::
60 60
61 61 assert string in res
62 62 """
63 63 if 'no' in kw:
64 64 no = kw['no']
65 65 del kw['no']
66 66 if isinstance(no, string_types):
67 67 no = [no]
68 68 else:
69 69 no = []
70 70 if kw:
71 71 raise TypeError(
72 72 "The only keyword argument allowed is 'no'")
73 73
74 74 f = self._save_output(str(self))
75 75
76 76 for s in strings:
77 77 if s not in self:
78 78 print_stderr("Actual response (no %r):" % s)
79 79 print_stderr(str(self))
80 80 raise IndexError(
81 81 "Body does not contain string %r, output saved as %s" % (
82 82 s, f))
83 83
84 84 for no_s in no:
85 85 if no_s in self:
86 86 print_stderr("Actual response (has %r)" % no_s)
87 87 print_stderr(str(self))
88 88 raise IndexError(
89 89 "Body contains bad string %r, output saved as %s" % (
90 90 no_s, f))
91 91
92 92 def assert_response(self):
93 93 return AssertResponse(self)
94 94
95 95
96 96 class TestRequest(Request):
97 97
98 98 # for py.test
99 99 disabled = True
100 100 ResponseClass = CustomTestResponse
101 101
102 102
103 103 class CustomTestApp(TestApp):
104 104 """
105 105 Custom app to make mustcontain more useful
106 106 """
107 107 RequestClass = TestRequest
108 108
109 109
110 110
111 111
112 112
113 113 def set_anonymous_access(enabled):
114 114 """(Dis)allows anonymous access depending on parameter `enabled`"""
115 115 user = User.get_default_user()
116 116 user.active = enabled
117 117 Session().add(user)
118 118 Session().commit()
119 119 log.info('anonymous access is now: %s', enabled)
120 120 assert enabled == User.get_default_user().active, (
121 121 'Cannot set anonymous access')
122 122
123 123
124 124 def check_xfail_backends(node, backend_alias):
125 125 # Using "xfail_backends" here intentionally, since this marks work
126 126 # which is "to be done" soon.
127 127 skip_marker = node.get_marker('xfail_backends')
128 128 if skip_marker and backend_alias in skip_marker.args:
129 129 msg = "Support for backend %s to be developed." % (backend_alias, )
130 130 msg = skip_marker.kwargs.get('reason', msg)
131 131 pytest.xfail(msg)
132 132
133 133
134 134 def check_skip_backends(node, backend_alias):
135 135 # Using "skip_backends" here intentionally, since this marks work which is
136 136 # not supported.
137 137 skip_marker = node.get_marker('skip_backends')
138 138 if skip_marker and backend_alias in skip_marker.args:
139 139 msg = "Feature not supported for backend %s." % (backend_alias, )
140 140 msg = skip_marker.kwargs.get('reason', msg)
141 141 pytest.skip(msg)
142 142
143 143
144 144 def extract_git_repo_from_dump(dump_name, repo_name):
145 145 """Create git repo `repo_name` from dump `dump_name`."""
146 146 repos_path = ScmModel().repos_path
147 147 target_path = os.path.join(repos_path, repo_name)
148 148 rc_testdata.extract_git_dump(dump_name, target_path)
149 149 return target_path
150 150
151 151
152 152 def extract_hg_repo_from_dump(dump_name, repo_name):
153 153 """Create hg repo `repo_name` from dump `dump_name`."""
154 154 repos_path = ScmModel().repos_path
155 155 target_path = os.path.join(repos_path, repo_name)
156 156 rc_testdata.extract_hg_dump(dump_name, target_path)
157 157 return target_path
158 158
159 159
160 160 def extract_svn_repo_from_dump(dump_name, repo_name):
161 161 """Create a svn repo `repo_name` from dump `dump_name`."""
162 162 repos_path = ScmModel().repos_path
163 163 target_path = os.path.join(repos_path, repo_name)
164 164 SubversionRepository(target_path, create=True)
165 165 _load_svn_dump_into_repo(dump_name, target_path)
166 166 return target_path
167 167
168 168
169 169 def assert_message_in_log(log_records, message, levelno, module):
170 170 messages = [
171 171 r.message for r in log_records
172 172 if r.module == module and r.levelno == levelno
173 173 ]
174 174 assert message in messages
175 175
176 176
177 177 def _load_svn_dump_into_repo(dump_name, repo_path):
178 178 """
179 179 Utility to populate a svn repository with a named dump
180 180
181 181 Currently the dumps are in rc_testdata. They might later be
182 182 integrated into the main repository once they stabilize.
183 183 """
184 184 dump = rc_testdata.load_svn_dump(dump_name)
185 185 load_dump = subprocess32.Popen(
186 186 ['svnadmin', 'load', repo_path],
187 187 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
188 188 stderr=subprocess32.PIPE)
189 189 out, err = load_dump.communicate(dump)
190 190 if load_dump.returncode != 0:
191 191 log.error("Output of load_dump command: %s", out)
192 192 log.error("Error output of load_dump command: %s", err)
193 193 raise Exception(
194 194 'Failed to load dump "%s" into repository at path "%s".'
195 195 % (dump_name, repo_path))
196 196
197 197
198 198 class AssertResponse(object):
199 199 """
200 200 Utility that helps to assert things about a given HTML response.
201 201 """
202 202
203 203 def __init__(self, response):
204 204 self.response = response
205 205
206 206 def get_imports(self):
207 207 from lxml.html import fromstring, tostring
208 208 from lxml.cssselect import CSSSelector
209 209 return fromstring, tostring, CSSSelector
210 210
211 211 def one_element_exists(self, css_selector):
212 212 self.get_element(css_selector)
213 213
214 214 def no_element_exists(self, css_selector):
215 215 assert not self._get_elements(css_selector)
216 216
217 217 def element_equals_to(self, css_selector, expected_content):
218 218 element = self.get_element(css_selector)
219 219 element_text = self._element_to_string(element)
220 220 assert expected_content in element_text
221 221
222 222 def element_contains(self, css_selector, expected_content):
223 223 element = self.get_element(css_selector)
224 224 assert expected_content in element.text_content()
225 225
226 226 def element_value_contains(self, css_selector, expected_content):
227 227 element = self.get_element(css_selector)
228 228 assert expected_content in element.value
229 229
230 230 def contains_one_link(self, link_text, href):
231 231 fromstring, tostring, CSSSelector = self.get_imports()
232 232 doc = fromstring(self.response.body)
233 233 sel = CSSSelector('a[href]')
234 234 elements = [
235 235 e for e in sel(doc) if e.text_content().strip() == link_text]
236 236 assert len(elements) == 1, "Did not find link or found multiple links"
237 237 self._ensure_url_equal(elements[0].attrib.get('href'), href)
238 238
239 239 def contains_one_anchor(self, anchor_id):
240 240 fromstring, tostring, CSSSelector = self.get_imports()
241 241 doc = fromstring(self.response.body)
242 242 sel = CSSSelector('#' + anchor_id)
243 243 elements = sel(doc)
244 assert len(elements) == 1
244 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
245 245
246 246 def _ensure_url_equal(self, found, expected):
247 247 assert _Url(found) == _Url(expected)
248 248
249 249 def get_element(self, css_selector):
250 250 elements = self._get_elements(css_selector)
251 assert len(elements) == 1
251 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
252 252 return elements[0]
253 253
254 254 def get_elements(self, css_selector):
255 255 return self._get_elements(css_selector)
256 256
257 257 def _get_elements(self, css_selector):
258 258 fromstring, tostring, CSSSelector = self.get_imports()
259 259 doc = fromstring(self.response.body)
260 260 sel = CSSSelector(css_selector)
261 261 elements = sel(doc)
262 262 return elements
263 263
264 264 def _element_to_string(self, element):
265 265 fromstring, tostring, CSSSelector = self.get_imports()
266 266 return tostring(element)
267 267
268 268
269 269 class _Url(object):
270 270 """
271 271 A url object that can be compared with other url objects
272 272 without regard to the vagaries of encoding, escaping, and ordering
273 273 of parameters in query strings.
274 274
275 275 Inspired by
276 276 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
277 277 """
278 278
279 279 def __init__(self, url):
280 280 parts = urlparse(url)
281 281 _query = frozenset(parse_qsl(parts.query))
282 282 _path = unquote_plus(parts.path)
283 283 parts = parts._replace(query=_query, path=_path)
284 284 self.parts = parts
285 285
286 286 def __eq__(self, other):
287 287 return self.parts == other.parts
288 288
289 289 def __hash__(self):
290 290 return hash(self.parts)
291 291
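To illustrate the equivalence semantics, two URLs that differ only in query-parameter order and path escaping compare equal::

    assert _Url('http://example.com/a%20b?x=1&y=2') == \
        _Url('http://example.com/a b?y=2&x=1')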
292 292
293 293 def run_test_concurrently(times, raise_catched_exc=True):
294 294 """
295 295 Add this decorator to small pieces of code that you want to test
296 296 concurrently
297 297
298 298 ex:
299 299
300 300 @run_test_concurrently(25)
301 301 def my_test_function():
302 302 ...
303 303 """
304 304 def test_concurrently_decorator(test_func):
305 305 def wrapper(*args, **kwargs):
306 306 exceptions = []
307 307
308 308 def call_test_func():
309 309 try:
310 310 test_func(*args, **kwargs)
311 311 except Exception as e:
312 312 exceptions.append(e)
313 313 if raise_catched_exc:
314 314 raise
315 315 threads = []
316 316 for i in range(times):
317 317 threads.append(threading.Thread(target=call_test_func))
318 318 for t in threads:
319 319 t.start()
320 320 for t in threads:
321 321 t.join()
322 322 if exceptions:
323 323 raise Exception(
324 324 'test_concurrently intercepted %s exceptions: %s' % (
325 325 len(exceptions), exceptions))
326 326 return wrapper
327 327 return test_concurrently_decorator
328 328
329 329
330 330 def wait_for_url(url, timeout=10):
331 331 """
332 332 Wait until URL becomes reachable.
333 333
334 334 It polls the URL until the timeout is reached or it becomes reachable.
335 335 It will call `py.test.fail` in case the URL is not reachable.
336 336 """
337 337 timeout = time.time() + timeout
338 338 last = 0
339 339 wait = 0.1
340 340
341 341 while timeout > last:
342 342 last = time.time()
343 343 if is_url_reachable(url):
344 344 break
345 345 elif (last + wait) > time.time():
346 346 # Go to sleep because not enough time has passed since last check.
347 347 time.sleep(wait)
348 348 else:
349 349 pytest.fail("Timeout while waiting for URL {}".format(url))
350 350
351 351
352 352 def is_url_reachable(url):
353 353 try:
354 354 urllib2.urlopen(url)
355 355 except urllib2.URLError:
356 356 return False
357 357 return True
358 358
359 359
360 360 def get_session_from_response(response):
361 361 """
362 362 This returns the session from a response object. Pylons has some magic
363 363 to make the session available as `response.session`, but Pyramid
364 364 does not expose it.
365 365 """
366 366 # TODO: Try to look up the session key also.
367 367 return response.request.environ['beaker.session']
368 368
369 369
370 370 def repo_on_filesystem(repo_name):
371 371 from rhodecode.lib import vcs
372 372 from rhodecode.tests import TESTS_TMP_PATH
373 373 repo = vcs.get_vcs_instance(
374 374 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
375 375 return repo is not None
376 376
377 377
378 378 def commit_change(
379 379 repo, filename, content, message, vcs_type, parent=None, newfile=False):
380 380 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
381 381
382 382 repo = Repository.get_by_repo_name(repo)
383 383 _commit = parent
384 384 if not parent:
385 385 _commit = EmptyCommit(alias=vcs_type)
386 386
387 387 if newfile:
388 388 nodes = {
389 389 filename: {
390 390 'content': content
391 391 }
392 392 }
393 393 commit = ScmModel().create_nodes(
394 394 user=TEST_USER_ADMIN_LOGIN, repo=repo,
395 395 message=message,
396 396 nodes=nodes,
397 397 parent_commit=_commit,
398 398 author=TEST_USER_ADMIN_LOGIN,
399 399 )
400 400 else:
401 401 commit = ScmModel().commit_change(
402 402 repo=repo.scm_instance(), repo_name=repo.repo_name,
403 403 commit=parent, user=TEST_USER_ADMIN_LOGIN,
404 404 author=TEST_USER_ADMIN_LOGIN,
405 405 message=message,
406 406 content=content,
407 407 f_path=filename
408 408 )
409 409 return commit
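Finally, a hedged example of driving this helper from a test; the repository and file names are illustrative and assume a matching test repository exists::

    commit = commit_change(
        repo='hypothetical_test_repo', filename='docs/readme.rst',
        content='updated content\n', message='update readme',
        vcs_type='git', parent=None, newfile=True)
    assert commit is not None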