##// END OF EJS Templates
tests: fixed broken tests after UI chages
marcink -
r3609:215d0f34 new-ui
parent child Browse files
Show More
@@ -1,82 +1,82 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests import TestController
24 24 from rhodecode.tests.fixture import Fixture
25 25
26 26 fixture = Fixture()
27 27
28 28
29 29 def route_path(name, params=None, **kwargs):
30 30 import urllib
31 31 from rhodecode.apps._base import ADMIN_PREFIX
32 32
33 33 base_url = {
34 34 'admin_home': ADMIN_PREFIX,
35 35 'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
36 36 'pull_requests_global': ADMIN_PREFIX + '/pull-request/{pull_request_id}',
37 37 'pull_requests_global_0': ADMIN_PREFIX + '/pull_requests/{pull_request_id}',
38 38 'pull_requests_global_1': ADMIN_PREFIX + '/pull-requests/{pull_request_id}',
39 39
40 40 }[name].format(**kwargs)
41 41
42 42 if params:
43 43 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
44 44 return base_url
45 45
46 46
47 47 class TestAdminMainView(TestController):
48 48
49 def test_redirect_admin_home(self):
49 def test_access_admin_home(self):
50 50 self.log_user()
51 response = self.app.get(route_path('admin_home'), status=302)
52 assert response.location.endswith('/audit_logs')
51 response = self.app.get(route_path('admin_home'), status=200)
52 response.mustcontain("Administration area")
53 53
54 54 def test_redirect_pull_request_view(self, view):
55 55 self.log_user()
56 56 self.app.get(
57 57 route_path(view, pull_request_id='xxxx'),
58 58 status=404)
59 59
60 60 @pytest.mark.backends("git", "hg")
61 61 @pytest.mark.parametrize('view', [
62 62 'pull_requests_global',
63 63 'pull_requests_global_0',
64 64 'pull_requests_global_1',
65 65 ])
66 66 def test_redirect_pull_request_view(self, view, pr_util):
67 67 self.log_user()
68 68 pull_request = pr_util.create_pull_request()
69 69 pull_request_id = pull_request.pull_request_id
70 70 repo_name = pull_request.target_repo.repo_name
71 71
72 72 response = self.app.get(
73 73 route_path(view, pull_request_id=pull_request_id),
74 74 status=302)
75 75 assert response.location.endswith(
76 76 'pull-request/{}'.format(pull_request_id))
77 77
78 78 redirect_url = route_path(
79 79 'pullrequest_show', repo_name=repo_name,
80 80 pull_request_id=pull_request_id)
81 81
82 82 assert redirect_url in response.location
@@ -1,152 +1,147 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests import TestController
24 24 from rhodecode.tests.fixture import Fixture
25 25
26 26
27 27 def route_path(name, params=None, **kwargs):
28 28 import urllib
29 29 from rhodecode.apps._base import ADMIN_PREFIX
30 30
31 31 base_url = {
32 32 'home': '/',
33 'admin_home': ADMIN_PREFIX,
33 34 'repos':
34 35 ADMIN_PREFIX + '/repos',
35 36 'repo_groups':
36 37 ADMIN_PREFIX + '/repo_groups',
37 38 'user_groups':
38 39 ADMIN_PREFIX + '/user_groups',
39 40 'user_groups_data':
40 41 ADMIN_PREFIX + '/user_groups_data',
41 42 }[name].format(**kwargs)
42 43
43 44 if params:
44 45 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
45 46 return base_url
46 47
47 48
48 49 fixture = Fixture()
49 50
50 51
51 52 class TestAdminDelegatedUser(TestController):
52 53
53 def test_regular_user_cannot_see_admin_interfaces(
54 self, user_util, xhr_header):
54 def test_regular_user_cannot_see_admin_interfaces(self, user_util, xhr_header):
55 55 user = user_util.create_user(password='qweqwe')
56 user_util.inherit_default_user_permissions(user.username, False)
57
56 58 self.log_user(user.username, 'qweqwe')
57 59
58 # check if in home view, such user doesn't see the "admin" menus
59 response = self.app.get(route_path('home'))
60
61 assert_response = response.assert_response()
62
63 assert_response.no_element_exists('li.local-admin-repos')
64 assert_response.no_element_exists('li.local-admin-repo-groups')
65 assert_response.no_element_exists('li.local-admin-user-groups')
60 # user doesn't have any access to resources so main admin page should 404
61 self.app.get(route_path('admin_home'), status=404)
66 62
67 63 response = self.app.get(route_path('repos'), status=200)
68 64 response.mustcontain('data: []')
69 65
70 66 response = self.app.get(route_path('repo_groups'), status=200)
71 67 response.mustcontain('data: []')
72 68
73 69 response = self.app.get(route_path('user_groups_data'),
74 70 status=200, extra_environ=xhr_header)
75 71 assert response.json['data'] == []
76 72
77 def test_regular_user_can_see_admin_interfaces_if_owner(
78 self, user_util, xhr_header):
73 def test_regular_user_can_see_admin_interfaces_if_owner(self, user_util, xhr_header):
79 74 user = user_util.create_user(password='qweqwe')
80 75 username = user.username
81 76
82 77 repo = user_util.create_repo(owner=username)
83 78 repo_name = repo.repo_name
84 79
85 80 repo_group = user_util.create_repo_group(owner=username)
86 81 repo_group_name = repo_group.group_name
87 82
88 83 user_group = user_util.create_user_group(owner=username)
89 84 user_group_name = user_group.users_group_name
90 85
91 86 self.log_user(username, 'qweqwe')
92 # check if in home view, such user doesn't see the "admin" menus
93 response = self.app.get(route_path('home'))
87
88 response = self.app.get(route_path('admin_home'))
94 89
95 90 assert_response = response.assert_response()
96 91
97 assert_response.one_element_exists('li.local-admin-repos')
98 assert_response.one_element_exists('li.local-admin-repo-groups')
99 assert_response.one_element_exists('li.local-admin-user-groups')
92 assert_response.element_contains('td.delegated-admin-repos', '1')
93 assert_response.element_contains('td.delegated-admin-repo-groups', '1')
94 assert_response.element_contains('td.delegated-admin-user-groups', '1')
100 95
101 96 # admin interfaces have visible elements
102 97 response = self.app.get(route_path('repos'), status=200)
103 98 response.mustcontain('"name_raw": "{}"'.format(repo_name))
104 99
105 100 response = self.app.get(route_path('repo_groups'), status=200)
106 101 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
107 102
108 103 response = self.app.get(route_path('user_groups_data'),
109 104 extra_environ=xhr_header, status=200)
110 105 response.mustcontain('"name_raw": "{}"'.format(user_group_name))
111 106
112 107 def test_regular_user_can_see_admin_interfaces_if_admin_perm(
113 108 self, user_util, xhr_header):
114 109 user = user_util.create_user(password='qweqwe')
115 110 username = user.username
116 111
117 112 repo = user_util.create_repo()
118 113 repo_name = repo.repo_name
119 114
120 115 repo_group = user_util.create_repo_group()
121 116 repo_group_name = repo_group.group_name
122 117
123 118 user_group = user_util.create_user_group()
124 119 user_group_name = user_group.users_group_name
125 120
126 121 user_util.grant_user_permission_to_repo(
127 122 repo, user, 'repository.admin')
128 123 user_util.grant_user_permission_to_repo_group(
129 124 repo_group, user, 'group.admin')
130 125 user_util.grant_user_permission_to_user_group(
131 126 user_group, user, 'usergroup.admin')
132 127
133 128 self.log_user(username, 'qweqwe')
134 129 # check if in home view, such user doesn't see the "admin" menus
135 response = self.app.get(route_path('home'))
130 response = self.app.get(route_path('admin_home'))
136 131
137 132 assert_response = response.assert_response()
138 133
139 assert_response.one_element_exists('li.local-admin-repos')
140 assert_response.one_element_exists('li.local-admin-repo-groups')
141 assert_response.one_element_exists('li.local-admin-user-groups')
134 assert_response.element_contains('td.delegated-admin-repos', '1')
135 assert_response.element_contains('td.delegated-admin-repo-groups', '1')
136 assert_response.element_contains('td.delegated-admin-user-groups', '1')
142 137
143 138 # admin interfaces have visible elements
144 139 response = self.app.get(route_path('repos'), status=200)
145 140 response.mustcontain('"name_raw": "{}"'.format(repo_name))
146 141
147 142 response = self.app.get(route_path('repo_groups'), status=200)
148 143 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
149 144
150 145 response = self.app.get(route_path('user_groups_data'),
151 146 extra_environ=xhr_header, status=200)
152 147 response.mustcontain('"name_raw": "{}"'.format(user_group_name))
@@ -1,1886 +1,1886 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33 import functools
34 34
35 35 import mock
36 36 import pyramid.testing
37 37 import pytest
38 38 import colander
39 39 import requests
40 40 import pyramid.paster
41 41
42 42 import rhodecode
43 43 from rhodecode.lib.utils2 import AttributeDict
44 44 from rhodecode.model.changeset_status import ChangesetStatusModel
45 45 from rhodecode.model.comment import CommentsModel
46 46 from rhodecode.model.db import (
47 47 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
48 48 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
49 49 from rhodecode.model.meta import Session
50 50 from rhodecode.model.pull_request import PullRequestModel
51 51 from rhodecode.model.repo import RepoModel
52 52 from rhodecode.model.repo_group import RepoGroupModel
53 53 from rhodecode.model.user import UserModel
54 54 from rhodecode.model.settings import VcsSettingsModel
55 55 from rhodecode.model.user_group import UserGroupModel
56 56 from rhodecode.model.integration import IntegrationModel
57 57 from rhodecode.integrations import integration_type_registry
58 58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 59 from rhodecode.lib.utils import repo2db_mapper
60 60 from rhodecode.lib.vcs import create_vcsserver_proxy
61 61 from rhodecode.lib.vcs.backends import get_backend
62 62 from rhodecode.lib.vcs.nodes import FileNode
63 63 from rhodecode.tests import (
64 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
65 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
66 66 TEST_USER_REGULAR_PASS)
67 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
68 68 from rhodecode.tests.fixture import Fixture
69 69 from rhodecode.config import utils as config_utils
70 70
71 71 def _split_comma(value):
72 72 return value.split(',')
73 73
74 74
75 75 def pytest_addoption(parser):
76 76 parser.addoption(
77 77 '--keep-tmp-path', action='store_true',
78 78 help="Keep the test temporary directories")
79 79 parser.addoption(
80 80 '--backends', action='store', type=_split_comma,
81 81 default=['git', 'hg', 'svn'],
82 82 help="Select which backends to test for backend specific tests.")
83 83 parser.addoption(
84 84 '--dbs', action='store', type=_split_comma,
85 85 default=['sqlite'],
86 86 help="Select which database to test for database specific tests. "
87 87 "Possible options are sqlite,postgres,mysql")
88 88 parser.addoption(
89 89 '--appenlight', '--ae', action='store_true',
90 90 help="Track statistics in appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-api-key', '--ae-key',
93 93 help="API key for Appenlight.")
94 94 parser.addoption(
95 95 '--appenlight-url', '--ae-url',
96 96 default="https://ae.rhodecode.com",
97 97 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
98 98 parser.addoption(
99 99 '--sqlite-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with SQLite")
101 101 parser.addoption(
102 102 '--postgres-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with Postgres")
104 104 parser.addoption(
105 105 '--mysql-connection-string', action='store',
106 106 default='', help="Connection string for the dbs tests with MySQL")
107 107 parser.addoption(
108 108 '--repeat', type=int, default=100,
109 109 help="Number of repetitions in performance tests.")
110 110
111 111
112 112 def pytest_configure(config):
113 113 from rhodecode.config import patches
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # nottest marked, compare nose, used for transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 142 backends = metafunc.definition.get_closest_marker('backends').args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.config import rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = rcextensions
161 161 rhodecode.EXTENSIONS.calls = collections.defaultdict(list)
162 162
163 163 @request.addfinalizer
164 164 def cleanup():
165 165 rhodecode.EXTENSIONS = old_extensions
166 166
167 167
168 168 @pytest.fixture
169 169 def capture_rcextensions():
170 170 """
171 171 Returns the recorded calls to entry points in rcextensions.
172 172 """
173 173 calls = rhodecode.EXTENSIONS.calls
174 174 calls.clear()
175 175 # Note: At this moment, it is still the empty dict, but that will
176 176 # be filled during the test run and since it is a reference this
177 177 # is enough to make it work.
178 178 return calls
179 179
180 180
181 181 @pytest.fixture(scope='session')
182 182 def http_environ_session():
183 183 """
184 184 Allow to use "http_environ" in session scope.
185 185 """
186 186 return plain_http_environ()
187 187
188 188
189 189 def plain_http_host_stub():
190 190 """
191 191 Value of HTTP_HOST in the test run.
192 192 """
193 193 return 'example.com:80'
194 194
195 195
196 196 @pytest.fixture
197 197 def http_host_stub():
198 198 """
199 199 Value of HTTP_HOST in the test run.
200 200 """
201 201 return plain_http_host_stub()
202 202
203 203
204 204 def plain_http_host_only_stub():
205 205 """
206 206 Value of HTTP_HOST in the test run.
207 207 """
208 208 return plain_http_host_stub().split(':')[0]
209 209
210 210
211 211 @pytest.fixture
212 212 def http_host_only_stub():
213 213 """
214 214 Value of HTTP_HOST in the test run.
215 215 """
216 216 return plain_http_host_only_stub()
217 217
218 218
219 219 def plain_http_environ():
220 220 """
221 221 HTTP extra environ keys.
222 222
223 223 User by the test application and as well for setting up the pylons
224 224 environment. In the case of the fixture "app" it should be possible
225 225 to override this for a specific test case.
226 226 """
227 227 return {
228 228 'SERVER_NAME': plain_http_host_only_stub(),
229 229 'SERVER_PORT': plain_http_host_stub().split(':')[1],
230 230 'HTTP_HOST': plain_http_host_stub(),
231 231 'HTTP_USER_AGENT': 'rc-test-agent',
232 232 'REQUEST_METHOD': 'GET'
233 233 }
234 234
235 235
236 236 @pytest.fixture
237 237 def http_environ():
238 238 """
239 239 HTTP extra environ keys.
240 240
241 241 User by the test application and as well for setting up the pylons
242 242 environment. In the case of the fixture "app" it should be possible
243 243 to override this for a specific test case.
244 244 """
245 245 return plain_http_environ()
246 246
247 247
248 248 @pytest.fixture(scope='session')
249 249 def baseapp(ini_config, vcsserver, http_environ_session):
250 250 from rhodecode.lib.pyramid_utils import get_app_config
251 251 from rhodecode.config.middleware import make_pyramid_app
252 252
253 253 print("Using the RhodeCode configuration:{}".format(ini_config))
254 254 pyramid.paster.setup_logging(ini_config)
255 255
256 256 settings = get_app_config(ini_config)
257 257 app = make_pyramid_app({'__file__': ini_config}, **settings)
258 258
259 259 return app
260 260
261 261
262 262 @pytest.fixture(scope='function')
263 263 def app(request, config_stub, baseapp, http_environ):
264 264 app = CustomTestApp(
265 265 baseapp,
266 266 extra_environ=http_environ)
267 267 if request.cls:
268 268 request.cls.app = app
269 269 return app
270 270
271 271
272 272 @pytest.fixture(scope='session')
273 273 def app_settings(baseapp, ini_config):
274 274 """
275 275 Settings dictionary used to create the app.
276 276
277 277 Parses the ini file and passes the result through the sanitize and apply
278 278 defaults mechanism in `rhodecode.config.middleware`.
279 279 """
280 280 return baseapp.config.get_settings()
281 281
282 282
283 283 @pytest.fixture(scope='session')
284 284 def db_connection(ini_settings):
285 285 # Initialize the database connection.
286 286 config_utils.initialize_database(ini_settings)
287 287
288 288
289 289 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
290 290
291 291
292 292 def _autologin_user(app, *args):
293 293 session = login_user_session(app, *args)
294 294 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
295 295 return LoginData(csrf_token, session['rhodecode_user'])
296 296
297 297
298 298 @pytest.fixture
299 299 def autologin_user(app):
300 300 """
301 301 Utility fixture which makes sure that the admin user is logged in
302 302 """
303 303 return _autologin_user(app)
304 304
305 305
306 306 @pytest.fixture
307 307 def autologin_regular_user(app):
308 308 """
309 309 Utility fixture which makes sure that the regular user is logged in
310 310 """
311 311 return _autologin_user(
312 312 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
313 313
314 314
315 315 @pytest.fixture(scope='function')
316 316 def csrf_token(request, autologin_user):
317 317 return autologin_user.csrf_token
318 318
319 319
320 320 @pytest.fixture(scope='function')
321 321 def xhr_header(request):
322 322 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
323 323
324 324
325 325 @pytest.fixture
326 326 def real_crypto_backend(monkeypatch):
327 327 """
328 328 Switch the production crypto backend on for this test.
329 329
330 330 During the test run the crypto backend is replaced with a faster
331 331 implementation based on the MD5 algorithm.
332 332 """
333 333 monkeypatch.setattr(rhodecode, 'is_test', False)
334 334
335 335
336 336 @pytest.fixture(scope='class')
337 337 def index_location(request, baseapp):
338 338 index_location = baseapp.config.get_settings()['search.location']
339 339 if request.cls:
340 340 request.cls.index_location = index_location
341 341 return index_location
342 342
343 343
344 344 @pytest.fixture(scope='session', autouse=True)
345 345 def tests_tmp_path(request):
346 346 """
347 347 Create temporary directory to be used during the test session.
348 348 """
349 349 if not os.path.exists(TESTS_TMP_PATH):
350 350 os.makedirs(TESTS_TMP_PATH)
351 351
352 352 if not request.config.getoption('--keep-tmp-path'):
353 353 @request.addfinalizer
354 354 def remove_tmp_path():
355 355 shutil.rmtree(TESTS_TMP_PATH)
356 356
357 357 return TESTS_TMP_PATH
358 358
359 359
360 360 @pytest.fixture
361 361 def test_repo_group(request):
362 362 """
363 363 Create a temporary repository group, and destroy it after
364 364 usage automatically
365 365 """
366 366 fixture = Fixture()
367 367 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
368 368 repo_group = fixture.create_repo_group(repogroupid)
369 369
370 370 def _cleanup():
371 371 fixture.destroy_repo_group(repogroupid)
372 372
373 373 request.addfinalizer(_cleanup)
374 374 return repo_group
375 375
376 376
377 377 @pytest.fixture
378 378 def test_user_group(request):
379 379 """
380 380 Create a temporary user group, and destroy it after
381 381 usage automatically
382 382 """
383 383 fixture = Fixture()
384 384 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
385 385 user_group = fixture.create_user_group(usergroupid)
386 386
387 387 def _cleanup():
388 388 fixture.destroy_user_group(user_group)
389 389
390 390 request.addfinalizer(_cleanup)
391 391 return user_group
392 392
393 393
394 394 @pytest.fixture(scope='session')
395 395 def test_repo(request):
396 396 container = TestRepoContainer()
397 397 request.addfinalizer(container._cleanup)
398 398 return container
399 399
400 400
401 401 class TestRepoContainer(object):
402 402 """
403 403 Container for test repositories which are used read only.
404 404
405 405 Repositories will be created on demand and re-used during the lifetime
406 406 of this object.
407 407
408 408 Usage to get the svn test repository "minimal"::
409 409
410 410 test_repo = TestContainer()
411 411 repo = test_repo('minimal', 'svn')
412 412
413 413 """
414 414
415 415 dump_extractors = {
416 416 'git': utils.extract_git_repo_from_dump,
417 417 'hg': utils.extract_hg_repo_from_dump,
418 418 'svn': utils.extract_svn_repo_from_dump,
419 419 }
420 420
421 421 def __init__(self):
422 422 self._cleanup_repos = []
423 423 self._fixture = Fixture()
424 424 self._repos = {}
425 425
426 426 def __call__(self, dump_name, backend_alias, config=None):
427 427 key = (dump_name, backend_alias)
428 428 if key not in self._repos:
429 429 repo = self._create_repo(dump_name, backend_alias, config)
430 430 self._repos[key] = repo.repo_id
431 431 return Repository.get(self._repos[key])
432 432
433 433 def _create_repo(self, dump_name, backend_alias, config):
434 434 repo_name = '%s-%s' % (backend_alias, dump_name)
435 435 backend_class = get_backend(backend_alias)
436 436 dump_extractor = self.dump_extractors[backend_alias]
437 437 repo_path = dump_extractor(dump_name, repo_name)
438 438
439 439 vcs_repo = backend_class(repo_path, config=config)
440 440 repo2db_mapper({repo_name: vcs_repo})
441 441
442 442 repo = RepoModel().get_by_repo_name(repo_name)
443 443 self._cleanup_repos.append(repo_name)
444 444 return repo
445 445
446 446 def _cleanup(self):
447 447 for repo_name in reversed(self._cleanup_repos):
448 448 self._fixture.destroy_repo(repo_name)
449 449
450 450
451 451 def backend_base(request, backend_alias, baseapp, test_repo):
452 452 if backend_alias not in request.config.getoption('--backends'):
453 453 pytest.skip("Backend %s not selected." % (backend_alias, ))
454 454
455 455 utils.check_xfail_backends(request.node, backend_alias)
456 456 utils.check_skip_backends(request.node, backend_alias)
457 457
458 458 repo_name = 'vcs_test_%s' % (backend_alias, )
459 459 backend = Backend(
460 460 alias=backend_alias,
461 461 repo_name=repo_name,
462 462 test_name=request.node.name,
463 463 test_repo_container=test_repo)
464 464 request.addfinalizer(backend.cleanup)
465 465 return backend
466 466
467 467
468 468 @pytest.fixture
469 469 def backend(request, backend_alias, baseapp, test_repo):
470 470 """
471 471 Parametrized fixture which represents a single backend implementation.
472 472
473 473 It respects the option `--backends` to focus the test run on specific
474 474 backend implementations.
475 475
476 476 It also supports `pytest.mark.xfail_backends` to mark tests as failing
477 477 for specific backends. This is intended as a utility for incremental
478 478 development of a new backend implementation.
479 479 """
480 480 return backend_base(request, backend_alias, baseapp, test_repo)
481 481
482 482
483 483 @pytest.fixture
484 484 def backend_git(request, baseapp, test_repo):
485 485 return backend_base(request, 'git', baseapp, test_repo)
486 486
487 487
488 488 @pytest.fixture
489 489 def backend_hg(request, baseapp, test_repo):
490 490 return backend_base(request, 'hg', baseapp, test_repo)
491 491
492 492
493 493 @pytest.fixture
494 494 def backend_svn(request, baseapp, test_repo):
495 495 return backend_base(request, 'svn', baseapp, test_repo)
496 496
497 497
498 498 @pytest.fixture
499 499 def backend_random(backend_git):
500 500 """
501 501 Use this to express that your tests need "a backend.
502 502
503 503 A few of our tests need a backend, so that we can run the code. This
504 504 fixture is intended to be used for such cases. It will pick one of the
505 505 backends and run the tests.
506 506
507 507 The fixture `backend` would run the test multiple times for each
508 508 available backend which is a pure waste of time if the test is
509 509 independent of the backend type.
510 510 """
511 511 # TODO: johbo: Change this to pick a random backend
512 512 return backend_git
513 513
514 514
515 515 @pytest.fixture
516 516 def backend_stub(backend_git):
517 517 """
518 518 Use this to express that your tests need a backend stub
519 519
520 520 TODO: mikhail: Implement a real stub logic instead of returning
521 521 a git backend
522 522 """
523 523 return backend_git
524 524
525 525
526 526 @pytest.fixture
527 527 def repo_stub(backend_stub):
528 528 """
529 529 Use this to express that your tests need a repository stub
530 530 """
531 531 return backend_stub.create_repo()
532 532
533 533
534 534 class Backend(object):
535 535 """
536 536 Represents the test configuration for one supported backend
537 537
538 538 Provides easy access to different test repositories based on
539 539 `__getitem__`. Such repositories will only be created once per test
540 540 session.
541 541 """
542 542
543 543 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
544 544 _master_repo = None
545 545 _commit_ids = {}
546 546
547 547 def __init__(self, alias, repo_name, test_name, test_repo_container):
548 548 self.alias = alias
549 549 self.repo_name = repo_name
550 550 self._cleanup_repos = []
551 551 self._test_name = test_name
552 552 self._test_repo_container = test_repo_container
553 553 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
554 554 # Fixture will survive in the end.
555 555 self._fixture = Fixture()
556 556
557 557 def __getitem__(self, key):
558 558 return self._test_repo_container(key, self.alias)
559 559
560 560 def create_test_repo(self, key, config=None):
561 561 return self._test_repo_container(key, self.alias, config)
562 562
563 563 @property
564 564 def repo(self):
565 565 """
566 566 Returns the "current" repository. This is the vcs_test repo or the
567 567 last repo which has been created with `create_repo`.
568 568 """
569 569 from rhodecode.model.db import Repository
570 570 return Repository.get_by_repo_name(self.repo_name)
571 571
572 572 @property
573 573 def default_branch_name(self):
574 574 VcsRepository = get_backend(self.alias)
575 575 return VcsRepository.DEFAULT_BRANCH_NAME
576 576
577 577 @property
578 578 def default_head_id(self):
579 579 """
580 580 Returns the default head id of the underlying backend.
581 581
582 582 This will be the default branch name in case the backend does have a
583 583 default branch. In the other cases it will point to a valid head
584 584 which can serve as the base to create a new commit on top of it.
585 585 """
586 586 vcsrepo = self.repo.scm_instance()
587 587 head_id = (
588 588 vcsrepo.DEFAULT_BRANCH_NAME or
589 589 vcsrepo.commit_ids[-1])
590 590 return head_id
591 591
592 592 @property
593 593 def commit_ids(self):
594 594 """
595 595 Returns the list of commits for the last created repository
596 596 """
597 597 return self._commit_ids
598 598
599 599 def create_master_repo(self, commits):
600 600 """
601 601 Create a repository and remember it as a template.
602 602
603 603 This allows to easily create derived repositories to construct
604 604 more complex scenarios for diff, compare and pull requests.
605 605
606 606 Returns a commit map which maps from commit message to raw_id.
607 607 """
608 608 self._master_repo = self.create_repo(commits=commits)
609 609 return self._commit_ids
610 610
611 611 def create_repo(
612 612 self, commits=None, number_of_commits=0, heads=None,
613 613 name_suffix=u'', bare=False, **kwargs):
614 614 """
615 615 Create a repository and record it for later cleanup.
616 616
617 617 :param commits: Optional. A sequence of dict instances.
618 618 Will add a commit per entry to the new repository.
619 619 :param number_of_commits: Optional. If set to a number, this number of
620 620 commits will be added to the new repository.
621 621 :param heads: Optional. Can be set to a sequence of of commit
622 622 names which shall be pulled in from the master repository.
623 623 :param name_suffix: adds special suffix to generated repo name
624 624 :param bare: set a repo as bare (no checkout)
625 625 """
626 626 self.repo_name = self._next_repo_name() + name_suffix
627 627 repo = self._fixture.create_repo(
628 628 self.repo_name, repo_type=self.alias, bare=bare, **kwargs)
629 629 self._cleanup_repos.append(repo.repo_name)
630 630
631 631 commits = commits or [
632 632 {'message': 'Commit %s of %s' % (x, self.repo_name)}
633 633 for x in range(number_of_commits)]
634 634 self._add_commits_to_repo(repo.scm_instance(), commits)
635 635 if heads:
636 636 self.pull_heads(repo, heads)
637 637
638 638 return repo
639 639
640 640 def pull_heads(self, repo, heads):
641 641 """
642 642 Make sure that repo contains all commits mentioned in `heads`
643 643 """
644 644 vcsmaster = self._master_repo.scm_instance()
645 645 vcsrepo = repo.scm_instance()
646 646 vcsrepo.config.clear_section('hooks')
647 647 commit_ids = [self._commit_ids[h] for h in heads]
648 648 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
649 649
650 650 def create_fork(self):
651 651 repo_to_fork = self.repo_name
652 652 self.repo_name = self._next_repo_name()
653 653 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
654 654 self._cleanup_repos.append(self.repo_name)
655 655 return repo
656 656
657 657 def new_repo_name(self, suffix=u''):
658 658 self.repo_name = self._next_repo_name() + suffix
659 659 self._cleanup_repos.append(self.repo_name)
660 660 return self.repo_name
661 661
662 662 def _next_repo_name(self):
663 663 return u"%s_%s" % (
664 664 self.invalid_repo_name.sub(u'_', self._test_name), len(self._cleanup_repos))
665 665
666 666 def ensure_file(self, filename, content='Test content\n'):
667 667 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
668 668 commits = [
669 669 {'added': [
670 670 FileNode(filename, content=content),
671 671 ]},
672 672 ]
673 673 self._add_commits_to_repo(self.repo.scm_instance(), commits)
674 674
675 675 def enable_downloads(self):
676 676 repo = self.repo
677 677 repo.enable_downloads = True
678 678 Session().add(repo)
679 679 Session().commit()
680 680
681 681 def cleanup(self):
682 682 for repo_name in reversed(self._cleanup_repos):
683 683 self._fixture.destroy_repo(repo_name)
684 684
685 685 def _add_commits_to_repo(self, repo, commits):
686 686 commit_ids = _add_commits_to_repo(repo, commits)
687 687 if not commit_ids:
688 688 return
689 689 self._commit_ids = commit_ids
690 690
691 691 # Creating refs for Git to allow fetching them from remote repository
692 692 if self.alias == 'git':
693 693 refs = {}
694 694 for message in self._commit_ids:
695 695 # TODO: mikhail: do more special chars replacements
696 696 ref_name = 'refs/test-refs/{}'.format(
697 697 message.replace(' ', ''))
698 698 refs[ref_name] = self._commit_ids[message]
699 699 self._create_refs(repo, refs)
700 700
701 701 def _create_refs(self, repo, refs):
702 702 for ref_name in refs:
703 703 repo.set_refs(ref_name, refs[ref_name])
704 704
705 705
706 706 def vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo):
707 707 if backend_alias not in request.config.getoption('--backends'):
708 708 pytest.skip("Backend %s not selected." % (backend_alias, ))
709 709
710 710 utils.check_xfail_backends(request.node, backend_alias)
711 711 utils.check_skip_backends(request.node, backend_alias)
712 712
713 713 repo_name = 'vcs_test_%s' % (backend_alias, )
714 714 repo_path = os.path.join(tests_tmp_path, repo_name)
715 715 backend = VcsBackend(
716 716 alias=backend_alias,
717 717 repo_path=repo_path,
718 718 test_name=request.node.name,
719 719 test_repo_container=test_repo)
720 720 request.addfinalizer(backend.cleanup)
721 721 return backend
722 722
723 723
724 724 @pytest.fixture
725 725 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
726 726 """
727 727 Parametrized fixture which represents a single vcs backend implementation.
728 728
729 729 See the fixture `backend` for more details. This one implements the same
730 730 concept, but on vcs level. So it does not provide model instances etc.
731 731
732 732 Parameters are generated dynamically, see :func:`pytest_generate_tests`
733 733 for how this works.
734 734 """
735 735 return vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo)
736 736
737 737
738 738 @pytest.fixture
739 739 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
740 740 return vcsbackend_base(request, 'git', tests_tmp_path, baseapp, test_repo)
741 741
742 742
743 743 @pytest.fixture
744 744 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
745 745 return vcsbackend_base(request, 'hg', tests_tmp_path, baseapp, test_repo)
746 746
747 747
748 748 @pytest.fixture
749 749 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
750 750 return vcsbackend_base(request, 'svn', tests_tmp_path, baseapp, test_repo)
751 751
752 752
753 753 @pytest.fixture
754 754 def vcsbackend_stub(vcsbackend_git):
755 755 """
756 756 Use this to express that your test just needs a stub of a vcsbackend.
757 757
758 758 Plan is to eventually implement an in-memory stub to speed tests up.
759 759 """
760 760 return vcsbackend_git
761 761
762 762
763 763 class VcsBackend(object):
764 764 """
765 765 Represents the test configuration for one supported vcs backend.
766 766 """
767 767
768 768 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
769 769
770 770 def __init__(self, alias, repo_path, test_name, test_repo_container):
771 771 self.alias = alias
772 772 self._repo_path = repo_path
773 773 self._cleanup_repos = []
774 774 self._test_name = test_name
775 775 self._test_repo_container = test_repo_container
776 776
777 777 def __getitem__(self, key):
778 778 return self._test_repo_container(key, self.alias).scm_instance()
779 779
780 780 @property
781 781 def repo(self):
782 782 """
783 783 Returns the "current" repository. This is the vcs_test repo of the last
784 784 repo which has been created.
785 785 """
786 786 Repository = get_backend(self.alias)
787 787 return Repository(self._repo_path)
788 788
789 789 @property
790 790 def backend(self):
791 791 """
792 792 Returns the backend implementation class.
793 793 """
794 794 return get_backend(self.alias)
795 795
796 796 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None,
797 797 bare=False):
798 798 repo_name = self._next_repo_name()
799 799 self._repo_path = get_new_dir(repo_name)
800 800 repo_class = get_backend(self.alias)
801 801 src_url = None
802 802 if _clone_repo:
803 803 src_url = _clone_repo.path
804 804 repo = repo_class(self._repo_path, create=True, src_url=src_url, bare=bare)
805 805 self._cleanup_repos.append(repo)
806 806
807 807 commits = commits or [
808 808 {'message': 'Commit %s of %s' % (x, repo_name)}
809 809 for x in xrange(number_of_commits)]
810 810 _add_commits_to_repo(repo, commits)
811 811 return repo
812 812
813 813 def clone_repo(self, repo):
814 814 return self.create_repo(_clone_repo=repo)
815 815
816 816 def cleanup(self):
817 817 for repo in self._cleanup_repos:
818 818 shutil.rmtree(repo.path)
819 819
820 820 def new_repo_path(self):
821 821 repo_name = self._next_repo_name()
822 822 self._repo_path = get_new_dir(repo_name)
823 823 return self._repo_path
824 824
825 825 def _next_repo_name(self):
826 826 return "%s_%s" % (
827 827 self.invalid_repo_name.sub('_', self._test_name),
828 828 len(self._cleanup_repos))
829 829
830 830 def add_file(self, repo, filename, content='Test content\n'):
831 831 imc = repo.in_memory_commit
832 832 imc.add(FileNode(filename, content=content))
833 833 imc.commit(
834 834 message=u'Automatic commit from vcsbackend fixture',
835 835 author=u'Automatic')
836 836
837 837 def ensure_file(self, filename, content='Test content\n'):
838 838 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
839 839 self.add_file(self.repo, filename, content)
840 840
841 841
842 842 def _add_commits_to_repo(vcs_repo, commits):
843 843 commit_ids = {}
844 844 if not commits:
845 845 return commit_ids
846 846
847 847 imc = vcs_repo.in_memory_commit
848 848 commit = None
849 849
850 850 for idx, commit in enumerate(commits):
851 851 message = unicode(commit.get('message', 'Commit %s' % idx))
852 852
853 853 for node in commit.get('added', []):
854 854 imc.add(FileNode(node.path, content=node.content))
855 855 for node in commit.get('changed', []):
856 856 imc.change(FileNode(node.path, content=node.content))
857 857 for node in commit.get('removed', []):
858 858 imc.remove(FileNode(node.path))
859 859
860 860 parents = [
861 861 vcs_repo.get_commit(commit_id=commit_ids[p])
862 862 for p in commit.get('parents', [])]
863 863
864 864 operations = ('added', 'changed', 'removed')
865 865 if not any((commit.get(o) for o in operations)):
866 866 imc.add(FileNode('file_%s' % idx, content=message))
867 867
868 868 commit = imc.commit(
869 869 message=message,
870 870 author=unicode(commit.get('author', 'Automatic')),
871 871 date=commit.get('date'),
872 872 branch=commit.get('branch'),
873 873 parents=parents)
874 874
875 875 commit_ids[commit.message] = commit.raw_id
876 876
877 877 return commit_ids
878 878
879 879
880 880 @pytest.fixture
881 881 def reposerver(request):
882 882 """
883 883 Allows to serve a backend repository
884 884 """
885 885
886 886 repo_server = RepoServer()
887 887 request.addfinalizer(repo_server.cleanup)
888 888 return repo_server
889 889
890 890
891 891 class RepoServer(object):
892 892 """
893 893 Utility to serve a local repository for the duration of a test case.
894 894
895 895 Supports only Subversion so far.
896 896 """
897 897
898 898 url = None
899 899
900 900 def __init__(self):
901 901 self._cleanup_servers = []
902 902
903 903 def serve(self, vcsrepo):
904 904 if vcsrepo.alias != 'svn':
905 905 raise TypeError("Backend %s not supported" % vcsrepo.alias)
906 906
907 907 proc = subprocess32.Popen(
908 908 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
909 909 '--root', vcsrepo.path])
910 910 self._cleanup_servers.append(proc)
911 911 self.url = 'svn://localhost'
912 912
913 913 def cleanup(self):
914 914 for proc in self._cleanup_servers:
915 915 proc.terminate()
916 916
917 917
918 918 @pytest.fixture
919 919 def pr_util(backend, request, config_stub):
920 920 """
921 921 Utility for tests of models and for functional tests around pull requests.
922 922
923 923 It gives an instance of :class:`PRTestUtility` which provides various
924 924 utility methods around one pull request.
925 925
926 926 This fixture uses `backend` and inherits its parameterization.
927 927 """
928 928
929 929 util = PRTestUtility(backend)
930 930 request.addfinalizer(util.cleanup)
931 931
932 932 return util
933 933
934 934
935 935 class PRTestUtility(object):
936 936
937 937 pull_request = None
938 938 pull_request_id = None
939 939 mergeable_patcher = None
940 940 mergeable_mock = None
941 941 notification_patcher = None
942 942
943 943 def __init__(self, backend):
944 944 self.backend = backend
945 945
946 946 def create_pull_request(
947 947 self, commits=None, target_head=None, source_head=None,
948 948 revisions=None, approved=False, author=None, mergeable=False,
949 949 enable_notifications=True, name_suffix=u'', reviewers=None,
950 950 title=u"Test", description=u"Description"):
951 951 self.set_mergeable(mergeable)
952 952 if not enable_notifications:
953 953 # mock notification side effect
954 954 self.notification_patcher = mock.patch(
955 955 'rhodecode.model.notification.NotificationModel.create')
956 956 self.notification_patcher.start()
957 957
958 958 if not self.pull_request:
959 959 if not commits:
960 960 commits = [
961 961 {'message': 'c1'},
962 962 {'message': 'c2'},
963 963 {'message': 'c3'},
964 964 ]
965 965 target_head = 'c1'
966 966 source_head = 'c2'
967 967 revisions = ['c2']
968 968
969 969 self.commit_ids = self.backend.create_master_repo(commits)
970 970 self.target_repository = self.backend.create_repo(
971 971 heads=[target_head], name_suffix=name_suffix)
972 972 self.source_repository = self.backend.create_repo(
973 973 heads=[source_head], name_suffix=name_suffix)
974 974 self.author = author or UserModel().get_by_username(
975 975 TEST_USER_ADMIN_LOGIN)
976 976
977 977 model = PullRequestModel()
978 978 self.create_parameters = {
979 979 'created_by': self.author,
980 980 'source_repo': self.source_repository.repo_name,
981 981 'source_ref': self._default_branch_reference(source_head),
982 982 'target_repo': self.target_repository.repo_name,
983 983 'target_ref': self._default_branch_reference(target_head),
984 984 'revisions': [self.commit_ids[r] for r in revisions],
985 985 'reviewers': reviewers or self._get_reviewers(),
986 986 'title': title,
987 987 'description': description,
988 988 }
989 989 self.pull_request = model.create(**self.create_parameters)
990 990 assert model.get_versions(self.pull_request) == []
991 991
992 992 self.pull_request_id = self.pull_request.pull_request_id
993 993
994 994 if approved:
995 995 self.approve()
996 996
997 997 Session().add(self.pull_request)
998 998 Session().commit()
999 999
1000 1000 return self.pull_request
1001 1001
1002 1002 def approve(self):
1003 1003 self.create_status_votes(
1004 1004 ChangesetStatus.STATUS_APPROVED,
1005 1005 *self.pull_request.reviewers)
1006 1006
1007 1007 def close(self):
1008 1008 PullRequestModel().close_pull_request(self.pull_request, self.author)
1009 1009
1010 1010 def _default_branch_reference(self, commit_message):
1011 1011 reference = '%s:%s:%s' % (
1012 1012 'branch',
1013 1013 self.backend.default_branch_name,
1014 1014 self.commit_ids[commit_message])
1015 1015 return reference
1016 1016
1017 1017 def _get_reviewers(self):
1018 1018 return [
1019 1019 (TEST_USER_REGULAR_LOGIN, ['default1'], False, []),
1020 1020 (TEST_USER_REGULAR2_LOGIN, ['default2'], False, []),
1021 1021 ]
1022 1022
1023 1023 def update_source_repository(self, head=None):
1024 1024 heads = [head or 'c3']
1025 1025 self.backend.pull_heads(self.source_repository, heads=heads)
1026 1026
1027 1027 def add_one_commit(self, head=None):
1028 1028 self.update_source_repository(head=head)
1029 1029 old_commit_ids = set(self.pull_request.revisions)
1030 1030 PullRequestModel().update_commits(self.pull_request)
1031 1031 commit_ids = set(self.pull_request.revisions)
1032 1032 new_commit_ids = commit_ids - old_commit_ids
1033 1033 assert len(new_commit_ids) == 1
1034 1034 return new_commit_ids.pop()
1035 1035
1036 1036 def remove_one_commit(self):
1037 1037 assert len(self.pull_request.revisions) == 2
1038 1038 source_vcs = self.source_repository.scm_instance()
1039 1039 removed_commit_id = source_vcs.commit_ids[-1]
1040 1040
1041 1041 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1042 1042 # remove the if once that's sorted out.
1043 1043 if self.backend.alias == "git":
1044 1044 kwargs = {'branch_name': self.backend.default_branch_name}
1045 1045 else:
1046 1046 kwargs = {}
1047 1047 source_vcs.strip(removed_commit_id, **kwargs)
1048 1048
1049 1049 PullRequestModel().update_commits(self.pull_request)
1050 1050 assert len(self.pull_request.revisions) == 1
1051 1051 return removed_commit_id
1052 1052
1053 1053 def create_comment(self, linked_to=None):
1054 1054 comment = CommentsModel().create(
1055 1055 text=u"Test comment",
1056 1056 repo=self.target_repository.repo_name,
1057 1057 user=self.author,
1058 1058 pull_request=self.pull_request)
1059 1059 assert comment.pull_request_version_id is None
1060 1060
1061 1061 if linked_to:
1062 1062 PullRequestModel()._link_comments_to_version(linked_to)
1063 1063
1064 1064 return comment
1065 1065
1066 1066 def create_inline_comment(
1067 1067 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1068 1068 comment = CommentsModel().create(
1069 1069 text=u"Test comment",
1070 1070 repo=self.target_repository.repo_name,
1071 1071 user=self.author,
1072 1072 line_no=line_no,
1073 1073 f_path=file_path,
1074 1074 pull_request=self.pull_request)
1075 1075 assert comment.pull_request_version_id is None
1076 1076
1077 1077 if linked_to:
1078 1078 PullRequestModel()._link_comments_to_version(linked_to)
1079 1079
1080 1080 return comment
1081 1081
1082 1082 def create_version_of_pull_request(self):
1083 1083 pull_request = self.create_pull_request()
1084 1084 version = PullRequestModel()._create_version_from_snapshot(
1085 1085 pull_request)
1086 1086 return version
1087 1087
1088 1088 def create_status_votes(self, status, *reviewers):
1089 1089 for reviewer in reviewers:
1090 1090 ChangesetStatusModel().set_status(
1091 1091 repo=self.pull_request.target_repo,
1092 1092 status=status,
1093 1093 user=reviewer.user_id,
1094 1094 pull_request=self.pull_request)
1095 1095
1096 1096 def set_mergeable(self, value):
1097 1097 if not self.mergeable_patcher:
1098 1098 self.mergeable_patcher = mock.patch.object(
1099 1099 VcsSettingsModel, 'get_general_settings')
1100 1100 self.mergeable_mock = self.mergeable_patcher.start()
1101 1101 self.mergeable_mock.return_value = {
1102 1102 'rhodecode_pr_merge_enabled': value}
1103 1103
1104 1104 def cleanup(self):
1105 1105 # In case the source repository is already cleaned up, the pull
1106 1106 # request will already be deleted.
1107 1107 pull_request = PullRequest().get(self.pull_request_id)
1108 1108 if pull_request:
1109 1109 PullRequestModel().delete(pull_request, pull_request.author)
1110 1110 Session().commit()
1111 1111
1112 1112 if self.notification_patcher:
1113 1113 self.notification_patcher.stop()
1114 1114
1115 1115 if self.mergeable_patcher:
1116 1116 self.mergeable_patcher.stop()
1117 1117
1118 1118
1119 1119 @pytest.fixture
1120 1120 def user_admin(baseapp):
1121 1121 """
1122 1122 Provides the default admin test user as an instance of `db.User`.
1123 1123 """
1124 1124 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1125 1125 return user
1126 1126
1127 1127
1128 1128 @pytest.fixture
1129 1129 def user_regular(baseapp):
1130 1130 """
1131 1131 Provides the default regular test user as an instance of `db.User`.
1132 1132 """
1133 1133 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1134 1134 return user
1135 1135
1136 1136
1137 1137 @pytest.fixture
1138 1138 def user_util(request, db_connection):
1139 1139 """
1140 1140 Provides a wired instance of `UserUtility` with integrated cleanup.
1141 1141 """
1142 1142 utility = UserUtility(test_name=request.node.name)
1143 1143 request.addfinalizer(utility.cleanup)
1144 1144 return utility
1145 1145
1146 1146
1147 1147 # TODO: johbo: Split this up into utilities per domain or something similar
1148 1148 class UserUtility(object):
1149 1149
1150 1150 def __init__(self, test_name="test"):
1151 1151 self._test_name = self._sanitize_name(test_name)
1152 1152 self.fixture = Fixture()
1153 1153 self.repo_group_ids = []
1154 1154 self.repos_ids = []
1155 1155 self.user_ids = []
1156 1156 self.user_group_ids = []
1157 1157 self.user_repo_permission_ids = []
1158 1158 self.user_group_repo_permission_ids = []
1159 1159 self.user_repo_group_permission_ids = []
1160 1160 self.user_group_repo_group_permission_ids = []
1161 1161 self.user_user_group_permission_ids = []
1162 1162 self.user_group_user_group_permission_ids = []
1163 1163 self.user_permissions = []
1164 1164
1165 1165 def _sanitize_name(self, name):
1166 1166 for char in ['[', ']']:
1167 1167 name = name.replace(char, '_')
1168 1168 return name
1169 1169
1170 1170 def create_repo_group(
1171 1171 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1172 1172 group_name = "{prefix}_repogroup_{count}".format(
1173 1173 prefix=self._test_name,
1174 1174 count=len(self.repo_group_ids))
1175 1175 repo_group = self.fixture.create_repo_group(
1176 1176 group_name, cur_user=owner)
1177 1177 if auto_cleanup:
1178 1178 self.repo_group_ids.append(repo_group.group_id)
1179 1179 return repo_group
1180 1180
1181 1181 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1182 1182 auto_cleanup=True, repo_type='hg', bare=False):
1183 1183 repo_name = "{prefix}_repository_{count}".format(
1184 1184 prefix=self._test_name,
1185 1185 count=len(self.repos_ids))
1186 1186
1187 1187 repository = self.fixture.create_repo(
1188 1188 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type, bare=bare)
1189 1189 if auto_cleanup:
1190 1190 self.repos_ids.append(repository.repo_id)
1191 1191 return repository
1192 1192
1193 1193 def create_user(self, auto_cleanup=True, **kwargs):
1194 1194 user_name = "{prefix}_user_{count}".format(
1195 1195 prefix=self._test_name,
1196 1196 count=len(self.user_ids))
1197 1197 user = self.fixture.create_user(user_name, **kwargs)
1198 1198 if auto_cleanup:
1199 1199 self.user_ids.append(user.user_id)
1200 1200 return user
1201 1201
1202 1202 def create_additional_user_email(self, user, email):
1203 1203 uem = self.fixture.create_additional_user_email(user=user, email=email)
1204 1204 return uem
1205 1205
1206 1206 def create_user_with_group(self):
1207 1207 user = self.create_user()
1208 1208 user_group = self.create_user_group(members=[user])
1209 1209 return user, user_group
1210 1210
1211 1211 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1212 1212 auto_cleanup=True, **kwargs):
1213 1213 group_name = "{prefix}_usergroup_{count}".format(
1214 1214 prefix=self._test_name,
1215 1215 count=len(self.user_group_ids))
1216 1216 user_group = self.fixture.create_user_group(
1217 1217 group_name, cur_user=owner, **kwargs)
1218 1218
1219 1219 if auto_cleanup:
1220 1220 self.user_group_ids.append(user_group.users_group_id)
1221 1221 if members:
1222 1222 for user in members:
1223 1223 UserGroupModel().add_user_to_group(user_group, user)
1224 1224 return user_group
1225 1225
1226 1226 def grant_user_permission(self, user_name, permission_name):
1227 self._inherit_default_user_permissions(user_name, False)
1227 self.inherit_default_user_permissions(user_name, False)
1228 1228 self.user_permissions.append((user_name, permission_name))
1229 1229
1230 1230 def grant_user_permission_to_repo_group(
1231 1231 self, repo_group, user, permission_name):
1232 1232 permission = RepoGroupModel().grant_user_permission(
1233 1233 repo_group, user, permission_name)
1234 1234 self.user_repo_group_permission_ids.append(
1235 1235 (repo_group.group_id, user.user_id))
1236 1236 return permission
1237 1237
1238 1238 def grant_user_group_permission_to_repo_group(
1239 1239 self, repo_group, user_group, permission_name):
1240 1240 permission = RepoGroupModel().grant_user_group_permission(
1241 1241 repo_group, user_group, permission_name)
1242 1242 self.user_group_repo_group_permission_ids.append(
1243 1243 (repo_group.group_id, user_group.users_group_id))
1244 1244 return permission
1245 1245
1246 1246 def grant_user_permission_to_repo(
1247 1247 self, repo, user, permission_name):
1248 1248 permission = RepoModel().grant_user_permission(
1249 1249 repo, user, permission_name)
1250 1250 self.user_repo_permission_ids.append(
1251 1251 (repo.repo_id, user.user_id))
1252 1252 return permission
1253 1253
1254 1254 def grant_user_group_permission_to_repo(
1255 1255 self, repo, user_group, permission_name):
1256 1256 permission = RepoModel().grant_user_group_permission(
1257 1257 repo, user_group, permission_name)
1258 1258 self.user_group_repo_permission_ids.append(
1259 1259 (repo.repo_id, user_group.users_group_id))
1260 1260 return permission
1261 1261
1262 1262 def grant_user_permission_to_user_group(
1263 1263 self, target_user_group, user, permission_name):
1264 1264 permission = UserGroupModel().grant_user_permission(
1265 1265 target_user_group, user, permission_name)
1266 1266 self.user_user_group_permission_ids.append(
1267 1267 (target_user_group.users_group_id, user.user_id))
1268 1268 return permission
1269 1269
1270 1270 def grant_user_group_permission_to_user_group(
1271 1271 self, target_user_group, user_group, permission_name):
1272 1272 permission = UserGroupModel().grant_user_group_permission(
1273 1273 target_user_group, user_group, permission_name)
1274 1274 self.user_group_user_group_permission_ids.append(
1275 1275 (target_user_group.users_group_id, user_group.users_group_id))
1276 1276 return permission
1277 1277
1278 1278 def revoke_user_permission(self, user_name, permission_name):
1279 self._inherit_default_user_permissions(user_name, True)
1279 self.inherit_default_user_permissions(user_name, True)
1280 1280 UserModel().revoke_perm(user_name, permission_name)
1281 1281
1282 def _inherit_default_user_permissions(self, user_name, value):
1282 def inherit_default_user_permissions(self, user_name, value):
1283 1283 user = UserModel().get_by_username(user_name)
1284 1284 user.inherit_default_permissions = value
1285 1285 Session().add(user)
1286 1286 Session().commit()
1287 1287
1288 1288 def cleanup(self):
1289 1289 self._cleanup_permissions()
1290 1290 self._cleanup_repos()
1291 1291 self._cleanup_repo_groups()
1292 1292 self._cleanup_user_groups()
1293 1293 self._cleanup_users()
1294 1294
1295 1295 def _cleanup_permissions(self):
1296 1296 if self.user_permissions:
1297 1297 for user_name, permission_name in self.user_permissions:
1298 1298 self.revoke_user_permission(user_name, permission_name)
1299 1299
1300 1300 for permission in self.user_repo_permission_ids:
1301 1301 RepoModel().revoke_user_permission(*permission)
1302 1302
1303 1303 for permission in self.user_group_repo_permission_ids:
1304 1304 RepoModel().revoke_user_group_permission(*permission)
1305 1305
1306 1306 for permission in self.user_repo_group_permission_ids:
1307 1307 RepoGroupModel().revoke_user_permission(*permission)
1308 1308
1309 1309 for permission in self.user_group_repo_group_permission_ids:
1310 1310 RepoGroupModel().revoke_user_group_permission(*permission)
1311 1311
1312 1312 for permission in self.user_user_group_permission_ids:
1313 1313 UserGroupModel().revoke_user_permission(*permission)
1314 1314
1315 1315 for permission in self.user_group_user_group_permission_ids:
1316 1316 UserGroupModel().revoke_user_group_permission(*permission)
1317 1317
1318 1318 def _cleanup_repo_groups(self):
1319 1319 def _repo_group_compare(first_group_id, second_group_id):
1320 1320 """
1321 1321 Gives higher priority to the groups with the most complex paths
1322 1322 """
1323 1323 first_group = RepoGroup.get(first_group_id)
1324 1324 second_group = RepoGroup.get(second_group_id)
1325 1325 first_group_parts = (
1326 1326 len(first_group.group_name.split('/')) if first_group else 0)
1327 1327 second_group_parts = (
1328 1328 len(second_group.group_name.split('/')) if second_group else 0)
1329 1329 return cmp(second_group_parts, first_group_parts)
1330 1330
1331 1331 sorted_repo_group_ids = sorted(
1332 1332 self.repo_group_ids, cmp=_repo_group_compare)
1333 1333 for repo_group_id in sorted_repo_group_ids:
1334 1334 self.fixture.destroy_repo_group(repo_group_id)
1335 1335
1336 1336 def _cleanup_repos(self):
1337 1337 sorted_repos_ids = sorted(self.repos_ids)
1338 1338 for repo_id in sorted_repos_ids:
1339 1339 self.fixture.destroy_repo(repo_id)
1340 1340
1341 1341 def _cleanup_user_groups(self):
1342 1342 def _user_group_compare(first_group_id, second_group_id):
1343 1343 """
1344 1344 Gives higher priority to the groups with the most complex paths
1345 1345 """
1346 1346 first_group = UserGroup.get(first_group_id)
1347 1347 second_group = UserGroup.get(second_group_id)
1348 1348 first_group_parts = (
1349 1349 len(first_group.users_group_name.split('/'))
1350 1350 if first_group else 0)
1351 1351 second_group_parts = (
1352 1352 len(second_group.users_group_name.split('/'))
1353 1353 if second_group else 0)
1354 1354 return cmp(second_group_parts, first_group_parts)
1355 1355
1356 1356 sorted_user_group_ids = sorted(
1357 1357 self.user_group_ids, cmp=_user_group_compare)
1358 1358 for user_group_id in sorted_user_group_ids:
1359 1359 self.fixture.destroy_user_group(user_group_id)
1360 1360
1361 1361 def _cleanup_users(self):
1362 1362 for user_id in self.user_ids:
1363 1363 self.fixture.destroy_user(user_id)
1364 1364
1365 1365
1366 1366 # TODO: Think about moving this into a pytest-pyro package and make it a
1367 1367 # pytest plugin
1368 1368 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1369 1369 def pytest_runtest_makereport(item, call):
1370 1370 """
1371 1371 Adding the remote traceback if the exception has this information.
1372 1372
1373 1373 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1374 1374 to the exception instance.
1375 1375 """
1376 1376 outcome = yield
1377 1377 report = outcome.get_result()
1378 1378 if call.excinfo:
1379 1379 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1380 1380
1381 1381
1382 1382 def _add_vcsserver_remote_traceback(report, exc):
1383 1383 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1384 1384
1385 1385 if vcsserver_traceback:
1386 1386 section = 'VCSServer remote traceback ' + report.when
1387 1387 report.sections.append((section, vcsserver_traceback))
1388 1388
1389 1389
1390 1390 @pytest.fixture(scope='session')
1391 1391 def testrun():
1392 1392 return {
1393 1393 'uuid': uuid.uuid4(),
1394 1394 'start': datetime.datetime.utcnow().isoformat(),
1395 1395 'timestamp': int(time.time()),
1396 1396 }
1397 1397
1398 1398
1399 1399 @pytest.fixture(autouse=True)
1400 1400 def collect_appenlight_stats(request, testrun):
1401 1401 """
1402 1402 This fixture reports memory consumtion of single tests.
1403 1403
1404 1404 It gathers data based on `psutil` and sends them to Appenlight. The option
1405 1405 ``--ae`` has te be used to enable this fixture and the API key for your
1406 1406 application has to be provided in ``--ae-key``.
1407 1407 """
1408 1408 try:
1409 1409 # cygwin cannot have yet psutil support.
1410 1410 import psutil
1411 1411 except ImportError:
1412 1412 return
1413 1413
1414 1414 if not request.config.getoption('--appenlight'):
1415 1415 return
1416 1416 else:
1417 1417 # Only request the baseapp fixture if appenlight tracking is
1418 1418 # enabled. This will speed up a test run of unit tests by 2 to 3
1419 1419 # seconds if appenlight is not enabled.
1420 1420 baseapp = request.getfuncargvalue("baseapp")
1421 1421 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1422 1422 client = AppenlightClient(
1423 1423 url=url,
1424 1424 api_key=request.config.getoption('--appenlight-api-key'),
1425 1425 namespace=request.node.nodeid,
1426 1426 request=str(testrun['uuid']),
1427 1427 testrun=testrun)
1428 1428
1429 1429 client.collect({
1430 1430 'message': "Starting",
1431 1431 })
1432 1432
1433 1433 server_and_port = baseapp.config.get_settings()['vcs.server']
1434 1434 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1435 1435 server = create_vcsserver_proxy(server_and_port, protocol)
1436 1436 with server:
1437 1437 vcs_pid = server.get_pid()
1438 1438 server.run_gc()
1439 1439 vcs_process = psutil.Process(vcs_pid)
1440 1440 mem = vcs_process.memory_info()
1441 1441 client.tag_before('vcsserver.rss', mem.rss)
1442 1442 client.tag_before('vcsserver.vms', mem.vms)
1443 1443
1444 1444 test_process = psutil.Process()
1445 1445 mem = test_process.memory_info()
1446 1446 client.tag_before('test.rss', mem.rss)
1447 1447 client.tag_before('test.vms', mem.vms)
1448 1448
1449 1449 client.tag_before('time', time.time())
1450 1450
1451 1451 @request.addfinalizer
1452 1452 def send_stats():
1453 1453 client.tag_after('time', time.time())
1454 1454 with server:
1455 1455 gc_stats = server.run_gc()
1456 1456 for tag, value in gc_stats.items():
1457 1457 client.tag_after(tag, value)
1458 1458 mem = vcs_process.memory_info()
1459 1459 client.tag_after('vcsserver.rss', mem.rss)
1460 1460 client.tag_after('vcsserver.vms', mem.vms)
1461 1461
1462 1462 mem = test_process.memory_info()
1463 1463 client.tag_after('test.rss', mem.rss)
1464 1464 client.tag_after('test.vms', mem.vms)
1465 1465
1466 1466 client.collect({
1467 1467 'message': "Finished",
1468 1468 })
1469 1469 client.send_stats()
1470 1470
1471 1471 return client
1472 1472
1473 1473
1474 1474 class AppenlightClient():
1475 1475
1476 1476 url_template = '{url}?protocol_version=0.5'
1477 1477
1478 1478 def __init__(
1479 1479 self, url, api_key, add_server=True, add_timestamp=True,
1480 1480 namespace=None, request=None, testrun=None):
1481 1481 self.url = self.url_template.format(url=url)
1482 1482 self.api_key = api_key
1483 1483 self.add_server = add_server
1484 1484 self.add_timestamp = add_timestamp
1485 1485 self.namespace = namespace
1486 1486 self.request = request
1487 1487 self.server = socket.getfqdn(socket.gethostname())
1488 1488 self.tags_before = {}
1489 1489 self.tags_after = {}
1490 1490 self.stats = []
1491 1491 self.testrun = testrun or {}
1492 1492
1493 1493 def tag_before(self, tag, value):
1494 1494 self.tags_before[tag] = value
1495 1495
1496 1496 def tag_after(self, tag, value):
1497 1497 self.tags_after[tag] = value
1498 1498
1499 1499 def collect(self, data):
1500 1500 if self.add_server:
1501 1501 data.setdefault('server', self.server)
1502 1502 if self.add_timestamp:
1503 1503 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1504 1504 if self.namespace:
1505 1505 data.setdefault('namespace', self.namespace)
1506 1506 if self.request:
1507 1507 data.setdefault('request', self.request)
1508 1508 self.stats.append(data)
1509 1509
1510 1510 def send_stats(self):
1511 1511 tags = [
1512 1512 ('testrun', self.request),
1513 1513 ('testrun.start', self.testrun['start']),
1514 1514 ('testrun.timestamp', self.testrun['timestamp']),
1515 1515 ('test', self.namespace),
1516 1516 ]
1517 1517 for key, value in self.tags_before.items():
1518 1518 tags.append((key + '.before', value))
1519 1519 try:
1520 1520 delta = self.tags_after[key] - value
1521 1521 tags.append((key + '.delta', delta))
1522 1522 except Exception:
1523 1523 pass
1524 1524 for key, value in self.tags_after.items():
1525 1525 tags.append((key + '.after', value))
1526 1526 self.collect({
1527 1527 'message': "Collected tags",
1528 1528 'tags': tags,
1529 1529 })
1530 1530
1531 1531 response = requests.post(
1532 1532 self.url,
1533 1533 headers={
1534 1534 'X-appenlight-api-key': self.api_key},
1535 1535 json=self.stats,
1536 1536 )
1537 1537
1538 1538 if not response.status_code == 200:
1539 1539 pprint.pprint(self.stats)
1540 1540 print(response.headers)
1541 1541 print(response.text)
1542 1542 raise Exception('Sending to appenlight failed')
1543 1543
1544 1544
1545 1545 @pytest.fixture
1546 1546 def gist_util(request, db_connection):
1547 1547 """
1548 1548 Provides a wired instance of `GistUtility` with integrated cleanup.
1549 1549 """
1550 1550 utility = GistUtility()
1551 1551 request.addfinalizer(utility.cleanup)
1552 1552 return utility
1553 1553
1554 1554
1555 1555 class GistUtility(object):
1556 1556 def __init__(self):
1557 1557 self.fixture = Fixture()
1558 1558 self.gist_ids = []
1559 1559
1560 1560 def create_gist(self, **kwargs):
1561 1561 gist = self.fixture.create_gist(**kwargs)
1562 1562 self.gist_ids.append(gist.gist_id)
1563 1563 return gist
1564 1564
1565 1565 def cleanup(self):
1566 1566 for id_ in self.gist_ids:
1567 1567 self.fixture.destroy_gists(str(id_))
1568 1568
1569 1569
1570 1570 @pytest.fixture
1571 1571 def enabled_backends(request):
1572 1572 backends = request.config.option.backends
1573 1573 return backends[:]
1574 1574
1575 1575
1576 1576 @pytest.fixture
1577 1577 def settings_util(request, db_connection):
1578 1578 """
1579 1579 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1580 1580 """
1581 1581 utility = SettingsUtility()
1582 1582 request.addfinalizer(utility.cleanup)
1583 1583 return utility
1584 1584
1585 1585
1586 1586 class SettingsUtility(object):
1587 1587 def __init__(self):
1588 1588 self.rhodecode_ui_ids = []
1589 1589 self.rhodecode_setting_ids = []
1590 1590 self.repo_rhodecode_ui_ids = []
1591 1591 self.repo_rhodecode_setting_ids = []
1592 1592
1593 1593 def create_repo_rhodecode_ui(
1594 1594 self, repo, section, value, key=None, active=True, cleanup=True):
1595 1595 key = key or hashlib.sha1(
1596 1596 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1597 1597
1598 1598 setting = RepoRhodeCodeUi()
1599 1599 setting.repository_id = repo.repo_id
1600 1600 setting.ui_section = section
1601 1601 setting.ui_value = value
1602 1602 setting.ui_key = key
1603 1603 setting.ui_active = active
1604 1604 Session().add(setting)
1605 1605 Session().commit()
1606 1606
1607 1607 if cleanup:
1608 1608 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1609 1609 return setting
1610 1610
1611 1611 def create_rhodecode_ui(
1612 1612 self, section, value, key=None, active=True, cleanup=True):
1613 1613 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1614 1614
1615 1615 setting = RhodeCodeUi()
1616 1616 setting.ui_section = section
1617 1617 setting.ui_value = value
1618 1618 setting.ui_key = key
1619 1619 setting.ui_active = active
1620 1620 Session().add(setting)
1621 1621 Session().commit()
1622 1622
1623 1623 if cleanup:
1624 1624 self.rhodecode_ui_ids.append(setting.ui_id)
1625 1625 return setting
1626 1626
1627 1627 def create_repo_rhodecode_setting(
1628 1628 self, repo, name, value, type_, cleanup=True):
1629 1629 setting = RepoRhodeCodeSetting(
1630 1630 repo.repo_id, key=name, val=value, type=type_)
1631 1631 Session().add(setting)
1632 1632 Session().commit()
1633 1633
1634 1634 if cleanup:
1635 1635 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1636 1636 return setting
1637 1637
1638 1638 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1639 1639 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1640 1640 Session().add(setting)
1641 1641 Session().commit()
1642 1642
1643 1643 if cleanup:
1644 1644 self.rhodecode_setting_ids.append(setting.app_settings_id)
1645 1645
1646 1646 return setting
1647 1647
1648 1648 def cleanup(self):
1649 1649 for id_ in self.rhodecode_ui_ids:
1650 1650 setting = RhodeCodeUi.get(id_)
1651 1651 Session().delete(setting)
1652 1652
1653 1653 for id_ in self.rhodecode_setting_ids:
1654 1654 setting = RhodeCodeSetting.get(id_)
1655 1655 Session().delete(setting)
1656 1656
1657 1657 for id_ in self.repo_rhodecode_ui_ids:
1658 1658 setting = RepoRhodeCodeUi.get(id_)
1659 1659 Session().delete(setting)
1660 1660
1661 1661 for id_ in self.repo_rhodecode_setting_ids:
1662 1662 setting = RepoRhodeCodeSetting.get(id_)
1663 1663 Session().delete(setting)
1664 1664
1665 1665 Session().commit()
1666 1666
1667 1667
1668 1668 @pytest.fixture
1669 1669 def no_notifications(request):
1670 1670 notification_patcher = mock.patch(
1671 1671 'rhodecode.model.notification.NotificationModel.create')
1672 1672 notification_patcher.start()
1673 1673 request.addfinalizer(notification_patcher.stop)
1674 1674
1675 1675
1676 1676 @pytest.fixture(scope='session')
1677 1677 def repeat(request):
1678 1678 """
1679 1679 The number of repetitions is based on this fixture.
1680 1680
1681 1681 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1682 1682 tests are not too slow in our default test suite.
1683 1683 """
1684 1684 return request.config.getoption('--repeat')
1685 1685
1686 1686
1687 1687 @pytest.fixture
1688 1688 def rhodecode_fixtures():
1689 1689 return Fixture()
1690 1690
1691 1691
1692 1692 @pytest.fixture
1693 1693 def context_stub():
1694 1694 """
1695 1695 Stub context object.
1696 1696 """
1697 1697 context = pyramid.testing.DummyResource()
1698 1698 return context
1699 1699
1700 1700
1701 1701 @pytest.fixture
1702 1702 def request_stub():
1703 1703 """
1704 1704 Stub request object.
1705 1705 """
1706 1706 from rhodecode.lib.base import bootstrap_request
1707 1707 request = bootstrap_request(scheme='https')
1708 1708 return request
1709 1709
1710 1710
1711 1711 @pytest.fixture
1712 1712 def config_stub(request, request_stub):
1713 1713 """
1714 1714 Set up pyramid.testing and return the Configurator.
1715 1715 """
1716 1716 from rhodecode.lib.base import bootstrap_config
1717 1717 config = bootstrap_config(request=request_stub)
1718 1718
1719 1719 @request.addfinalizer
1720 1720 def cleanup():
1721 1721 pyramid.testing.tearDown()
1722 1722
1723 1723 return config
1724 1724
1725 1725
1726 1726 @pytest.fixture
1727 1727 def StubIntegrationType():
1728 1728 class _StubIntegrationType(IntegrationTypeBase):
1729 1729 """ Test integration type class """
1730 1730
1731 1731 key = 'test'
1732 1732 display_name = 'Test integration type'
1733 1733 description = 'A test integration type for testing'
1734 1734
1735 1735 @classmethod
1736 1736 def icon(cls):
1737 1737 return 'test_icon_html_image'
1738 1738
1739 1739 def __init__(self, settings):
1740 1740 super(_StubIntegrationType, self).__init__(settings)
1741 1741 self.sent_events = [] # for testing
1742 1742
1743 1743 def send_event(self, event):
1744 1744 self.sent_events.append(event)
1745 1745
1746 1746 def settings_schema(self):
1747 1747 class SettingsSchema(colander.Schema):
1748 1748 test_string_field = colander.SchemaNode(
1749 1749 colander.String(),
1750 1750 missing=colander.required,
1751 1751 title='test string field',
1752 1752 )
1753 1753 test_int_field = colander.SchemaNode(
1754 1754 colander.Int(),
1755 1755 title='some integer setting',
1756 1756 )
1757 1757 return SettingsSchema()
1758 1758
1759 1759
1760 1760 integration_type_registry.register_integration_type(_StubIntegrationType)
1761 1761 return _StubIntegrationType
1762 1762
1763 1763 @pytest.fixture
1764 1764 def stub_integration_settings():
1765 1765 return {
1766 1766 'test_string_field': 'some data',
1767 1767 'test_int_field': 100,
1768 1768 }
1769 1769
1770 1770
1771 1771 @pytest.fixture
1772 1772 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1773 1773 stub_integration_settings):
1774 1774 integration = IntegrationModel().create(
1775 1775 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1776 1776 name='test repo integration',
1777 1777 repo=repo_stub, repo_group=None, child_repos_only=None)
1778 1778
1779 1779 @request.addfinalizer
1780 1780 def cleanup():
1781 1781 IntegrationModel().delete(integration)
1782 1782
1783 1783 return integration
1784 1784
1785 1785
1786 1786 @pytest.fixture
1787 1787 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1788 1788 stub_integration_settings):
1789 1789 integration = IntegrationModel().create(
1790 1790 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1791 1791 name='test repogroup integration',
1792 1792 repo=None, repo_group=test_repo_group, child_repos_only=True)
1793 1793
1794 1794 @request.addfinalizer
1795 1795 def cleanup():
1796 1796 IntegrationModel().delete(integration)
1797 1797
1798 1798 return integration
1799 1799
1800 1800
1801 1801 @pytest.fixture
1802 1802 def repogroup_recursive_integration_stub(request, test_repo_group,
1803 1803 StubIntegrationType, stub_integration_settings):
1804 1804 integration = IntegrationModel().create(
1805 1805 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1806 1806 name='test recursive repogroup integration',
1807 1807 repo=None, repo_group=test_repo_group, child_repos_only=False)
1808 1808
1809 1809 @request.addfinalizer
1810 1810 def cleanup():
1811 1811 IntegrationModel().delete(integration)
1812 1812
1813 1813 return integration
1814 1814
1815 1815
1816 1816 @pytest.fixture
1817 1817 def global_integration_stub(request, StubIntegrationType,
1818 1818 stub_integration_settings):
1819 1819 integration = IntegrationModel().create(
1820 1820 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1821 1821 name='test global integration',
1822 1822 repo=None, repo_group=None, child_repos_only=None)
1823 1823
1824 1824 @request.addfinalizer
1825 1825 def cleanup():
1826 1826 IntegrationModel().delete(integration)
1827 1827
1828 1828 return integration
1829 1829
1830 1830
1831 1831 @pytest.fixture
1832 1832 def root_repos_integration_stub(request, StubIntegrationType,
1833 1833 stub_integration_settings):
1834 1834 integration = IntegrationModel().create(
1835 1835 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1836 1836 name='test global integration',
1837 1837 repo=None, repo_group=None, child_repos_only=True)
1838 1838
1839 1839 @request.addfinalizer
1840 1840 def cleanup():
1841 1841 IntegrationModel().delete(integration)
1842 1842
1843 1843 return integration
1844 1844
1845 1845
1846 1846 @pytest.fixture
1847 1847 def local_dt_to_utc():
1848 1848 def _factory(dt):
1849 1849 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1850 1850 dateutil.tz.tzutc()).replace(tzinfo=None)
1851 1851 return _factory
1852 1852
1853 1853
1854 1854 @pytest.fixture
1855 1855 def disable_anonymous_user(request, baseapp):
1856 1856 set_anonymous_access(False)
1857 1857
1858 1858 @request.addfinalizer
1859 1859 def cleanup():
1860 1860 set_anonymous_access(True)
1861 1861
1862 1862
1863 1863 @pytest.fixture(scope='module')
1864 1864 def rc_fixture(request):
1865 1865 return Fixture()
1866 1866
1867 1867
1868 1868 @pytest.fixture
1869 1869 def repo_groups(request):
1870 1870 fixture = Fixture()
1871 1871
1872 1872 session = Session()
1873 1873 zombie_group = fixture.create_repo_group('zombie')
1874 1874 parent_group = fixture.create_repo_group('parent')
1875 1875 child_group = fixture.create_repo_group('parent/child')
1876 1876 groups_in_db = session.query(RepoGroup).all()
1877 1877 assert len(groups_in_db) == 3
1878 1878 assert child_group.group_parent_id == parent_group.group_id
1879 1879
1880 1880 @request.addfinalizer
1881 1881 def cleanup():
1882 1882 fixture.destroy_repo_group(zombie_group)
1883 1883 fixture.destroy_repo_group(child_group)
1884 1884 fixture.destroy_repo_group(parent_group)
1885 1885
1886 1886 return zombie_group, parent_group, child_group
General Comments 0
You need to be logged in to leave comments. Login now