##// END OF EJS Templates
tests: fixed some tests after new audit-logger
marcink -
r1700:818de782 default
parent child Browse files
Show More
@@ -1,172 +1,174 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import csv
23 23 import datetime
24 24
25 25 import pytest
26 26
27 27 from rhodecode.tests import *
28 28 from rhodecode.model.db import UserLog
29 29 from rhodecode.model.meta import Session
30 30 from rhodecode.lib.utils2 import safe_unicode
31 31
32 32 dn = os.path.dirname
33 33 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
34 34
35 35
36 36 class TestAdminController(TestController):
37 37
38 38 @pytest.fixture(scope='class', autouse=True)
39 39 def prepare(self, request, pylonsapp):
40 40 UserLog.query().delete()
41 41 Session().commit()
42 42
43 43 def strptime(val):
44 44 fmt = '%Y-%m-%d %H:%M:%S'
45 45 if '.' not in val:
46 46 return datetime.datetime.strptime(val, fmt)
47 47
48 48 nofrag, frag = val.split(".")
49 49 date = datetime.datetime.strptime(nofrag, fmt)
50 50
51 51 frag = frag[:6] # truncate to microseconds
52 52 frag += (6 - len(frag)) * '0' # add 0s
53 53 return date.replace(microsecond=int(frag))
54 54
55 55 with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
56 56 for row in csv.DictReader(f):
57 57 ul = UserLog()
58 58 for k, v in row.iteritems():
59 59 v = safe_unicode(v)
60 60 if k == 'action_date':
61 61 v = strptime(v)
62 62 if k in ['user_id', 'repository_id']:
63 63 # nullable due to FK problems
64 64 v = None
65 65 setattr(ul, k, v)
66 66 Session().add(ul)
67 67 Session().commit()
68 68
69 69 @request.addfinalizer
70 70 def cleanup():
71 71 UserLog.query().delete()
72 72 Session().commit()
73 73
74 74 def test_index(self):
75 75 self.log_user()
76 76 response = self.app.get(url(controller='admin/admin', action='index'))
77 77 response.mustcontain('Admin journal')
78 78
79 79 def test_filter_all_entries(self):
80 80 self.log_user()
81 81 response = self.app.get(url(controller='admin/admin', action='index',))
82 response.mustcontain('2034 entries')
82 all_count = UserLog.query().count()
83 response.mustcontain('%s entries' % all_count)
83 84
84 85 def test_filter_journal_filter_exact_match_on_repository(self):
85 86 self.log_user()
86 87 response = self.app.get(url(controller='admin/admin', action='index',
87 88 filter='repository:rhodecode'))
88 89 response.mustcontain('3 entries')
89 90
90 91 def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
91 92 self.log_user()
92 93 response = self.app.get(url(controller='admin/admin', action='index',
93 94 filter='repository:RhodeCode'))
94 95 response.mustcontain('3 entries')
95 96
96 97 def test_filter_journal_filter_wildcard_on_repository(self):
97 98 self.log_user()
98 99 response = self.app.get(url(controller='admin/admin', action='index',
99 100 filter='repository:*test*'))
100 101 response.mustcontain('862 entries')
101 102
102 103 def test_filter_journal_filter_prefix_on_repository(self):
103 104 self.log_user()
104 105 response = self.app.get(url(controller='admin/admin', action='index',
105 106 filter='repository:test*'))
106 107 response.mustcontain('257 entries')
107 108
108 109 def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
109 110 self.log_user()
110 111 response = self.app.get(url(controller='admin/admin', action='index',
111 112 filter='repository:Test*'))
112 113 response.mustcontain('257 entries')
113 114
114 115 def test_filter_journal_filter_prefix_on_repository_and_user(self):
115 116 self.log_user()
116 117 response = self.app.get(url(controller='admin/admin', action='index',
117 118 filter='repository:test* AND username:demo'))
118 119 response.mustcontain('130 entries')
119 120
120 121 def test_filter_journal_filter_prefix_on_repository_or_target_repo(self):
121 122 self.log_user()
122 123 response = self.app.get(url(controller='admin/admin', action='index',
123 124 filter='repository:test* OR repository:rhodecode'))
124 125 response.mustcontain('260 entries') # 257 + 3
125 126
126 127 def test_filter_journal_filter_exact_match_on_username(self):
127 128 self.log_user()
128 129 response = self.app.get(url(controller='admin/admin', action='index',
129 130 filter='username:demo'))
130 131 response.mustcontain('1087 entries')
131 132
132 133 def test_filter_journal_filter_exact_match_on_username_camelCase(self):
133 134 self.log_user()
134 135 response = self.app.get(url(controller='admin/admin', action='index',
135 136 filter='username:DemO'))
136 137 response.mustcontain('1087 entries')
137 138
138 139 def test_filter_journal_filter_wildcard_on_username(self):
139 140 self.log_user()
140 141 response = self.app.get(url(controller='admin/admin', action='index',
141 142 filter='username:*test*'))
142 response.mustcontain('100 entries')
143 entries_count = UserLog.query().filter(UserLog.username.ilike('%test%')).count()
144 response.mustcontain('{} entries'.format(entries_count))
143 145
144 146 def test_filter_journal_filter_prefix_on_username(self):
145 147 self.log_user()
146 148 response = self.app.get(url(controller='admin/admin', action='index',
147 149 filter='username:demo*'))
148 150 response.mustcontain('1101 entries')
149 151
150 152 def test_filter_journal_filter_prefix_on_user_or_other_user(self):
151 153 self.log_user()
152 154 response = self.app.get(url(controller='admin/admin', action='index',
153 155 filter='username:demo OR username:volcan'))
154 156 response.mustcontain('1095 entries') # 1087 + 8
155 157
156 158 def test_filter_journal_filter_wildcard_on_action(self):
157 159 self.log_user()
158 160 response = self.app.get(url(controller='admin/admin', action='index',
159 161 filter='action:*pull_request*'))
160 162 response.mustcontain('187 entries')
161 163
162 164 def test_filter_journal_filter_on_date(self):
163 165 self.log_user()
164 166 response = self.app.get(url(controller='admin/admin', action='index',
165 167 filter='date:20121010'))
166 168 response.mustcontain('47 entries')
167 169
168 170 def test_filter_journal_filter_on_date_2(self):
169 171 self.log_user()
170 172 response = self.app.get(url(controller='admin/admin', action='index',
171 173 filter='date:20121020'))
172 174 response.mustcontain('17 entries')
@@ -1,1264 +1,1268 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.lib import auth
27 27 from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
28 28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
30 30 Permission
31 31 from rhodecode.model.meta import Session
32 32 from rhodecode.model.repo import RepoModel
33 33 from rhodecode.model.repo_group import RepoGroupModel
34 34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
39 39 logout_user_session)
40 40 from rhodecode.tests.fixture import Fixture, error_function
41 41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
42 42
43 43 fixture = Fixture()
44 44
45 45
46 46 @pytest.mark.usefixtures("app")
47 47 class TestAdminRepos(object):
48 48
49 49 def test_index(self):
50 50 self.app.get(url('repos'))
51 51
52 52 def test_create_page_restricted(self, autologin_user, backend):
53 53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
54 54 response = self.app.get(url('new_repo'), status=200)
55 55 assert_response = AssertResponse(response)
56 56 element = assert_response.get_element('#repo_type')
57 57 assert element.text_content() == '\ngit\n'
58 58
59 59 def test_create_page_non_restricted(self, autologin_user, backend):
60 60 response = self.app.get(url('new_repo'), status=200)
61 61 assert_response = AssertResponse(response)
62 62 assert_response.element_contains('#repo_type', 'git')
63 63 assert_response.element_contains('#repo_type', 'svn')
64 64 assert_response.element_contains('#repo_type', 'hg')
65 65
66 66 @pytest.mark.parametrize("suffix",
67 67 [u'', u'xxa'], ids=['', 'non-ascii'])
68 68 def test_create(self, autologin_user, backend, suffix, csrf_token):
69 69 repo_name_unicode = backend.new_repo_name(suffix=suffix)
70 70 repo_name = repo_name_unicode.encode('utf8')
71 71 description_unicode = u'description for newly created repo' + suffix
72 72 description = description_unicode.encode('utf8')
73 73 response = self.app.post(
74 74 url('repos'),
75 75 fixture._get_repo_create_params(
76 76 repo_private=False,
77 77 repo_name=repo_name,
78 78 repo_type=backend.alias,
79 79 repo_description=description,
80 80 csrf_token=csrf_token),
81 81 status=302)
82 82
83 83 self.assert_repository_is_created_correctly(
84 84 repo_name, description, backend)
85 85
86 86 def test_create_numeric(self, autologin_user, backend, csrf_token):
87 87 numeric_repo = '1234'
88 88 repo_name = numeric_repo
89 89 description = 'description for newly created repo' + numeric_repo
90 90 self.app.post(
91 91 url('repos'),
92 92 fixture._get_repo_create_params(
93 93 repo_private=False,
94 94 repo_name=repo_name,
95 95 repo_type=backend.alias,
96 96 repo_description=description,
97 97 csrf_token=csrf_token))
98 98
99 99 self.assert_repository_is_created_correctly(
100 100 repo_name, description, backend)
101 101
102 102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
103 103 def test_create_in_group(
104 104 self, autologin_user, backend, suffix, csrf_token):
105 105 # create GROUP
106 106 group_name = 'sometest_%s' % backend.alias
107 107 gr = RepoGroupModel().create(group_name=group_name,
108 108 group_description='test',
109 109 owner=TEST_USER_ADMIN_LOGIN)
110 110 Session().commit()
111 111
112 112 repo_name = u'ingroup' + suffix
113 113 repo_name_full = RepoGroup.url_sep().join(
114 114 [group_name, repo_name])
115 115 description = u'description for newly created repo'
116 116 self.app.post(
117 117 url('repos'),
118 118 fixture._get_repo_create_params(
119 119 repo_private=False,
120 120 repo_name=safe_str(repo_name),
121 121 repo_type=backend.alias,
122 122 repo_description=description,
123 123 repo_group=gr.group_id,
124 124 csrf_token=csrf_token))
125 125
126 126 # TODO: johbo: Cleanup work to fixture
127 127 try:
128 128 self.assert_repository_is_created_correctly(
129 129 repo_name_full, description, backend)
130 130
131 131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
132 132 inherited_perms = UserRepoToPerm.query().filter(
133 133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
134 134 assert len(inherited_perms) == 1
135 135 finally:
136 136 RepoModel().delete(repo_name_full)
137 137 RepoGroupModel().delete(group_name)
138 138 Session().commit()
139 139
140 140 def test_create_in_group_numeric(
141 141 self, autologin_user, backend, csrf_token):
142 142 # create GROUP
143 143 group_name = 'sometest_%s' % backend.alias
144 144 gr = RepoGroupModel().create(group_name=group_name,
145 145 group_description='test',
146 146 owner=TEST_USER_ADMIN_LOGIN)
147 147 Session().commit()
148 148
149 149 repo_name = '12345'
150 150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
151 151 description = 'description for newly created repo'
152 152 self.app.post(
153 153 url('repos'),
154 154 fixture._get_repo_create_params(
155 155 repo_private=False,
156 156 repo_name=repo_name,
157 157 repo_type=backend.alias,
158 158 repo_description=description,
159 159 repo_group=gr.group_id,
160 160 csrf_token=csrf_token))
161 161
162 162 # TODO: johbo: Cleanup work to fixture
163 163 try:
164 164 self.assert_repository_is_created_correctly(
165 165 repo_name_full, description, backend)
166 166
167 167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
168 168 inherited_perms = UserRepoToPerm.query()\
169 169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
170 170 assert len(inherited_perms) == 1
171 171 finally:
172 172 RepoModel().delete(repo_name_full)
173 173 RepoGroupModel().delete(group_name)
174 174 Session().commit()
175 175
176 176 def test_create_in_group_without_needed_permissions(self, backend):
177 177 session = login_user_session(
178 178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
179 179 csrf_token = auth.get_csrf_token(session)
180 180 # revoke
181 181 user_model = UserModel()
182 182 # disable fork and create on default user
183 183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
184 184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
185 185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
186 186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
187 187
188 188 # disable on regular user
189 189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
190 190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
191 191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
192 192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
193 193 Session().commit()
194 194
195 195 # create GROUP
196 196 group_name = 'reg_sometest_%s' % backend.alias
197 197 gr = RepoGroupModel().create(group_name=group_name,
198 198 group_description='test',
199 199 owner=TEST_USER_ADMIN_LOGIN)
200 200 Session().commit()
201 201
202 202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
203 203 gr_allowed = RepoGroupModel().create(
204 204 group_name=group_name_allowed,
205 205 group_description='test',
206 206 owner=TEST_USER_REGULAR_LOGIN)
207 207 Session().commit()
208 208
209 209 repo_name = 'ingroup'
210 210 description = 'description for newly created repo'
211 211 response = self.app.post(
212 212 url('repos'),
213 213 fixture._get_repo_create_params(
214 214 repo_private=False,
215 215 repo_name=repo_name,
216 216 repo_type=backend.alias,
217 217 repo_description=description,
218 218 repo_group=gr.group_id,
219 219 csrf_token=csrf_token))
220 220
221 221 response.mustcontain('Invalid value')
222 222
223 223 # user is allowed to create in this group
224 224 repo_name = 'ingroup'
225 225 repo_name_full = RepoGroup.url_sep().join(
226 226 [group_name_allowed, repo_name])
227 227 description = 'description for newly created repo'
228 228 response = self.app.post(
229 229 url('repos'),
230 230 fixture._get_repo_create_params(
231 231 repo_private=False,
232 232 repo_name=repo_name,
233 233 repo_type=backend.alias,
234 234 repo_description=description,
235 235 repo_group=gr_allowed.group_id,
236 236 csrf_token=csrf_token))
237 237
238 238 # TODO: johbo: Cleanup in pytest fixture
239 239 try:
240 240 self.assert_repository_is_created_correctly(
241 241 repo_name_full, description, backend)
242 242
243 243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
244 244 inherited_perms = UserRepoToPerm.query().filter(
245 245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
246 246 assert len(inherited_perms) == 1
247 247
248 248 assert repo_on_filesystem(repo_name_full)
249 249 finally:
250 250 RepoModel().delete(repo_name_full)
251 251 RepoGroupModel().delete(group_name)
252 252 RepoGroupModel().delete(group_name_allowed)
253 253 Session().commit()
254 254
255 255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
256 256 csrf_token):
257 257 # create GROUP
258 258 group_name = 'sometest_%s' % backend.alias
259 259 gr = RepoGroupModel().create(group_name=group_name,
260 260 group_description='test',
261 261 owner=TEST_USER_ADMIN_LOGIN)
262 262 perm = Permission.get_by_key('repository.write')
263 263 RepoGroupModel().grant_user_permission(
264 264 gr, TEST_USER_REGULAR_LOGIN, perm)
265 265
266 266 # add repo permissions
267 267 Session().commit()
268 268
269 269 repo_name = 'ingroup_inherited_%s' % backend.alias
270 270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
271 271 description = 'description for newly created repo'
272 272 self.app.post(
273 273 url('repos'),
274 274 fixture._get_repo_create_params(
275 275 repo_private=False,
276 276 repo_name=repo_name,
277 277 repo_type=backend.alias,
278 278 repo_description=description,
279 279 repo_group=gr.group_id,
280 280 repo_copy_permissions=True,
281 281 csrf_token=csrf_token))
282 282
283 283 # TODO: johbo: Cleanup to pytest fixture
284 284 try:
285 285 self.assert_repository_is_created_correctly(
286 286 repo_name_full, description, backend)
287 287 except Exception:
288 288 RepoGroupModel().delete(group_name)
289 289 Session().commit()
290 290 raise
291 291
292 292 # check if inherited permissions are applied
293 293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
294 294 inherited_perms = UserRepoToPerm.query().filter(
295 295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
296 296 assert len(inherited_perms) == 2
297 297
298 298 assert TEST_USER_REGULAR_LOGIN in [
299 299 x.user.username for x in inherited_perms]
300 300 assert 'repository.write' in [
301 301 x.permission.permission_name for x in inherited_perms]
302 302
303 303 RepoModel().delete(repo_name_full)
304 304 RepoGroupModel().delete(group_name)
305 305 Session().commit()
306 306
307 307 @pytest.mark.xfail_backends(
308 308 "git", "hg", reason="Missing reposerver support")
309 309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
310 310 csrf_token):
311 311 source_repo = backend.create_repo(number_of_commits=2)
312 312 source_repo_name = source_repo.repo_name
313 313 reposerver.serve(source_repo.scm_instance())
314 314
315 315 repo_name = backend.new_repo_name()
316 316 response = self.app.post(
317 317 url('repos'),
318 318 fixture._get_repo_create_params(
319 319 repo_private=False,
320 320 repo_name=repo_name,
321 321 repo_type=backend.alias,
322 322 repo_description='',
323 323 clone_uri=reposerver.url,
324 324 csrf_token=csrf_token),
325 325 status=302)
326 326
327 327 # Should be redirected to the creating page
328 328 response.mustcontain('repo_creating')
329 329
330 330 # Expecting that both repositories have same history
331 331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
332 332 source_vcs = source_repo.scm_instance()
333 333 repo = RepoModel().get_by_repo_name(repo_name)
334 334 repo_vcs = repo.scm_instance()
335 335 assert source_vcs[0].message == repo_vcs[0].message
336 336 assert source_vcs.count() == repo_vcs.count()
337 337 assert source_vcs.commit_ids == repo_vcs.commit_ids
338 338
339 339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
340 340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
341 341 csrf_token):
342 342 repo_name = backend.new_repo_name()
343 343 description = 'description for newly created repo'
344 344 response = self.app.post(
345 345 url('repos'),
346 346 fixture._get_repo_create_params(
347 347 repo_private=False,
348 348 repo_name=repo_name,
349 349 repo_type=backend.alias,
350 350 repo_description=description,
351 351 clone_uri='http://repo.invalid/repo',
352 352 csrf_token=csrf_token))
353 353 response.mustcontain('invalid clone url')
354 354
355 355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
356 356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
357 357 self, autologin_user, backend, csrf_token):
358 358 repo_name = backend.new_repo_name()
359 359 description = 'description for newly created repo'
360 360 response = self.app.post(
361 361 url('repos'),
362 362 fixture._get_repo_create_params(
363 363 repo_private=False,
364 364 repo_name=repo_name,
365 365 repo_type=backend.alias,
366 366 repo_description=description,
367 367 clone_uri='svn+http://svn.invalid/repo',
368 368 csrf_token=csrf_token))
369 369 response.mustcontain('invalid clone url')
370 370
371 371 def test_create_with_git_suffix(
372 372 self, autologin_user, backend, csrf_token):
373 373 repo_name = backend.new_repo_name() + ".git"
374 374 description = 'description for newly created repo'
375 375 response = self.app.post(
376 376 url('repos'),
377 377 fixture._get_repo_create_params(
378 378 repo_private=False,
379 379 repo_name=repo_name,
380 380 repo_type=backend.alias,
381 381 repo_description=description,
382 382 csrf_token=csrf_token))
383 383 response.mustcontain('Repository name cannot end with .git')
384 384
385 385 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
386 386 def test_delete(self, autologin_user, backend, suffix, csrf_token):
387 387 repo = backend.create_repo(name_suffix=suffix)
388 388 repo_name = repo.repo_name
389 389
390 390 response = self.app.post(url('repo', repo_name=repo_name),
391 391 params={'_method': 'delete',
392 392 'csrf_token': csrf_token})
393 393 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
394 394 response.follow()
395 395
396 396 # check if repo was deleted from db
397 397 assert RepoModel().get_by_repo_name(repo_name) is None
398 398 assert not repo_on_filesystem(repo_name)
399 399
400 400 def test_show(self, autologin_user, backend):
401 401 self.app.get(url('repo', repo_name=backend.repo_name))
402 402
403 403 def test_edit(self, backend, autologin_user):
404 404 self.app.get(url('edit_repo', repo_name=backend.repo_name))
405 405
406 406 def test_edit_accessible_when_missing_requirements(
407 407 self, backend_hg, autologin_user):
408 408 scm_patcher = mock.patch.object(
409 409 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
410 410 with scm_patcher:
411 411 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
412 412
413 413 def test_set_private_flag_sets_default_to_none(
414 414 self, autologin_user, backend, csrf_token):
415 415 # initially repository perm should be read
416 416 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
417 417 assert len(perm) == 1
418 418 assert perm[0].permission.permission_name == 'repository.read'
419 419 assert not backend.repo.private
420 420
421 421 response = self.app.post(
422 422 url('repo', repo_name=backend.repo_name),
423 423 fixture._get_repo_create_params(
424 424 repo_private=1,
425 425 repo_name=backend.repo_name,
426 426 repo_type=backend.alias,
427 427 user=TEST_USER_ADMIN_LOGIN,
428 428 _method='put',
429 429 csrf_token=csrf_token))
430 430 assert_session_flash(
431 431 response,
432 432 msg='Repository %s updated successfully' % (backend.repo_name))
433 433 assert backend.repo.private
434 434
435 435 # now the repo default permission should be None
436 436 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
437 437 assert len(perm) == 1
438 438 assert perm[0].permission.permission_name == 'repository.none'
439 439
440 440 response = self.app.post(
441 441 url('repo', repo_name=backend.repo_name),
442 442 fixture._get_repo_create_params(
443 443 repo_private=False,
444 444 repo_name=backend.repo_name,
445 445 repo_type=backend.alias,
446 446 user=TEST_USER_ADMIN_LOGIN,
447 447 _method='put',
448 448 csrf_token=csrf_token))
449 449 assert_session_flash(
450 450 response,
451 451 msg='Repository %s updated successfully' % (backend.repo_name))
452 452 assert not backend.repo.private
453 453
454 454 # we turn off private now the repo default permission should stay None
455 455 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
456 456 assert len(perm) == 1
457 457 assert perm[0].permission.permission_name == 'repository.none'
458 458
459 459 # update this permission back
460 460 perm[0].permission = Permission.get_by_key('repository.read')
461 461 Session().add(perm[0])
462 462 Session().commit()
463 463
464 464 def test_default_user_cannot_access_private_repo_in_a_group(
465 465 self, autologin_user, user_util, backend, csrf_token):
466 466
467 467 group = user_util.create_repo_group()
468 468
469 469 repo = backend.create_repo(
470 470 repo_private=True, repo_group=group, repo_copy_permissions=True)
471 471
472 472 permissions = _get_permission_for_user(
473 473 user='default', repo=repo.repo_name)
474 474 assert len(permissions) == 1
475 475 assert permissions[0].permission.permission_name == 'repository.none'
476 476 assert permissions[0].repository.private is True
477 477
478 478 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
479 479 repo = backend.repo
480 480 response = self.app.get(
481 481 url('edit_repo_advanced', repo_name=backend.repo_name))
482 482 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
483 483 response.mustcontain(no=[opt])
484 484
485 485 def test_set_fork_of_target_repo(
486 486 self, autologin_user, backend, csrf_token):
487 487 target_repo = 'target_%s' % backend.alias
488 488 fixture.create_repo(target_repo, repo_type=backend.alias)
489 489 repo2 = Repository.get_by_repo_name(target_repo)
490 490 response = self.app.post(
491 491 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
492 492 params={'id_fork_of': repo2.repo_id, '_method': 'put',
493 493 'csrf_token': csrf_token})
494 494 repo = Repository.get_by_repo_name(backend.repo_name)
495 495 repo2 = Repository.get_by_repo_name(target_repo)
496 496 assert_session_flash(
497 497 response,
498 498 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
499 499
500 500 assert repo.fork == repo2
501 501 response = response.follow()
502 502 # check if given repo is selected
503 503
504 504 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
505 505 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
506 506
507 507 response.mustcontain(opt)
508 508
509 509 fixture.destroy_repo(target_repo, forks='detach')
510 510
511 511 @pytest.mark.backends("hg", "git")
512 512 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
513 513 csrf_token):
514 514 TARGET_REPO_MAP = {
515 515 'git': {
516 516 'type': 'hg',
517 517 'repo_name': HG_REPO},
518 518 'hg': {
519 519 'type': 'git',
520 520 'repo_name': GIT_REPO},
521 521 }
522 522 target_repo = TARGET_REPO_MAP[backend.alias]
523 523
524 524 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
525 525 response = self.app.post(
526 526 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
527 527 params={'id_fork_of': repo2.repo_id, '_method': 'put',
528 528 'csrf_token': csrf_token})
529 529 assert_session_flash(
530 530 response,
531 531 'Cannot set repository as fork of repository with other type')
532 532
533 533 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
534 534 # mark it as None
535 535 response = self.app.post(
536 536 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
537 537 params={'id_fork_of': None, '_method': 'put',
538 538 'csrf_token': csrf_token})
539 539 assert_session_flash(
540 540 response,
541 541 'Marked repo %s as fork of %s'
542 542 % (backend.repo_name, "Nothing"))
543 543 assert backend.repo.fork is None
544 544
545 545 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
546 546 repo = Repository.get_by_repo_name(backend.repo_name)
547 547 response = self.app.post(
548 548 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
549 549 params={'id_fork_of': repo.repo_id, '_method': 'put',
550 550 'csrf_token': csrf_token})
551 551 assert_session_flash(
552 552 response, 'An error occurred during this operation')
553 553
554 554 def test_create_on_top_level_without_permissions(self, backend):
555 555 session = login_user_session(
556 556 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
557 557 csrf_token = auth.get_csrf_token(session)
558 558
559 559 # revoke
560 560 user_model = UserModel()
561 561 # disable fork and create on default user
562 562 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
563 563 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
564 564 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
565 565 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
566 566
567 567 # disable on regular user
568 568 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
569 569 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
570 570 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
571 571 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
572 572 Session().commit()
573 573
574 574 repo_name = backend.new_repo_name()
575 575 description = 'description for newly created repo'
576 576 response = self.app.post(
577 577 url('repos'),
578 578 fixture._get_repo_create_params(
579 579 repo_private=False,
580 580 repo_name=repo_name,
581 581 repo_type=backend.alias,
582 582 repo_description=description,
583 583 csrf_token=csrf_token))
584 584
585 585 response.mustcontain(
586 586 u"You do not have the permission to store repositories in "
587 587 u"the root location.")
588 588
589 589 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
590 590 def test_create_repo_when_filesystem_op_fails(
591 591 self, autologin_user, backend, csrf_token):
592 592 repo_name = backend.new_repo_name()
593 593 description = 'description for newly created repo'
594 594
595 595 response = self.app.post(
596 596 url('repos'),
597 597 fixture._get_repo_create_params(
598 598 repo_private=False,
599 599 repo_name=repo_name,
600 600 repo_type=backend.alias,
601 601 repo_description=description,
602 602 csrf_token=csrf_token))
603 603
604 604 assert_session_flash(
605 605 response, 'Error creating repository %s' % repo_name)
606 606 # repo must not be in db
607 607 assert backend.repo is None
608 608 # repo must not be in filesystem !
609 609 assert not repo_on_filesystem(repo_name)
610 610
611 611 def assert_repository_is_created_correctly(
612 612 self, repo_name, description, backend):
613 613 repo_name_utf8 = safe_str(repo_name)
614 614
615 615 # run the check page that triggers the flash message
616 616 response = self.app.get(url('repo_check_home', repo_name=repo_name))
617 617 assert response.json == {u'result': True}
618 618
619 619 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
620 620 urllib.quote(repo_name_utf8), repo_name)
621 621 assert_session_flash(response, flash_msg)
622 622
623 623 # test if the repo was created in the database
624 624 new_repo = RepoModel().get_by_repo_name(repo_name)
625 625
626 626 assert new_repo.repo_name == repo_name
627 627 assert new_repo.description == description
628 628
629 629 # test if the repository is visible in the list ?
630 630 response = self.app.get(url('summary_home', repo_name=repo_name))
631 631 response.mustcontain(repo_name)
632 632 response.mustcontain(backend.alias)
633 633
634 634 assert repo_on_filesystem(repo_name)
635 635
636 636
637 637 @pytest.mark.usefixtures("app")
638 638 class TestVcsSettings(object):
639 639 FORM_DATA = {
640 640 'inherit_global_settings': False,
641 641 'hooks_changegroup_repo_size': False,
642 642 'hooks_changegroup_push_logger': False,
643 643 'hooks_outgoing_pull_logger': False,
644 644 'extensions_largefiles': False,
645 645 'phases_publish': 'False',
646 646 'rhodecode_pr_merge_enabled': False,
647 647 'rhodecode_use_outdated_comments': False,
648 648 'new_svn_branch': '',
649 649 'new_svn_tag': ''
650 650 }
651 651
652 652 @pytest.mark.skip_backends('svn')
653 653 def test_global_settings_initial_values(self, autologin_user, backend):
654 654 repo_name = backend.repo_name
655 655 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
656 656
657 657 expected_settings = (
658 658 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
659 659 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
660 660 'hooks_outgoing_pull_logger'
661 661 )
662 662 for setting in expected_settings:
663 663 self.assert_repo_value_equals_global_value(response, setting)
664 664
665 665 def test_show_settings_requires_repo_admin_permission(
666 666 self, backend, user_util, settings_util):
667 667 repo = backend.create_repo()
668 668 repo_name = repo.repo_name
669 669 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
670 670 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
671 671 login_user_session(
672 672 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
673 673 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
674 674
675 675 def test_inherit_global_settings_flag_is_true_by_default(
676 676 self, autologin_user, backend):
677 677 repo_name = backend.repo_name
678 678 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
679 679
680 680 assert_response = AssertResponse(response)
681 681 element = assert_response.get_element('#inherit_global_settings')
682 682 assert element.checked
683 683
684 684 @pytest.mark.parametrize('checked_value', [True, False])
685 685 def test_inherit_global_settings_value(
686 686 self, autologin_user, backend, checked_value, settings_util):
687 687 repo = backend.create_repo()
688 688 repo_name = repo.repo_name
689 689 settings_util.create_repo_rhodecode_setting(
690 690 repo, 'inherit_vcs_settings', checked_value, 'bool')
691 691 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
692 692
693 693 assert_response = AssertResponse(response)
694 694 element = assert_response.get_element('#inherit_global_settings')
695 695 assert element.checked == checked_value
696 696
697 697 @pytest.mark.skip_backends('svn')
698 698 def test_hooks_settings_are_created(
699 699 self, autologin_user, backend, csrf_token):
700 700 repo_name = backend.repo_name
701 701 data = self.FORM_DATA.copy()
702 702 data['csrf_token'] = csrf_token
703 703 self.app.post(
704 704 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
705 705 settings = SettingsModel(repo=repo_name)
706 706 try:
707 707 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
708 708 ui = settings.get_ui_by_section_and_key(section, key)
709 709 assert ui.ui_active is False
710 710 finally:
711 711 self._cleanup_repo_settings(settings)
712 712
713 713 def test_hooks_settings_are_not_created_for_svn(
714 714 self, autologin_user, backend_svn, csrf_token):
715 715 repo_name = backend_svn.repo_name
716 716 data = self.FORM_DATA.copy()
717 717 data['csrf_token'] = csrf_token
718 718 self.app.post(
719 719 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
720 720 settings = SettingsModel(repo=repo_name)
721 721 try:
722 722 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
723 723 ui = settings.get_ui_by_section_and_key(section, key)
724 724 assert ui is None
725 725 finally:
726 726 self._cleanup_repo_settings(settings)
727 727
728 728 @pytest.mark.skip_backends('svn')
729 729 def test_hooks_settings_are_updated(
730 730 self, autologin_user, backend, csrf_token):
731 731 repo_name = backend.repo_name
732 732 settings = SettingsModel(repo=repo_name)
733 733 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
734 734 settings.create_ui_section_value(section, '', key=key, active=True)
735 735
736 736 data = self.FORM_DATA.copy()
737 737 data['csrf_token'] = csrf_token
738 738 self.app.post(
739 739 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
740 740 try:
741 741 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
742 742 ui = settings.get_ui_by_section_and_key(section, key)
743 743 assert ui.ui_active is False
744 744 finally:
745 745 self._cleanup_repo_settings(settings)
746 746
747 747 def test_hooks_settings_are_not_updated_for_svn(
748 748 self, autologin_user, backend_svn, csrf_token):
749 749 repo_name = backend_svn.repo_name
750 750 settings = SettingsModel(repo=repo_name)
751 751 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
752 752 settings.create_ui_section_value(section, '', key=key, active=True)
753 753
754 754 data = self.FORM_DATA.copy()
755 755 data['csrf_token'] = csrf_token
756 756 self.app.post(
757 757 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
758 758 try:
759 759 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
760 760 ui = settings.get_ui_by_section_and_key(section, key)
761 761 assert ui.ui_active is True
762 762 finally:
763 763 self._cleanup_repo_settings(settings)
764 764
765 765 @pytest.mark.skip_backends('svn')
766 766 def test_pr_settings_are_created(
767 767 self, autologin_user, backend, csrf_token):
768 768 repo_name = backend.repo_name
769 769 data = self.FORM_DATA.copy()
770 770 data['csrf_token'] = csrf_token
771 771 self.app.post(
772 772 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
773 773 settings = SettingsModel(repo=repo_name)
774 774 try:
775 775 for name in VcsSettingsModel.GENERAL_SETTINGS:
776 776 setting = settings.get_setting_by_name(name)
777 777 assert setting.app_settings_value is False
778 778 finally:
779 779 self._cleanup_repo_settings(settings)
780 780
781 781 def test_pr_settings_are_not_created_for_svn(
782 782 self, autologin_user, backend_svn, csrf_token):
783 783 repo_name = backend_svn.repo_name
784 784 data = self.FORM_DATA.copy()
785 785 data['csrf_token'] = csrf_token
786 786 self.app.post(
787 787 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
788 788 settings = SettingsModel(repo=repo_name)
789 789 try:
790 790 for name in VcsSettingsModel.GENERAL_SETTINGS:
791 791 setting = settings.get_setting_by_name(name)
792 792 assert setting is None
793 793 finally:
794 794 self._cleanup_repo_settings(settings)
795 795
796 796 def test_pr_settings_creation_requires_repo_admin_permission(
797 797 self, backend, user_util, settings_util, csrf_token):
798 798 repo = backend.create_repo()
799 799 repo_name = repo.repo_name
800 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
801 800
802 801 logout_user_session(self.app, csrf_token)
803 802 session = login_user_session(
804 803 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
805 804 new_csrf_token = auth.get_csrf_token(session)
806 805
806 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
807 repo = Repository.get_by_repo_name(repo_name)
807 808 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
808 809 data = self.FORM_DATA.copy()
809 810 data['csrf_token'] = new_csrf_token
810 811 settings = SettingsModel(repo=repo_name)
811 812
812 813 try:
813 814 self.app.post(
814 815 url('repo_vcs_settings', repo_name=repo_name), data,
815 816 status=302)
816 817 finally:
817 818 self._cleanup_repo_settings(settings)
818 819
819 820 @pytest.mark.skip_backends('svn')
820 821 def test_pr_settings_are_updated(
821 822 self, autologin_user, backend, csrf_token):
822 823 repo_name = backend.repo_name
823 824 settings = SettingsModel(repo=repo_name)
824 825 for name in VcsSettingsModel.GENERAL_SETTINGS:
825 826 settings.create_or_update_setting(name, True, 'bool')
826 827
827 828 data = self.FORM_DATA.copy()
828 829 data['csrf_token'] = csrf_token
829 830 self.app.post(
830 831 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
831 832 try:
832 833 for name in VcsSettingsModel.GENERAL_SETTINGS:
833 834 setting = settings.get_setting_by_name(name)
834 835 assert setting.app_settings_value is False
835 836 finally:
836 837 self._cleanup_repo_settings(settings)
837 838
838 839 def test_pr_settings_are_not_updated_for_svn(
839 840 self, autologin_user, backend_svn, csrf_token):
840 841 repo_name = backend_svn.repo_name
841 842 settings = SettingsModel(repo=repo_name)
842 843 for name in VcsSettingsModel.GENERAL_SETTINGS:
843 844 settings.create_or_update_setting(name, True, 'bool')
844 845
845 846 data = self.FORM_DATA.copy()
846 847 data['csrf_token'] = csrf_token
847 848 self.app.post(
848 849 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
849 850 try:
850 851 for name in VcsSettingsModel.GENERAL_SETTINGS:
851 852 setting = settings.get_setting_by_name(name)
852 853 assert setting.app_settings_value is True
853 854 finally:
854 855 self._cleanup_repo_settings(settings)
855 856
856 857 def test_svn_settings_are_created(
857 858 self, autologin_user, backend_svn, csrf_token, settings_util):
858 859 repo_name = backend_svn.repo_name
859 860 data = self.FORM_DATA.copy()
860 861 data['new_svn_tag'] = 'svn-tag'
861 862 data['new_svn_branch'] = 'svn-branch'
862 863 data['csrf_token'] = csrf_token
863 864
864 865 # Create few global settings to make sure that uniqueness validators
865 866 # are not triggered
866 867 settings_util.create_rhodecode_ui(
867 868 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
868 869 settings_util.create_rhodecode_ui(
869 870 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
870 871
871 872 self.app.post(
872 873 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
873 874 settings = SettingsModel(repo=repo_name)
874 875 try:
875 876 svn_branches = settings.get_ui_by_section(
876 877 VcsSettingsModel.SVN_BRANCH_SECTION)
877 878 svn_branch_names = [b.ui_value for b in svn_branches]
878 879 svn_tags = settings.get_ui_by_section(
879 880 VcsSettingsModel.SVN_TAG_SECTION)
880 881 svn_tag_names = [b.ui_value for b in svn_tags]
881 882 assert 'svn-branch' in svn_branch_names
882 883 assert 'svn-tag' in svn_tag_names
883 884 finally:
884 885 self._cleanup_repo_settings(settings)
885 886
886 887 def test_svn_settings_are_unique(
887 888 self, autologin_user, backend_svn, csrf_token, settings_util):
888 889 repo = backend_svn.repo
889 890 repo_name = repo.repo_name
890 891 data = self.FORM_DATA.copy()
891 892 data['new_svn_tag'] = 'test_tag'
892 893 data['new_svn_branch'] = 'test_branch'
893 894 data['csrf_token'] = csrf_token
894 895 settings_util.create_repo_rhodecode_ui(
895 896 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
896 897 settings_util.create_repo_rhodecode_ui(
897 898 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
898 899
899 900 response = self.app.post(
900 901 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
901 902 response.mustcontain('Pattern already exists')
902 903
903 904 def test_svn_settings_with_empty_values_are_not_created(
904 905 self, autologin_user, backend_svn, csrf_token):
905 906 repo_name = backend_svn.repo_name
906 907 data = self.FORM_DATA.copy()
907 908 data['csrf_token'] = csrf_token
908 909 self.app.post(
909 910 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
910 911 settings = SettingsModel(repo=repo_name)
911 912 try:
912 913 svn_branches = settings.get_ui_by_section(
913 914 VcsSettingsModel.SVN_BRANCH_SECTION)
914 915 svn_tags = settings.get_ui_by_section(
915 916 VcsSettingsModel.SVN_TAG_SECTION)
916 917 assert len(svn_branches) == 0
917 918 assert len(svn_tags) == 0
918 919 finally:
919 920 self._cleanup_repo_settings(settings)
920 921
921 922 def test_svn_settings_are_shown_for_svn_repository(
922 923 self, autologin_user, backend_svn, csrf_token):
923 924 repo_name = backend_svn.repo_name
924 925 response = self.app.get(
925 926 url('repo_vcs_settings', repo_name=repo_name), status=200)
926 927 response.mustcontain('Subversion Settings')
927 928
928 929 @pytest.mark.skip_backends('svn')
929 930 def test_svn_settings_are_not_created_for_not_svn_repository(
930 931 self, autologin_user, backend, csrf_token):
931 932 repo_name = backend.repo_name
932 933 data = self.FORM_DATA.copy()
933 934 data['csrf_token'] = csrf_token
934 935 self.app.post(
935 936 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
936 937 settings = SettingsModel(repo=repo_name)
937 938 try:
938 939 svn_branches = settings.get_ui_by_section(
939 940 VcsSettingsModel.SVN_BRANCH_SECTION)
940 941 svn_tags = settings.get_ui_by_section(
941 942 VcsSettingsModel.SVN_TAG_SECTION)
942 943 assert len(svn_branches) == 0
943 944 assert len(svn_tags) == 0
944 945 finally:
945 946 self._cleanup_repo_settings(settings)
946 947
947 948 @pytest.mark.skip_backends('svn')
948 949 def test_svn_settings_are_shown_only_for_svn_repository(
949 950 self, autologin_user, backend, csrf_token):
950 951 repo_name = backend.repo_name
951 952 response = self.app.get(
952 953 url('repo_vcs_settings', repo_name=repo_name), status=200)
953 954 response.mustcontain(no='Subversion Settings')
954 955
955 956 def test_hg_settings_are_created(
956 957 self, autologin_user, backend_hg, csrf_token):
957 958 repo_name = backend_hg.repo_name
958 959 data = self.FORM_DATA.copy()
959 960 data['new_svn_tag'] = 'svn-tag'
960 961 data['new_svn_branch'] = 'svn-branch'
961 962 data['csrf_token'] = csrf_token
962 963 self.app.post(
963 964 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
964 965 settings = SettingsModel(repo=repo_name)
965 966 try:
966 967 largefiles_ui = settings.get_ui_by_section_and_key(
967 968 'extensions', 'largefiles')
968 969 assert largefiles_ui.ui_active is False
969 970 phases_ui = settings.get_ui_by_section_and_key(
970 971 'phases', 'publish')
971 972 assert str2bool(phases_ui.ui_value) is False
972 973 finally:
973 974 self._cleanup_repo_settings(settings)
974 975
975 976 def test_hg_settings_are_updated(
976 977 self, autologin_user, backend_hg, csrf_token):
977 978 repo_name = backend_hg.repo_name
978 979 settings = SettingsModel(repo=repo_name)
979 980 settings.create_ui_section_value(
980 981 'extensions', '', key='largefiles', active=True)
981 982 settings.create_ui_section_value(
982 983 'phases', '1', key='publish', active=True)
983 984
984 985 data = self.FORM_DATA.copy()
985 986 data['csrf_token'] = csrf_token
986 987 self.app.post(
987 988 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
988 989 try:
989 990 largefiles_ui = settings.get_ui_by_section_and_key(
990 991 'extensions', 'largefiles')
991 992 assert largefiles_ui.ui_active is False
992 993 phases_ui = settings.get_ui_by_section_and_key(
993 994 'phases', 'publish')
994 995 assert str2bool(phases_ui.ui_value) is False
995 996 finally:
996 997 self._cleanup_repo_settings(settings)
997 998
998 999 def test_hg_settings_are_shown_for_hg_repository(
999 1000 self, autologin_user, backend_hg, csrf_token):
1000 1001 repo_name = backend_hg.repo_name
1001 1002 response = self.app.get(
1002 1003 url('repo_vcs_settings', repo_name=repo_name), status=200)
1003 1004 response.mustcontain('Mercurial Settings')
1004 1005
1005 1006 @pytest.mark.skip_backends('hg')
1006 1007 def test_hg_settings_are_created_only_for_hg_repository(
1007 1008 self, autologin_user, backend, csrf_token):
1008 1009 repo_name = backend.repo_name
1009 1010 data = self.FORM_DATA.copy()
1010 1011 data['csrf_token'] = csrf_token
1011 1012 self.app.post(
1012 1013 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1013 1014 settings = SettingsModel(repo=repo_name)
1014 1015 try:
1015 1016 largefiles_ui = settings.get_ui_by_section_and_key(
1016 1017 'extensions', 'largefiles')
1017 1018 assert largefiles_ui is None
1018 1019 phases_ui = settings.get_ui_by_section_and_key(
1019 1020 'phases', 'publish')
1020 1021 assert phases_ui is None
1021 1022 finally:
1022 1023 self._cleanup_repo_settings(settings)
1023 1024
1024 1025 @pytest.mark.skip_backends('hg')
1025 1026 def test_hg_settings_are_shown_only_for_hg_repository(
1026 1027 self, autologin_user, backend, csrf_token):
1027 1028 repo_name = backend.repo_name
1028 1029 response = self.app.get(
1029 1030 url('repo_vcs_settings', repo_name=repo_name), status=200)
1030 1031 response.mustcontain(no='Mercurial Settings')
1031 1032
1032 1033 @pytest.mark.skip_backends('hg')
1033 1034 def test_hg_settings_are_updated_only_for_hg_repository(
1034 1035 self, autologin_user, backend, csrf_token):
1035 1036 repo_name = backend.repo_name
1036 1037 settings = SettingsModel(repo=repo_name)
1037 1038 settings.create_ui_section_value(
1038 1039 'extensions', '', key='largefiles', active=True)
1039 1040 settings.create_ui_section_value(
1040 1041 'phases', '1', key='publish', active=True)
1041 1042
1042 1043 data = self.FORM_DATA.copy()
1043 1044 data['csrf_token'] = csrf_token
1044 1045 self.app.post(
1045 1046 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1046 1047 try:
1047 1048 largefiles_ui = settings.get_ui_by_section_and_key(
1048 1049 'extensions', 'largefiles')
1049 1050 assert largefiles_ui.ui_active is True
1050 1051 phases_ui = settings.get_ui_by_section_and_key(
1051 1052 'phases', 'publish')
1052 1053 assert phases_ui.ui_value == '1'
1053 1054 finally:
1054 1055 self._cleanup_repo_settings(settings)
1055 1056
1056 1057 def test_per_repo_svn_settings_are_displayed(
1057 1058 self, autologin_user, backend_svn, settings_util):
1058 1059 repo = backend_svn.create_repo()
1059 1060 repo_name = repo.repo_name
1060 1061 branches = [
1061 1062 settings_util.create_repo_rhodecode_ui(
1062 1063 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1063 1064 'branch_{}'.format(i))
1064 1065 for i in range(10)]
1065 1066 tags = [
1066 1067 settings_util.create_repo_rhodecode_ui(
1067 1068 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1068 1069 for i in range(10)]
1069 1070
1070 1071 response = self.app.get(
1071 1072 url('repo_vcs_settings', repo_name=repo_name), status=200)
1072 1073 assert_response = AssertResponse(response)
1073 1074 for branch in branches:
1074 1075 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1075 1076 element = assert_response.get_element(css_selector)
1076 1077 assert element.value == branch.ui_value
1077 1078 for tag in tags:
1078 1079 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1079 1080 element = assert_response.get_element(css_selector)
1080 1081 assert element.value == tag.ui_value
1081 1082
1082 1083 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1083 1084 self, autologin_user, backend_svn, settings_util):
1084 1085 repo = backend_svn.create_repo()
1085 1086 repo_name = repo.repo_name
1086 1087 response = self.app.get(
1087 1088 url('repo_vcs_settings', repo_name=repo_name), status=200)
1088 1089 response.mustcontain(no='<label>Hooks:</label>')
1089 1090 response.mustcontain(no='<label>Pull Request Settings:</label>')
1090 1091
1091 1092 def test_inherit_global_settings_value_is_saved(
1092 1093 self, autologin_user, backend, csrf_token):
1093 1094 repo_name = backend.repo_name
1094 1095 data = self.FORM_DATA.copy()
1095 1096 data['csrf_token'] = csrf_token
1096 1097 data['inherit_global_settings'] = True
1097 1098 self.app.post(
1098 1099 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1099 1100
1100 1101 settings = SettingsModel(repo=repo_name)
1101 1102 vcs_settings = VcsSettingsModel(repo=repo_name)
1102 1103 try:
1103 1104 assert vcs_settings.inherit_global_settings is True
1104 1105 finally:
1105 1106 self._cleanup_repo_settings(settings)
1106 1107
1107 1108 def test_repo_cache_is_invalidated_when_settings_are_updated(
1108 1109 self, autologin_user, backend, csrf_token):
1109 1110 repo_name = backend.repo_name
1110 1111 data = self.FORM_DATA.copy()
1111 1112 data['csrf_token'] = csrf_token
1112 1113 data['inherit_global_settings'] = True
1113 1114 settings = SettingsModel(repo=repo_name)
1114 1115
1115 1116 invalidation_patcher = mock.patch(
1116 1117 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1117 1118 with invalidation_patcher as invalidation_mock:
1118 1119 self.app.post(
1119 1120 url('repo_vcs_settings', repo_name=repo_name), data,
1120 1121 status=302)
1121 1122 try:
1122 1123 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1123 1124 finally:
1124 1125 self._cleanup_repo_settings(settings)
1125 1126
1126 1127 def test_other_settings_not_saved_inherit_global_settings_is_true(
1127 1128 self, autologin_user, backend, csrf_token):
1128 1129 repo_name = backend.repo_name
1129 1130 data = self.FORM_DATA.copy()
1130 1131 data['csrf_token'] = csrf_token
1131 1132 data['inherit_global_settings'] = True
1132 1133 self.app.post(
1133 1134 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1134 1135
1135 1136 settings = SettingsModel(repo=repo_name)
1136 1137 ui_settings = (
1137 1138 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1138 1139
1139 1140 vcs_settings = []
1140 1141 try:
1141 1142 for section, key in ui_settings:
1142 1143 ui = settings.get_ui_by_section_and_key(section, key)
1143 1144 if ui:
1144 1145 vcs_settings.append(ui)
1145 1146 vcs_settings.extend(settings.get_ui_by_section(
1146 1147 VcsSettingsModel.SVN_BRANCH_SECTION))
1147 1148 vcs_settings.extend(settings.get_ui_by_section(
1148 1149 VcsSettingsModel.SVN_TAG_SECTION))
1149 1150 for name in VcsSettingsModel.GENERAL_SETTINGS:
1150 1151 setting = settings.get_setting_by_name(name)
1151 1152 if setting:
1152 1153 vcs_settings.append(setting)
1153 1154 assert vcs_settings == []
1154 1155 finally:
1155 1156 self._cleanup_repo_settings(settings)
1156 1157
1157 1158 def test_delete_svn_branch_and_tag_patterns(
1158 1159 self, autologin_user, backend_svn, settings_util, csrf_token):
1159 1160 repo = backend_svn.create_repo()
1160 1161 repo_name = repo.repo_name
1161 1162 branch = settings_util.create_repo_rhodecode_ui(
1162 1163 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1163 1164 cleanup=False)
1164 1165 tag = settings_util.create_repo_rhodecode_ui(
1165 1166 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1166 1167 data = {
1167 1168 '_method': 'delete',
1168 1169 'csrf_token': csrf_token
1169 1170 }
1170 1171 for id_ in (branch.ui_id, tag.ui_id):
1171 1172 data['delete_svn_pattern'] = id_,
1172 1173 self.app.post(
1173 1174 url('repo_vcs_settings', repo_name=repo_name), data,
1174 1175 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1175 1176 settings = VcsSettingsModel(repo=repo_name)
1176 1177 assert settings.get_repo_svn_branch_patterns() == []
1177 1178
1178 1179 def test_delete_svn_branch_requires_repo_admin_permission(
1179 1180 self, backend_svn, user_util, settings_util, csrf_token):
1180 1181 repo = backend_svn.create_repo()
1181 1182 repo_name = repo.repo_name
1182 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1183
1183 1184 logout_user_session(self.app, csrf_token)
1184 1185 session = login_user_session(
1185 1186 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1186 1187 csrf_token = auth.get_csrf_token(session)
1188
1189 repo = Repository.get_by_repo_name(repo_name)
1190 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1187 1191 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1188 1192 branch = settings_util.create_repo_rhodecode_ui(
1189 1193 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1190 1194 cleanup=False)
1191 1195 data = {
1192 1196 '_method': 'delete',
1193 1197 'csrf_token': csrf_token,
1194 1198 'delete_svn_pattern': branch.ui_id
1195 1199 }
1196 1200 self.app.post(
1197 1201 url('repo_vcs_settings', repo_name=repo_name), data,
1198 1202 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1199 1203
1200 1204 def test_delete_svn_branch_raises_400_when_not_found(
1201 1205 self, autologin_user, backend_svn, settings_util, csrf_token):
1202 1206 repo_name = backend_svn.repo_name
1203 1207 data = {
1204 1208 '_method': 'delete',
1205 1209 'delete_svn_pattern': 123,
1206 1210 'csrf_token': csrf_token
1207 1211 }
1208 1212 self.app.post(
1209 1213 url('repo_vcs_settings', repo_name=repo_name), data,
1210 1214 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1211 1215
1212 1216 def test_delete_svn_branch_raises_400_when_no_id_specified(
1213 1217 self, autologin_user, backend_svn, settings_util, csrf_token):
1214 1218 repo_name = backend_svn.repo_name
1215 1219 data = {
1216 1220 '_method': 'delete',
1217 1221 'csrf_token': csrf_token
1218 1222 }
1219 1223 self.app.post(
1220 1224 url('repo_vcs_settings', repo_name=repo_name), data,
1221 1225 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1222 1226
1223 1227 def _cleanup_repo_settings(self, settings_model):
1224 1228 cleanup = []
1225 1229 ui_settings = (
1226 1230 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1227 1231
1228 1232 for section, key in ui_settings:
1229 1233 ui = settings_model.get_ui_by_section_and_key(section, key)
1230 1234 if ui:
1231 1235 cleanup.append(ui)
1232 1236
1233 1237 cleanup.extend(settings_model.get_ui_by_section(
1234 1238 VcsSettingsModel.INHERIT_SETTINGS))
1235 1239 cleanup.extend(settings_model.get_ui_by_section(
1236 1240 VcsSettingsModel.SVN_BRANCH_SECTION))
1237 1241 cleanup.extend(settings_model.get_ui_by_section(
1238 1242 VcsSettingsModel.SVN_TAG_SECTION))
1239 1243
1240 1244 for name in VcsSettingsModel.GENERAL_SETTINGS:
1241 1245 setting = settings_model.get_setting_by_name(name)
1242 1246 if setting:
1243 1247 cleanup.append(setting)
1244 1248
1245 1249 for object_ in cleanup:
1246 1250 Session().delete(object_)
1247 1251 Session().commit()
1248 1252
1249 1253 def assert_repo_value_equals_global_value(self, response, setting):
1250 1254 assert_response = AssertResponse(response)
1251 1255 global_css_selector = '[name={}_inherited]'.format(setting)
1252 1256 repo_css_selector = '[name={}]'.format(setting)
1253 1257 repo_element = assert_response.get_element(repo_css_selector)
1254 1258 global_element = assert_response.get_element(global_css_selector)
1255 1259 assert repo_element.value == global_element.value
1256 1260
1257 1261
1258 1262 def _get_permission_for_user(user, repo):
1259 1263 perm = UserRepoToPerm.query()\
1260 1264 .filter(UserRepoToPerm.repository ==
1261 1265 Repository.get_by_repo_name(repo))\
1262 1266 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1263 1267 .all()
1264 1268 return perm
General Comments 0
You need to be logged in to leave comments. Login now