##// END OF EJS Templates
tests: made few tests handle database transactions better.
marcink -
r2786:35caee67 default
parent child Browse files
Show More
@@ -1,82 +1,82 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests import TestController
24 24 from rhodecode.tests.fixture import Fixture
25 25
26 26 fixture = Fixture()
27 27
28 28
29 29 def route_path(name, params=None, **kwargs):
30 30 import urllib
31 31 from rhodecode.apps._base import ADMIN_PREFIX
32 32
33 33 base_url = {
34 34 'admin_home': ADMIN_PREFIX,
35 35 'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
36 36 'pull_requests_global': ADMIN_PREFIX + '/pull-request/{pull_request_id}',
37 37 'pull_requests_global_0': ADMIN_PREFIX + '/pull_requests/{pull_request_id}',
38 38 'pull_requests_global_1': ADMIN_PREFIX + '/pull-requests/{pull_request_id}',
39 39
40 40 }[name].format(**kwargs)
41 41
42 42 if params:
43 43 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
44 44 return base_url
45 45
46 46
47 47 class TestAdminMainView(TestController):
48 48
49 49 def test_redirect_admin_home(self):
50 50 self.log_user()
51 51 response = self.app.get(route_path('admin_home'), status=302)
52 52 assert response.location.endswith('/audit_logs')
53 53
54 54 def test_redirect_pull_request_view(self, view):
55 55 self.log_user()
56 56 self.app.get(
57 57 route_path(view, pull_request_id='xxxx'),
58 58 status=404)
59 59
60 60 @pytest.mark.backends("git", "hg")
61 61 @pytest.mark.parametrize('view', [
62 62 'pull_requests_global',
63 63 'pull_requests_global_0',
64 64 'pull_requests_global_1',
65 65 ])
66 66 def test_redirect_pull_request_view(self, view, pr_util):
67 67 self.log_user()
68 68 pull_request = pr_util.create_pull_request()
69 69 pull_request_id = pull_request.pull_request_id
70 repo_name = pull_request.target_repo.repo_name
70 71
71 72 response = self.app.get(
72 73 route_path(view, pull_request_id=pull_request_id),
73 74 status=302)
74 75 assert response.location.endswith(
75 76 'pull-request/{}'.format(pull_request_id))
76 77
77 repo_name = pull_request.target_repo.repo_name
78 78 redirect_url = route_path(
79 79 'pullrequest_show', repo_name=repo_name,
80 pull_request_id=pull_request.pull_request_id)
80 pull_request_id=pull_request_id)
81 81
82 82 assert redirect_url in response.location
@@ -1,509 +1,512 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.apps._base import ADMIN_PREFIX
27 27 from rhodecode.lib import auth
28 28 from rhodecode.lib.utils2 import safe_str
29 29 from rhodecode.lib import helpers as h
30 30 from rhodecode.model.db import (
31 31 Repository, RepoGroup, UserRepoToPerm, User, Permission)
32 32 from rhodecode.model.meta import Session
33 33 from rhodecode.model.repo import RepoModel
34 34 from rhodecode.model.repo_group import RepoGroupModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
39 39 from rhodecode.tests.fixture import Fixture, error_function
40 40 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
41 41
42 42 fixture = Fixture()
43 43
44 44
45 45 def route_path(name, params=None, **kwargs):
46 46 import urllib
47 47
48 48 base_url = {
49 49 'repos': ADMIN_PREFIX + '/repos',
50 50 'repo_new': ADMIN_PREFIX + '/repos/new',
51 51 'repo_create': ADMIN_PREFIX + '/repos/create',
52 52
53 53 'repo_creating_check': '/{repo_name}/repo_creating_check',
54 54 }[name].format(**kwargs)
55 55
56 56 if params:
57 57 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
58 58 return base_url
59 59
60 60
61 61 def _get_permission_for_user(user, repo):
62 62 perm = UserRepoToPerm.query()\
63 63 .filter(UserRepoToPerm.repository ==
64 64 Repository.get_by_repo_name(repo))\
65 65 .filter(UserRepoToPerm.user == User.get_by_username(user))\
66 66 .all()
67 67 return perm
68 68
69 69
70 70 @pytest.mark.usefixtures("app")
71 71 class TestAdminRepos(object):
72 72
73 73 def test_repo_list(self, autologin_user, user_util):
74 74 repo = user_util.create_repo()
75 repo_name = repo.repo_name
75 76 response = self.app.get(
76 77 route_path('repos'), status=200)
77 78
78 response.mustcontain(repo.repo_name)
79 response.mustcontain(repo_name)
79 80
80 81 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
81 82 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
82 83 response = self.app.get(route_path('repo_new'), status=200)
83 84 assert_response = AssertResponse(response)
84 85 element = assert_response.get_element('#repo_type')
85 86 assert element.text_content() == '\ngit\n'
86 87
87 88 def test_create_page_non_restricted_backends(self, autologin_user, backend):
88 89 response = self.app.get(route_path('repo_new'), status=200)
89 90 assert_response = AssertResponse(response)
90 91 assert_response.element_contains('#repo_type', 'git')
91 92 assert_response.element_contains('#repo_type', 'svn')
92 93 assert_response.element_contains('#repo_type', 'hg')
93 94
94 95 @pytest.mark.parametrize(
95 96 "suffix", [u'', u'xxa'], ids=['', 'non-ascii'])
96 97 def test_create(self, autologin_user, backend, suffix, csrf_token):
97 98 repo_name_unicode = backend.new_repo_name(suffix=suffix)
98 99 repo_name = repo_name_unicode.encode('utf8')
99 100 description_unicode = u'description for newly created repo' + suffix
100 101 description = description_unicode.encode('utf8')
101 102 response = self.app.post(
102 103 route_path('repo_create'),
103 104 fixture._get_repo_create_params(
104 105 repo_private=False,
105 106 repo_name=repo_name,
106 107 repo_type=backend.alias,
107 108 repo_description=description,
108 109 csrf_token=csrf_token),
109 110 status=302)
110 111
111 112 self.assert_repository_is_created_correctly(
112 113 repo_name, description, backend)
113 114
114 115 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
115 116 numeric_repo = '1234'
116 117 repo_name = numeric_repo
117 118 description = 'description for newly created repo' + numeric_repo
118 119 self.app.post(
119 120 route_path('repo_create'),
120 121 fixture._get_repo_create_params(
121 122 repo_private=False,
122 123 repo_name=repo_name,
123 124 repo_type=backend.alias,
124 125 repo_description=description,
125 126 csrf_token=csrf_token))
126 127
127 128 self.assert_repository_is_created_correctly(
128 129 repo_name, description, backend)
129 130
130 131 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
131 132 def test_create_in_group(
132 133 self, autologin_user, backend, suffix, csrf_token):
133 134 # create GROUP
134 135 group_name = 'sometest_%s' % backend.alias
135 136 gr = RepoGroupModel().create(group_name=group_name,
136 137 group_description='test',
137 138 owner=TEST_USER_ADMIN_LOGIN)
138 139 Session().commit()
139 140
140 141 repo_name = u'ingroup' + suffix
141 142 repo_name_full = RepoGroup.url_sep().join(
142 143 [group_name, repo_name])
143 144 description = u'description for newly created repo'
144 145 self.app.post(
145 146 route_path('repo_create'),
146 147 fixture._get_repo_create_params(
147 148 repo_private=False,
148 149 repo_name=safe_str(repo_name),
149 150 repo_type=backend.alias,
150 151 repo_description=description,
151 152 repo_group=gr.group_id,
152 153 csrf_token=csrf_token))
153 154
154 155 # TODO: johbo: Cleanup work to fixture
155 156 try:
156 157 self.assert_repository_is_created_correctly(
157 158 repo_name_full, description, backend)
158 159
159 160 new_repo = RepoModel().get_by_repo_name(repo_name_full)
160 161 inherited_perms = UserRepoToPerm.query().filter(
161 162 UserRepoToPerm.repository_id == new_repo.repo_id).all()
162 163 assert len(inherited_perms) == 1
163 164 finally:
164 165 RepoModel().delete(repo_name_full)
165 166 RepoGroupModel().delete(group_name)
166 167 Session().commit()
167 168
168 169 def test_create_in_group_numeric_name(
169 170 self, autologin_user, backend, csrf_token):
170 171 # create GROUP
171 172 group_name = 'sometest_%s' % backend.alias
172 173 gr = RepoGroupModel().create(group_name=group_name,
173 174 group_description='test',
174 175 owner=TEST_USER_ADMIN_LOGIN)
175 176 Session().commit()
176 177
177 178 repo_name = '12345'
178 179 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
179 180 description = 'description for newly created repo'
180 181 self.app.post(
181 182 route_path('repo_create'),
182 183 fixture._get_repo_create_params(
183 184 repo_private=False,
184 185 repo_name=repo_name,
185 186 repo_type=backend.alias,
186 187 repo_description=description,
187 188 repo_group=gr.group_id,
188 189 csrf_token=csrf_token))
189 190
190 191 # TODO: johbo: Cleanup work to fixture
191 192 try:
192 193 self.assert_repository_is_created_correctly(
193 194 repo_name_full, description, backend)
194 195
195 196 new_repo = RepoModel().get_by_repo_name(repo_name_full)
196 197 inherited_perms = UserRepoToPerm.query()\
197 198 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
198 199 assert len(inherited_perms) == 1
199 200 finally:
200 201 RepoModel().delete(repo_name_full)
201 202 RepoGroupModel().delete(group_name)
202 203 Session().commit()
203 204
204 205 def test_create_in_group_without_needed_permissions(self, backend):
205 206 session = login_user_session(
206 207 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
207 208 csrf_token = auth.get_csrf_token(session)
208 209 # revoke
209 210 user_model = UserModel()
210 211 # disable fork and create on default user
211 212 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
212 213 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
213 214 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
214 215 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
215 216
216 217 # disable on regular user
217 218 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
218 219 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
219 220 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
220 221 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
221 222 Session().commit()
222 223
223 224 # create GROUP
224 225 group_name = 'reg_sometest_%s' % backend.alias
225 226 gr = RepoGroupModel().create(group_name=group_name,
226 227 group_description='test',
227 228 owner=TEST_USER_ADMIN_LOGIN)
228 229 Session().commit()
230 repo_group_id = gr.group_id
229 231
230 232 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
231 233 gr_allowed = RepoGroupModel().create(
232 234 group_name=group_name_allowed,
233 235 group_description='test',
234 236 owner=TEST_USER_REGULAR_LOGIN)
237 allowed_repo_group_id = gr_allowed.group_id
235 238 Session().commit()
236 239
237 240 repo_name = 'ingroup'
238 241 description = 'description for newly created repo'
239 242 response = self.app.post(
240 243 route_path('repo_create'),
241 244 fixture._get_repo_create_params(
242 245 repo_private=False,
243 246 repo_name=repo_name,
244 247 repo_type=backend.alias,
245 248 repo_description=description,
246 repo_group=gr.group_id,
249 repo_group=repo_group_id,
247 250 csrf_token=csrf_token))
248 251
249 252 response.mustcontain('Invalid value')
250 253
251 254 # user is allowed to create in this group
252 255 repo_name = 'ingroup'
253 256 repo_name_full = RepoGroup.url_sep().join(
254 257 [group_name_allowed, repo_name])
255 258 description = 'description for newly created repo'
256 259 response = self.app.post(
257 260 route_path('repo_create'),
258 261 fixture._get_repo_create_params(
259 262 repo_private=False,
260 263 repo_name=repo_name,
261 264 repo_type=backend.alias,
262 265 repo_description=description,
263 repo_group=gr_allowed.group_id,
266 repo_group=allowed_repo_group_id,
264 267 csrf_token=csrf_token))
265 268
266 269 # TODO: johbo: Cleanup in pytest fixture
267 270 try:
268 271 self.assert_repository_is_created_correctly(
269 272 repo_name_full, description, backend)
270 273
271 274 new_repo = RepoModel().get_by_repo_name(repo_name_full)
272 275 inherited_perms = UserRepoToPerm.query().filter(
273 276 UserRepoToPerm.repository_id == new_repo.repo_id).all()
274 277 assert len(inherited_perms) == 1
275 278
276 279 assert repo_on_filesystem(repo_name_full)
277 280 finally:
278 281 RepoModel().delete(repo_name_full)
279 282 RepoGroupModel().delete(group_name)
280 283 RepoGroupModel().delete(group_name_allowed)
281 284 Session().commit()
282 285
283 286 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
284 287 csrf_token):
285 288 # create GROUP
286 289 group_name = 'sometest_%s' % backend.alias
287 290 gr = RepoGroupModel().create(group_name=group_name,
288 291 group_description='test',
289 292 owner=TEST_USER_ADMIN_LOGIN)
290 293 perm = Permission.get_by_key('repository.write')
291 294 RepoGroupModel().grant_user_permission(
292 295 gr, TEST_USER_REGULAR_LOGIN, perm)
293 296
294 297 # add repo permissions
295 298 Session().commit()
296
299 repo_group_id = gr.group_id
297 300 repo_name = 'ingroup_inherited_%s' % backend.alias
298 301 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
299 302 description = 'description for newly created repo'
300 303 self.app.post(
301 304 route_path('repo_create'),
302 305 fixture._get_repo_create_params(
303 306 repo_private=False,
304 307 repo_name=repo_name,
305 308 repo_type=backend.alias,
306 309 repo_description=description,
307 repo_group=gr.group_id,
310 repo_group=repo_group_id,
308 311 repo_copy_permissions=True,
309 312 csrf_token=csrf_token))
310 313
311 314 # TODO: johbo: Cleanup to pytest fixture
312 315 try:
313 316 self.assert_repository_is_created_correctly(
314 317 repo_name_full, description, backend)
315 318 except Exception:
316 319 RepoGroupModel().delete(group_name)
317 320 Session().commit()
318 321 raise
319 322
320 323 # check if inherited permissions are applied
321 324 new_repo = RepoModel().get_by_repo_name(repo_name_full)
322 325 inherited_perms = UserRepoToPerm.query().filter(
323 326 UserRepoToPerm.repository_id == new_repo.repo_id).all()
324 327 assert len(inherited_perms) == 2
325 328
326 329 assert TEST_USER_REGULAR_LOGIN in [
327 330 x.user.username for x in inherited_perms]
328 331 assert 'repository.write' in [
329 332 x.permission.permission_name for x in inherited_perms]
330 333
331 334 RepoModel().delete(repo_name_full)
332 335 RepoGroupModel().delete(group_name)
333 336 Session().commit()
334 337
335 338 @pytest.mark.xfail_backends(
336 339 "git", "hg", reason="Missing reposerver support")
337 340 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
338 341 csrf_token):
339 342 source_repo = backend.create_repo(number_of_commits=2)
340 343 source_repo_name = source_repo.repo_name
341 344 reposerver.serve(source_repo.scm_instance())
342 345
343 346 repo_name = backend.new_repo_name()
344 347 response = self.app.post(
345 348 route_path('repo_create'),
346 349 fixture._get_repo_create_params(
347 350 repo_private=False,
348 351 repo_name=repo_name,
349 352 repo_type=backend.alias,
350 353 repo_description='',
351 354 clone_uri=reposerver.url,
352 355 csrf_token=csrf_token),
353 356 status=302)
354 357
355 358 # Should be redirected to the creating page
356 359 response.mustcontain('repo_creating')
357 360
358 361 # Expecting that both repositories have same history
359 362 source_repo = RepoModel().get_by_repo_name(source_repo_name)
360 363 source_vcs = source_repo.scm_instance()
361 364 repo = RepoModel().get_by_repo_name(repo_name)
362 365 repo_vcs = repo.scm_instance()
363 366 assert source_vcs[0].message == repo_vcs[0].message
364 367 assert source_vcs.count() == repo_vcs.count()
365 368 assert source_vcs.commit_ids == repo_vcs.commit_ids
366 369
367 370 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
368 371 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
369 372 csrf_token):
370 373 repo_name = backend.new_repo_name()
371 374 description = 'description for newly created repo'
372 375 response = self.app.post(
373 376 route_path('repo_create'),
374 377 fixture._get_repo_create_params(
375 378 repo_private=False,
376 379 repo_name=repo_name,
377 380 repo_type=backend.alias,
378 381 repo_description=description,
379 382 clone_uri='http://repo.invalid/repo',
380 383 csrf_token=csrf_token))
381 384 response.mustcontain('invalid clone url')
382 385
383 386 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
384 387 def test_create_remote_repo_wrong_clone_uri_hg_svn(
385 388 self, autologin_user, backend, csrf_token):
386 389 repo_name = backend.new_repo_name()
387 390 description = 'description for newly created repo'
388 391 response = self.app.post(
389 392 route_path('repo_create'),
390 393 fixture._get_repo_create_params(
391 394 repo_private=False,
392 395 repo_name=repo_name,
393 396 repo_type=backend.alias,
394 397 repo_description=description,
395 398 clone_uri='svn+http://svn.invalid/repo',
396 399 csrf_token=csrf_token))
397 400 response.mustcontain('invalid clone url')
398 401
399 402 def test_create_with_git_suffix(
400 403 self, autologin_user, backend, csrf_token):
401 404 repo_name = backend.new_repo_name() + ".git"
402 405 description = 'description for newly created repo'
403 406 response = self.app.post(
404 407 route_path('repo_create'),
405 408 fixture._get_repo_create_params(
406 409 repo_private=False,
407 410 repo_name=repo_name,
408 411 repo_type=backend.alias,
409 412 repo_description=description,
410 413 csrf_token=csrf_token))
411 414 response.mustcontain('Repository name cannot end with .git')
412 415
413 416 def test_default_user_cannot_access_private_repo_in_a_group(
414 417 self, autologin_user, user_util, backend):
415 418
416 419 group = user_util.create_repo_group()
417 420
418 421 repo = backend.create_repo(
419 422 repo_private=True, repo_group=group, repo_copy_permissions=True)
420 423
421 424 permissions = _get_permission_for_user(
422 425 user='default', repo=repo.repo_name)
423 426 assert len(permissions) == 1
424 427 assert permissions[0].permission.permission_name == 'repository.none'
425 428 assert permissions[0].repository.private is True
426 429
427 430 def test_create_on_top_level_without_permissions(self, backend):
428 431 session = login_user_session(
429 432 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
430 433 csrf_token = auth.get_csrf_token(session)
431 434
432 435 # revoke
433 436 user_model = UserModel()
434 437 # disable fork and create on default user
435 438 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
436 439 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
437 440 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
438 441 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
439 442
440 443 # disable on regular user
441 444 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
442 445 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
443 446 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
444 447 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
445 448 Session().commit()
446 449
447 450 repo_name = backend.new_repo_name()
448 451 description = 'description for newly created repo'
449 452 response = self.app.post(
450 453 route_path('repo_create'),
451 454 fixture._get_repo_create_params(
452 455 repo_private=False,
453 456 repo_name=repo_name,
454 457 repo_type=backend.alias,
455 458 repo_description=description,
456 459 csrf_token=csrf_token))
457 460
458 461 response.mustcontain(
459 462 u"You do not have the permission to store repositories in "
460 463 u"the root location.")
461 464
462 465 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
463 466 def test_create_repo_when_filesystem_op_fails(
464 467 self, autologin_user, backend, csrf_token):
465 468 repo_name = backend.new_repo_name()
466 469 description = 'description for newly created repo'
467 470
468 471 response = self.app.post(
469 472 route_path('repo_create'),
470 473 fixture._get_repo_create_params(
471 474 repo_private=False,
472 475 repo_name=repo_name,
473 476 repo_type=backend.alias,
474 477 repo_description=description,
475 478 csrf_token=csrf_token))
476 479
477 480 assert_session_flash(
478 481 response, 'Error creating repository %s' % repo_name)
479 482 # repo must not be in db
480 483 assert backend.repo is None
481 484 # repo must not be in filesystem !
482 485 assert not repo_on_filesystem(repo_name)
483 486
484 487 def assert_repository_is_created_correctly(
485 488 self, repo_name, description, backend):
486 489 repo_name_utf8 = safe_str(repo_name)
487 490
488 491 # run the check page that triggers the flash message
489 492 response = self.app.get(
490 493 route_path('repo_creating_check', repo_name=safe_str(repo_name)))
491 494 assert response.json == {u'result': True}
492 495
493 496 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
494 497 urllib.quote(repo_name_utf8), repo_name)
495 498 assert_session_flash(response, flash_msg)
496 499
497 500 # test if the repo was created in the database
498 501 new_repo = RepoModel().get_by_repo_name(repo_name)
499 502
500 503 assert new_repo.repo_name == repo_name
501 504 assert new_repo.description == description
502 505
503 506 # test if the repository is visible in the list ?
504 507 response = self.app.get(
505 508 h.route_path('repo_summary', repo_name=safe_str(repo_name)))
506 509 response.mustcontain(repo_name)
507 510 response.mustcontain(backend.alias)
508 511
509 512 assert repo_on_filesystem(repo_name)
@@ -1,170 +1,170 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.model.db import UserGroup, User
24 24 from rhodecode.model.meta import Session
25 25
26 26 from rhodecode.tests import (
27 27 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
28 28 from rhodecode.tests.fixture import Fixture
29 29
30 30 fixture = Fixture()
31 31
32 32
33 33 def route_path(name, params=None, **kwargs):
34 34 import urllib
35 35 from rhodecode.apps._base import ADMIN_PREFIX
36 36
37 37 base_url = {
38 38 'user_groups': ADMIN_PREFIX + '/user_groups',
39 39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
40 40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
41 41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
42 42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
43 43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
44 44 }[name].format(**kwargs)
45 45
46 46 if params:
47 47 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
48 48 return base_url
49 49
50 50
51 51 class TestAdminUserGroupsView(TestController):
52 52
53 53 def test_show_users(self):
54 54 self.log_user()
55 55 self.app.get(route_path('user_groups'))
56 56
57 57 def test_show_user_groups_data(self, xhr_header):
58 58 self.log_user()
59 59 response = self.app.get(route_path(
60 60 'user_groups_data'), extra_environ=xhr_header)
61 61
62 62 all_user_groups = UserGroup.query().count()
63 63 assert response.json['recordsTotal'] == all_user_groups
64 64
65 65 def test_show_user_groups_data_filtered(self, xhr_header):
66 66 self.log_user()
67 67 response = self.app.get(route_path(
68 68 'user_groups_data', params={'search[value]': 'empty_search'}),
69 69 extra_environ=xhr_header)
70 70
71 71 all_user_groups = UserGroup.query().count()
72 72 assert response.json['recordsTotal'] == all_user_groups
73 73 assert response.json['recordsFiltered'] == 0
74 74
75 75 def test_usergroup_escape(self, user_util, xhr_header):
76 76 self.log_user()
77 77
78 78 xss_img = '<img src="/image1" onload="alert(\'Hello, World!\');">'
79 79 user = user_util.create_user()
80 80 user.name = xss_img
81 81 user.lastname = xss_img
82 82 Session().add(user)
83 83 Session().commit()
84 84
85 85 user_group = user_util.create_user_group()
86 86
87 87 user_group.users_group_name = xss_img
88 88 user_group.user_group_description = '<strong onload="alert();">DESC</strong>'
89 89
90 90 response = self.app.get(
91 91 route_path('user_groups_data'), extra_environ=xhr_header)
92 92
93 93 response.mustcontain(
94 94 '&lt;strong onload=&#34;alert();&#34;&gt;DESC&lt;/strong&gt;')
95 95 response.mustcontain(
96 96 '&lt;img src=&#34;/image1&#34; onload=&#34;'
97 97 'alert(&#39;Hello, World!&#39;);&#34;&gt;')
98 98
99 99 def test_edit_user_group_autocomplete_empty_members(self, xhr_header, user_util):
100 100 self.log_user()
101 101 ug = user_util.create_user_group()
102 102 response = self.app.get(
103 103 route_path('user_group_members_data', user_group_id=ug.users_group_id),
104 104 extra_environ=xhr_header)
105 105
106 106 assert response.json == {'members': []}
107 107
108 108 def test_edit_user_group_autocomplete_members(self, xhr_header, user_util):
109 109 self.log_user()
110 110 members = [u.user_id for u in User.get_all()]
111 111 ug = user_util.create_user_group(members=members)
112 112 response = self.app.get(
113 113 route_path('user_group_members_data',
114 114 user_group_id=ug.users_group_id),
115 115 extra_environ=xhr_header)
116 116
117 117 assert len(response.json['members']) == len(members)
118 118
119 119 def test_creation_page(self):
120 120 self.log_user()
121 121 self.app.get(route_path('user_groups_new'), status=200)
122 122
123 123 def test_create(self):
124 124 from rhodecode.lib import helpers as h
125 125
126 126 self.log_user()
127 127 users_group_name = 'test_user_group'
128 128 response = self.app.post(route_path('user_groups_create'), {
129 129 'users_group_name': users_group_name,
130 130 'user_group_description': 'DESC',
131 131 'active': True,
132 132 'csrf_token': self.csrf_token})
133 133
134 134 user_group_id = UserGroup.get_by_group_name(
135 135 users_group_name).users_group_id
136 136
137 137 user_group_link = h.link_to(
138 138 users_group_name,
139 139 route_path('edit_user_group', user_group_id=user_group_id))
140 140
141 141 assert_session_flash(
142 142 response,
143 143 'Created user group %s' % user_group_link)
144 144
145 145 fixture.destroy_user_group(users_group_name)
146 146
147 147 def test_create_with_empty_name(self):
148 148 self.log_user()
149 149
150 150 response = self.app.post(route_path('user_groups_create'), {
151 151 'users_group_name': '',
152 152 'user_group_description': 'DESC',
153 153 'active': True,
154 154 'csrf_token': self.csrf_token}, status=200)
155 155
156 156 response.mustcontain('Please enter a value')
157 157
158 158 def test_create_duplicate(self, user_util):
159 159 self.log_user()
160 160
161 161 user_group = user_util.create_user_group()
162 162 duplicate_name = user_group.users_group_name
163 163 response = self.app.post(route_path('user_groups_create'), {
164 164 'users_group_name': duplicate_name,
165 165 'user_group_description': 'DESC',
166 166 'active': True,
167 167 'csrf_token': self.csrf_token}, status=200)
168 168
169 169 response.mustcontain(
170 'User group `{}` already exists'.format(user_group.users_group_name))
170 'User group `{}` already exists'.format(duplicate_name))
@@ -1,781 +1,783 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.orm.exc import NoResultFound
23 23
24 24 from rhodecode.lib import auth
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model.db import User, UserApiKeys, UserEmailMap, Repository
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.model.user import UserModel
29 29
30 30 from rhodecode.tests import (
31 31 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
32 32 from rhodecode.tests.fixture import Fixture
33 33
34 34 fixture = Fixture()
35 35
36 36
37 37 def route_path(name, params=None, **kwargs):
38 38 import urllib
39 39 from rhodecode.apps._base import ADMIN_PREFIX
40 40
41 41 base_url = {
42 42 'users':
43 43 ADMIN_PREFIX + '/users',
44 44 'users_data':
45 45 ADMIN_PREFIX + '/users_data',
46 46 'users_create':
47 47 ADMIN_PREFIX + '/users/create',
48 48 'users_new':
49 49 ADMIN_PREFIX + '/users/new',
50 50 'user_edit':
51 51 ADMIN_PREFIX + '/users/{user_id}/edit',
52 52 'user_edit_advanced':
53 53 ADMIN_PREFIX + '/users/{user_id}/edit/advanced',
54 54 'user_edit_global_perms':
55 55 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions',
56 56 'user_edit_global_perms_update':
57 57 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions/update',
58 58 'user_update':
59 59 ADMIN_PREFIX + '/users/{user_id}/update',
60 60 'user_delete':
61 61 ADMIN_PREFIX + '/users/{user_id}/delete',
62 62 'user_force_password_reset':
63 63 ADMIN_PREFIX + '/users/{user_id}/password_reset',
64 64 'user_create_personal_repo_group':
65 65 ADMIN_PREFIX + '/users/{user_id}/create_repo_group',
66 66
67 67 'edit_user_auth_tokens':
68 68 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens',
69 69 'edit_user_auth_tokens_add':
70 70 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/new',
71 71 'edit_user_auth_tokens_delete':
72 72 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/delete',
73 73
74 74 'edit_user_emails':
75 75 ADMIN_PREFIX + '/users/{user_id}/edit/emails',
76 76 'edit_user_emails_add':
77 77 ADMIN_PREFIX + '/users/{user_id}/edit/emails/new',
78 78 'edit_user_emails_delete':
79 79 ADMIN_PREFIX + '/users/{user_id}/edit/emails/delete',
80 80
81 81 'edit_user_ips':
82 82 ADMIN_PREFIX + '/users/{user_id}/edit/ips',
83 83 'edit_user_ips_add':
84 84 ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
85 85 'edit_user_ips_delete':
86 86 ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
87 87
88 88 'edit_user_perms_summary':
89 89 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary',
90 90 'edit_user_perms_summary_json':
91 91 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary/json',
92 92
93 93 'edit_user_audit_logs':
94 94 ADMIN_PREFIX + '/users/{user_id}/edit/audit',
95 95
96 96 }[name].format(**kwargs)
97 97
98 98 if params:
99 99 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
100 100 return base_url
101 101
102 102
103 103 class TestAdminUsersView(TestController):
104 104
105 105 def test_show_users(self):
106 106 self.log_user()
107 107 self.app.get(route_path('users'))
108 108
109 109 def test_show_users_data(self, xhr_header):
110 110 self.log_user()
111 111 response = self.app.get(route_path(
112 112 'users_data'), extra_environ=xhr_header)
113 113
114 114 all_users = User.query().filter(
115 115 User.username != User.DEFAULT_USER).count()
116 116 assert response.json['recordsTotal'] == all_users
117 117
118 118 def test_show_users_data_filtered(self, xhr_header):
119 119 self.log_user()
120 120 response = self.app.get(route_path(
121 121 'users_data', params={'search[value]': 'empty_search'}),
122 122 extra_environ=xhr_header)
123 123
124 124 all_users = User.query().filter(
125 125 User.username != User.DEFAULT_USER).count()
126 126 assert response.json['recordsTotal'] == all_users
127 127 assert response.json['recordsFiltered'] == 0
128 128
129 129 def test_auth_tokens_default_user(self):
130 130 self.log_user()
131 131 user = User.get_default_user()
132 132 response = self.app.get(
133 133 route_path('edit_user_auth_tokens', user_id=user.user_id),
134 134 status=302)
135 135
136 136 def test_auth_tokens(self):
137 137 self.log_user()
138 138
139 139 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
140 user_id = user.user_id
141 auth_tokens = user.auth_tokens
140 142 response = self.app.get(
141 route_path('edit_user_auth_tokens', user_id=user.user_id))
142 for token in user.auth_tokens:
143 route_path('edit_user_auth_tokens', user_id=user_id))
144 for token in auth_tokens:
143 145 response.mustcontain(token)
144 146 response.mustcontain('never')
145 147
146 148 @pytest.mark.parametrize("desc, lifetime", [
147 149 ('forever', -1),
148 150 ('5mins', 60*5),
149 151 ('30days', 60*60*24*30),
150 152 ])
151 153 def test_add_auth_token(self, desc, lifetime, user_util):
152 154 self.log_user()
153 155 user = user_util.create_user()
154 156 user_id = user.user_id
155 157
156 158 response = self.app.post(
157 159 route_path('edit_user_auth_tokens_add', user_id=user_id),
158 160 {'description': desc, 'lifetime': lifetime,
159 161 'csrf_token': self.csrf_token})
160 162 assert_session_flash(response, 'Auth token successfully created')
161 163
162 164 response = response.follow()
163 165 user = User.get(user_id)
164 166 for auth_token in user.auth_tokens:
165 167 response.mustcontain(auth_token)
166 168
167 169 def test_delete_auth_token(self, user_util):
168 170 self.log_user()
169 171 user = user_util.create_user()
170 172 user_id = user.user_id
171 173 keys = user.auth_tokens
172 174 assert 2 == len(keys)
173 175
174 176 response = self.app.post(
175 177 route_path('edit_user_auth_tokens_add', user_id=user_id),
176 178 {'description': 'desc', 'lifetime': -1,
177 179 'csrf_token': self.csrf_token})
178 180 assert_session_flash(response, 'Auth token successfully created')
179 181 response.follow()
180 182
181 183 # now delete our key
182 184 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
183 185 assert 3 == len(keys)
184 186
185 187 response = self.app.post(
186 188 route_path('edit_user_auth_tokens_delete', user_id=user_id),
187 189 {'del_auth_token': keys[0].user_api_key_id,
188 190 'csrf_token': self.csrf_token})
189 191
190 192 assert_session_flash(response, 'Auth token successfully deleted')
191 193 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
192 194 assert 2 == len(keys)
193 195
194 196 def test_ips(self):
195 197 self.log_user()
196 198 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
197 199 response = self.app.get(route_path('edit_user_ips', user_id=user.user_id))
198 200 response.mustcontain('All IP addresses are allowed')
199 201
200 202 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
201 203 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
202 204 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
203 205 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
204 206 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
205 207 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
206 208 ('127_bad_ip', 'foobar', 'foobar', True),
207 209 ])
208 210 def test_ips_add(self, user_util, test_name, ip, ip_range, failure):
209 211 self.log_user()
210 212 user = user_util.create_user(username=test_name)
211 213 user_id = user.user_id
212 214
213 215 response = self.app.post(
214 216 route_path('edit_user_ips_add', user_id=user_id),
215 217 params={'new_ip': ip, 'csrf_token': self.csrf_token})
216 218
217 219 if failure:
218 220 assert_session_flash(
219 221 response, 'Please enter a valid IPv4 or IpV6 address')
220 222 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
221 223
222 224 response.mustcontain(no=[ip])
223 225 response.mustcontain(no=[ip_range])
224 226
225 227 else:
226 228 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
227 229 response.mustcontain(ip)
228 230 response.mustcontain(ip_range)
229 231
230 232 def test_ips_delete(self, user_util):
231 233 self.log_user()
232 234 user = user_util.create_user()
233 235 user_id = user.user_id
234 236 ip = '127.0.0.1/32'
235 237 ip_range = '127.0.0.1 - 127.0.0.1'
236 238 new_ip = UserModel().add_extra_ip(user_id, ip)
237 239 Session().commit()
238 240 new_ip_id = new_ip.ip_id
239 241
240 242 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
241 243 response.mustcontain(ip)
242 244 response.mustcontain(ip_range)
243 245
244 246 self.app.post(
245 247 route_path('edit_user_ips_delete', user_id=user_id),
246 248 params={'del_ip_id': new_ip_id, 'csrf_token': self.csrf_token})
247 249
248 250 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
249 251 response.mustcontain('All IP addresses are allowed')
250 252 response.mustcontain(no=[ip])
251 253 response.mustcontain(no=[ip_range])
252 254
253 255 def test_emails(self):
254 256 self.log_user()
255 257 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
256 258 response = self.app.get(
257 259 route_path('edit_user_emails', user_id=user.user_id))
258 260 response.mustcontain('No additional emails specified')
259 261
260 262 def test_emails_add(self, user_util):
261 263 self.log_user()
262 264 user = user_util.create_user()
263 265 user_id = user.user_id
264 266
265 267 self.app.post(
266 268 route_path('edit_user_emails_add', user_id=user_id),
267 269 params={'new_email': 'example@rhodecode.com',
268 270 'csrf_token': self.csrf_token})
269 271
270 272 response = self.app.get(
271 273 route_path('edit_user_emails', user_id=user_id))
272 274 response.mustcontain('example@rhodecode.com')
273 275
274 276 def test_emails_add_existing_email(self, user_util, user_regular):
275 277 existing_email = user_regular.email
276 278
277 279 self.log_user()
278 280 user = user_util.create_user()
279 281 user_id = user.user_id
280 282
281 283 response = self.app.post(
282 284 route_path('edit_user_emails_add', user_id=user_id),
283 285 params={'new_email': existing_email,
284 286 'csrf_token': self.csrf_token})
285 287 assert_session_flash(
286 288 response, 'This e-mail address is already taken')
287 289
288 290 response = self.app.get(
289 291 route_path('edit_user_emails', user_id=user_id))
290 292 response.mustcontain(no=[existing_email])
291 293
292 294 def test_emails_delete(self, user_util):
293 295 self.log_user()
294 296 user = user_util.create_user()
295 297 user_id = user.user_id
296 298
297 299 self.app.post(
298 300 route_path('edit_user_emails_add', user_id=user_id),
299 301 params={'new_email': 'example@rhodecode.com',
300 302 'csrf_token': self.csrf_token})
301 303
302 304 response = self.app.get(
303 305 route_path('edit_user_emails', user_id=user_id))
304 306 response.mustcontain('example@rhodecode.com')
305 307
306 308 user_email = UserEmailMap.query()\
307 309 .filter(UserEmailMap.email == 'example@rhodecode.com') \
308 310 .filter(UserEmailMap.user_id == user_id)\
309 311 .one()
310 312
311 313 del_email_id = user_email.email_id
312 314 self.app.post(
313 315 route_path('edit_user_emails_delete', user_id=user_id),
314 316 params={'del_email_id': del_email_id,
315 317 'csrf_token': self.csrf_token})
316 318
317 319 response = self.app.get(
318 320 route_path('edit_user_emails', user_id=user_id))
319 321 response.mustcontain(no=['example@rhodecode.com'])
320 322
321 323
322 324 def test_create(self, request, xhr_header):
323 325 self.log_user()
324 326 username = 'newtestuser'
325 327 password = 'test12'
326 328 password_confirmation = password
327 329 name = 'name'
328 330 lastname = 'lastname'
329 331 email = 'mail@mail.com'
330 332
331 333 self.app.get(route_path('users_new'))
332 334
333 335 response = self.app.post(route_path('users_create'), params={
334 336 'username': username,
335 337 'password': password,
336 338 'password_confirmation': password_confirmation,
337 339 'firstname': name,
338 340 'active': True,
339 341 'lastname': lastname,
340 342 'extern_name': 'rhodecode',
341 343 'extern_type': 'rhodecode',
342 344 'email': email,
343 345 'csrf_token': self.csrf_token,
344 346 })
345 347 user_link = h.link_to(
346 348 username,
347 349 route_path(
348 350 'user_edit', user_id=User.get_by_username(username).user_id))
349 351 assert_session_flash(response, 'Created user %s' % (user_link,))
350 352
351 353 @request.addfinalizer
352 354 def cleanup():
353 355 fixture.destroy_user(username)
354 356 Session().commit()
355 357
356 358 new_user = User.query().filter(User.username == username).one()
357 359
358 360 assert new_user.username == username
359 361 assert auth.check_password(password, new_user.password)
360 362 assert new_user.name == name
361 363 assert new_user.lastname == lastname
362 364 assert new_user.email == email
363 365
364 366 response = self.app.get(route_path('users_data'),
365 367 extra_environ=xhr_header)
366 368 response.mustcontain(username)
367 369
368 370 def test_create_err(self):
369 371 self.log_user()
370 372 username = 'new_user'
371 373 password = ''
372 374 name = 'name'
373 375 lastname = 'lastname'
374 376 email = 'errmail.com'
375 377
376 378 self.app.get(route_path('users_new'))
377 379
378 380 response = self.app.post(route_path('users_create'), params={
379 381 'username': username,
380 382 'password': password,
381 383 'name': name,
382 384 'active': False,
383 385 'lastname': lastname,
384 386 'email': email,
385 387 'csrf_token': self.csrf_token,
386 388 })
387 389
388 390 msg = u'Username "%(username)s" is forbidden'
389 391 msg = h.html_escape(msg % {'username': 'new_user'})
390 392 response.mustcontain('<span class="error-message">%s</span>' % msg)
391 393 response.mustcontain(
392 394 '<span class="error-message">Please enter a value</span>')
393 395 response.mustcontain(
394 396 '<span class="error-message">An email address must contain a'
395 397 ' single @</span>')
396 398
397 399 def get_user():
398 400 Session().query(User).filter(User.username == username).one()
399 401
400 402 with pytest.raises(NoResultFound):
401 403 get_user()
402 404
403 405 def test_new(self):
404 406 self.log_user()
405 407 self.app.get(route_path('users_new'))
406 408
407 409 @pytest.mark.parametrize("name, attrs", [
408 410 ('firstname', {'firstname': 'new_username'}),
409 411 ('lastname', {'lastname': 'new_username'}),
410 412 ('admin', {'admin': True}),
411 413 ('admin', {'admin': False}),
412 414 ('extern_type', {'extern_type': 'ldap'}),
413 415 ('extern_type', {'extern_type': None}),
414 416 ('extern_name', {'extern_name': 'test'}),
415 417 ('extern_name', {'extern_name': None}),
416 418 ('active', {'active': False}),
417 419 ('active', {'active': True}),
418 420 ('email', {'email': 'some@email.com'}),
419 421 ('language', {'language': 'de'}),
420 422 ('language', {'language': 'en'}),
421 423 # ('new_password', {'new_password': 'foobar123',
422 424 # 'password_confirmation': 'foobar123'})
423 425 ])
424 426 def test_update(self, name, attrs, user_util):
425 427 self.log_user()
426 428 usr = user_util.create_user(
427 429 password='qweqwe',
428 430 email='testme@rhodecode.org',
429 431 extern_type='rhodecode',
430 432 extern_name='xxx',
431 433 )
432 434 user_id = usr.user_id
433 435 Session().commit()
434 436
435 437 params = usr.get_api_data()
436 438 cur_lang = params['language'] or 'en'
437 439 params.update({
438 440 'password_confirmation': '',
439 441 'new_password': '',
440 442 'language': cur_lang,
441 443 'csrf_token': self.csrf_token,
442 444 })
443 445 params.update({'new_password': ''})
444 446 params.update(attrs)
445 447 if name == 'email':
446 448 params['emails'] = [attrs['email']]
447 449 elif name == 'extern_type':
448 450 # cannot update this via form, expected value is original one
449 451 params['extern_type'] = "rhodecode"
450 452 elif name == 'extern_name':
451 453 # cannot update this via form, expected value is original one
452 454 params['extern_name'] = 'xxx'
453 455 # special case since this user is not
454 456 # logged in yet his data is not filled
455 457 # so we use creation data
456 458
457 459 response = self.app.post(
458 460 route_path('user_update', user_id=usr.user_id), params)
459 461 assert response.status_int == 302
460 462 assert_session_flash(response, 'User updated successfully')
461 463
462 464 updated_user = User.get(user_id)
463 465 updated_params = updated_user.get_api_data()
464 466 updated_params.update({'password_confirmation': ''})
465 467 updated_params.update({'new_password': ''})
466 468
467 469 del params['csrf_token']
468 470 assert params == updated_params
469 471
470 472 def test_update_and_migrate_password(
471 473 self, autologin_user, real_crypto_backend, user_util):
472 474
473 475 user = user_util.create_user()
474 476 temp_user = user.username
475 477 user.password = auth._RhodeCodeCryptoSha256().hash_create(
476 478 b'test123')
477 479 Session().add(user)
478 480 Session().commit()
479 481
480 482 params = user.get_api_data()
481 483
482 484 params.update({
483 485 'password_confirmation': 'qweqwe123',
484 486 'new_password': 'qweqwe123',
485 487 'language': 'en',
486 488 'csrf_token': autologin_user.csrf_token,
487 489 })
488 490
489 491 response = self.app.post(
490 492 route_path('user_update', user_id=user.user_id), params)
491 493 assert response.status_int == 302
492 494 assert_session_flash(response, 'User updated successfully')
493 495
494 496 # new password should be bcrypted, after log-in and transfer
495 497 user = User.get_by_username(temp_user)
496 498 assert user.password.startswith('$')
497 499
498 500 updated_user = User.get_by_username(temp_user)
499 501 updated_params = updated_user.get_api_data()
500 502 updated_params.update({'password_confirmation': 'qweqwe123'})
501 503 updated_params.update({'new_password': 'qweqwe123'})
502 504
503 505 del params['csrf_token']
504 506 assert params == updated_params
505 507
506 508 def test_delete(self):
507 509 self.log_user()
508 510 username = 'newtestuserdeleteme'
509 511
510 512 fixture.create_user(name=username)
511 513
512 514 new_user = Session().query(User)\
513 515 .filter(User.username == username).one()
514 516 response = self.app.post(
515 517 route_path('user_delete', user_id=new_user.user_id),
516 518 params={'csrf_token': self.csrf_token})
517 519
518 520 assert_session_flash(response, 'Successfully deleted user')
519 521
520 522 def test_delete_owner_of_repository(self, request, user_util):
521 523 self.log_user()
522 524 obj_name = 'test_repo'
523 525 usr = user_util.create_user()
524 526 username = usr.username
525 527 fixture.create_repo(obj_name, cur_user=usr.username)
526 528
527 529 new_user = Session().query(User)\
528 530 .filter(User.username == username).one()
529 531 response = self.app.post(
530 532 route_path('user_delete', user_id=new_user.user_id),
531 533 params={'csrf_token': self.csrf_token})
532 534
533 535 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
534 536 'Switch owners or remove those repositories:%s' % (username,
535 537 obj_name)
536 538 assert_session_flash(response, msg)
537 539 fixture.destroy_repo(obj_name)
538 540
539 541 def test_delete_owner_of_repository_detaching(self, request, user_util):
540 542 self.log_user()
541 543 obj_name = 'test_repo'
542 544 usr = user_util.create_user(auto_cleanup=False)
543 545 username = usr.username
544 546 fixture.create_repo(obj_name, cur_user=usr.username)
545 547
546 548 new_user = Session().query(User)\
547 549 .filter(User.username == username).one()
548 550 response = self.app.post(
549 551 route_path('user_delete', user_id=new_user.user_id),
550 552 params={'user_repos': 'detach', 'csrf_token': self.csrf_token})
551 553
552 554 msg = 'Detached 1 repositories'
553 555 assert_session_flash(response, msg)
554 556 fixture.destroy_repo(obj_name)
555 557
556 558 def test_delete_owner_of_repository_deleting(self, request, user_util):
557 559 self.log_user()
558 560 obj_name = 'test_repo'
559 561 usr = user_util.create_user(auto_cleanup=False)
560 562 username = usr.username
561 563 fixture.create_repo(obj_name, cur_user=usr.username)
562 564
563 565 new_user = Session().query(User)\
564 566 .filter(User.username == username).one()
565 567 response = self.app.post(
566 568 route_path('user_delete', user_id=new_user.user_id),
567 569 params={'user_repos': 'delete', 'csrf_token': self.csrf_token})
568 570
569 571 msg = 'Deleted 1 repositories'
570 572 assert_session_flash(response, msg)
571 573
572 574 def test_delete_owner_of_repository_group(self, request, user_util):
573 575 self.log_user()
574 576 obj_name = 'test_group'
575 577 usr = user_util.create_user()
576 578 username = usr.username
577 579 fixture.create_repo_group(obj_name, cur_user=usr.username)
578 580
579 581 new_user = Session().query(User)\
580 582 .filter(User.username == username).one()
581 583 response = self.app.post(
582 584 route_path('user_delete', user_id=new_user.user_id),
583 585 params={'csrf_token': self.csrf_token})
584 586
585 587 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
586 588 'Switch owners or remove those repository groups:%s' % (username,
587 589 obj_name)
588 590 assert_session_flash(response, msg)
589 591 fixture.destroy_repo_group(obj_name)
590 592
591 593 def test_delete_owner_of_repository_group_detaching(self, request, user_util):
592 594 self.log_user()
593 595 obj_name = 'test_group'
594 596 usr = user_util.create_user(auto_cleanup=False)
595 597 username = usr.username
596 598 fixture.create_repo_group(obj_name, cur_user=usr.username)
597 599
598 600 new_user = Session().query(User)\
599 601 .filter(User.username == username).one()
600 602 response = self.app.post(
601 603 route_path('user_delete', user_id=new_user.user_id),
602 604 params={'user_repo_groups': 'delete', 'csrf_token': self.csrf_token})
603 605
604 606 msg = 'Deleted 1 repository groups'
605 607 assert_session_flash(response, msg)
606 608
607 609 def test_delete_owner_of_repository_group_deleting(self, request, user_util):
608 610 self.log_user()
609 611 obj_name = 'test_group'
610 612 usr = user_util.create_user(auto_cleanup=False)
611 613 username = usr.username
612 614 fixture.create_repo_group(obj_name, cur_user=usr.username)
613 615
614 616 new_user = Session().query(User)\
615 617 .filter(User.username == username).one()
616 618 response = self.app.post(
617 619 route_path('user_delete', user_id=new_user.user_id),
618 620 params={'user_repo_groups': 'detach', 'csrf_token': self.csrf_token})
619 621
620 622 msg = 'Detached 1 repository groups'
621 623 assert_session_flash(response, msg)
622 624 fixture.destroy_repo_group(obj_name)
623 625
624 626 def test_delete_owner_of_user_group(self, request, user_util):
625 627 self.log_user()
626 628 obj_name = 'test_user_group'
627 629 usr = user_util.create_user()
628 630 username = usr.username
629 631 fixture.create_user_group(obj_name, cur_user=usr.username)
630 632
631 633 new_user = Session().query(User)\
632 634 .filter(User.username == username).one()
633 635 response = self.app.post(
634 636 route_path('user_delete', user_id=new_user.user_id),
635 637 params={'csrf_token': self.csrf_token})
636 638
637 639 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
638 640 'Switch owners or remove those user groups:%s' % (username,
639 641 obj_name)
640 642 assert_session_flash(response, msg)
641 643 fixture.destroy_user_group(obj_name)
642 644
643 645 def test_delete_owner_of_user_group_detaching(self, request, user_util):
644 646 self.log_user()
645 647 obj_name = 'test_user_group'
646 648 usr = user_util.create_user(auto_cleanup=False)
647 649 username = usr.username
648 650 fixture.create_user_group(obj_name, cur_user=usr.username)
649 651
650 652 new_user = Session().query(User)\
651 653 .filter(User.username == username).one()
652 654 try:
653 655 response = self.app.post(
654 656 route_path('user_delete', user_id=new_user.user_id),
655 657 params={'user_user_groups': 'detach',
656 658 'csrf_token': self.csrf_token})
657 659
658 660 msg = 'Detached 1 user groups'
659 661 assert_session_flash(response, msg)
660 662 finally:
661 663 fixture.destroy_user_group(obj_name)
662 664
663 665 def test_delete_owner_of_user_group_deleting(self, request, user_util):
664 666 self.log_user()
665 667 obj_name = 'test_user_group'
666 668 usr = user_util.create_user(auto_cleanup=False)
667 669 username = usr.username
668 670 fixture.create_user_group(obj_name, cur_user=usr.username)
669 671
670 672 new_user = Session().query(User)\
671 673 .filter(User.username == username).one()
672 674 response = self.app.post(
673 675 route_path('user_delete', user_id=new_user.user_id),
674 676 params={'user_user_groups': 'delete', 'csrf_token': self.csrf_token})
675 677
676 678 msg = 'Deleted 1 user groups'
677 679 assert_session_flash(response, msg)
678 680
679 681 def test_edit(self, user_util):
680 682 self.log_user()
681 683 user = user_util.create_user()
682 684 self.app.get(route_path('user_edit', user_id=user.user_id))
683 685
684 686 def test_edit_default_user_redirect(self):
685 687 self.log_user()
686 688 user = User.get_default_user()
687 689 self.app.get(route_path('user_edit', user_id=user.user_id), status=302)
688 690
689 691 @pytest.mark.parametrize(
690 692 'repo_create, repo_create_write, user_group_create, repo_group_create,'
691 693 'fork_create, inherit_default_permissions, expect_error,'
692 694 'expect_form_error', [
693 695 ('hg.create.none', 'hg.create.write_on_repogroup.false',
694 696 'hg.usergroup.create.false', 'hg.repogroup.create.false',
695 697 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
696 698 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
697 699 'hg.usergroup.create.false', 'hg.repogroup.create.false',
698 700 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
699 701 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
700 702 'hg.usergroup.create.true', 'hg.repogroup.create.true',
701 703 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
702 704 False),
703 705 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
704 706 'hg.usergroup.create.true', 'hg.repogroup.create.true',
705 707 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
706 708 True),
707 709 ('', '', '', '', '', '', True, False),
708 710 ])
709 711 def test_global_perms_on_user(
710 712 self, repo_create, repo_create_write, user_group_create,
711 713 repo_group_create, fork_create, expect_error, expect_form_error,
712 714 inherit_default_permissions, user_util):
713 715 self.log_user()
714 716 user = user_util.create_user()
715 717 uid = user.user_id
716 718
717 719 # ENABLE REPO CREATE ON A GROUP
718 720 perm_params = {
719 721 'inherit_default_permissions': False,
720 722 'default_repo_create': repo_create,
721 723 'default_repo_create_on_write': repo_create_write,
722 724 'default_user_group_create': user_group_create,
723 725 'default_repo_group_create': repo_group_create,
724 726 'default_fork_create': fork_create,
725 727 'default_inherit_default_permissions': inherit_default_permissions,
726 728 'csrf_token': self.csrf_token,
727 729 }
728 730 response = self.app.post(
729 731 route_path('user_edit_global_perms_update', user_id=uid),
730 732 params=perm_params)
731 733
732 734 if expect_form_error:
733 735 assert response.status_int == 200
734 736 response.mustcontain('Value must be one of')
735 737 else:
736 738 if expect_error:
737 739 msg = 'An error occurred during permissions saving'
738 740 else:
739 741 msg = 'User global permissions updated successfully'
740 742 ug = User.get(uid)
741 743 del perm_params['inherit_default_permissions']
742 744 del perm_params['csrf_token']
743 745 assert perm_params == ug.get_default_perms()
744 746 assert_session_flash(response, msg)
745 747
746 748 def test_global_permissions_initial_values(self, user_util):
747 749 self.log_user()
748 750 user = user_util.create_user()
749 751 uid = user.user_id
750 752 response = self.app.get(
751 753 route_path('user_edit_global_perms', user_id=uid))
752 754 default_user = User.get_default_user()
753 755 default_permissions = default_user.get_default_perms()
754 756 assert_response = response.assert_response()
755 757 expected_permissions = (
756 758 'default_repo_create', 'default_repo_create_on_write',
757 759 'default_fork_create', 'default_repo_group_create',
758 760 'default_user_group_create', 'default_inherit_default_permissions')
759 761 for permission in expected_permissions:
760 762 css_selector = '[name={}][checked=checked]'.format(permission)
761 763 element = assert_response.get_element(css_selector)
762 764 assert element.value == default_permissions[permission]
763 765
764 766 def test_perms_summary_page(self):
765 767 user = self.log_user()
766 768 response = self.app.get(
767 769 route_path('edit_user_perms_summary', user_id=user['user_id']))
768 770 for repo in Repository.query().all():
769 771 response.mustcontain(repo.repo_name)
770 772
771 773 def test_perms_summary_page_json(self):
772 774 user = self.log_user()
773 775 response = self.app.get(
774 776 route_path('edit_user_perms_summary_json', user_id=user['user_id']))
775 777 for repo in Repository.query().all():
776 778 response.mustcontain(repo.repo_name)
777 779
778 780 def test_audit_log_page(self):
779 781 user = self.log_user()
780 782 self.app.get(
781 783 route_path('edit_user_audit_logs', user_id=user['user_id']))
General Comments 0
You need to be logged in to leave comments. Login now