##// END OF EJS Templates
grids: fixed tests and added one for permissions to repository group.
marcink -
r4151:f04de416 default
parent child Browse files
Show More
@@ -1,512 +1,514 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.apps._base import ADMIN_PREFIX
27 27 from rhodecode.lib import auth
28 28 from rhodecode.lib.utils2 import safe_str
29 29 from rhodecode.lib import helpers as h
30 30 from rhodecode.model.db import (
31 31 Repository, RepoGroup, UserRepoToPerm, User, Permission)
32 32 from rhodecode.model.meta import Session
33 33 from rhodecode.model.repo import RepoModel
34 34 from rhodecode.model.repo_group import RepoGroupModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
39 39 from rhodecode.tests.fixture import Fixture, error_function
40 40 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
41 41
42 42 fixture = Fixture()
43 43
44 44
45 45 def route_path(name, params=None, **kwargs):
46 46 import urllib
47 47
48 48 base_url = {
49 49 'repos': ADMIN_PREFIX + '/repos',
50 'repos_data': ADMIN_PREFIX + '/repos_data',
50 51 'repo_new': ADMIN_PREFIX + '/repos/new',
51 52 'repo_create': ADMIN_PREFIX + '/repos/create',
52 53
53 54 'repo_creating_check': '/{repo_name}/repo_creating_check',
54 55 }[name].format(**kwargs)
55 56
56 57 if params:
57 58 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
58 59 return base_url
59 60
60 61
61 62 def _get_permission_for_user(user, repo):
62 63 perm = UserRepoToPerm.query()\
63 64 .filter(UserRepoToPerm.repository ==
64 65 Repository.get_by_repo_name(repo))\
65 66 .filter(UserRepoToPerm.user == User.get_by_username(user))\
66 67 .all()
67 68 return perm
68 69
69 70
70 71 @pytest.mark.usefixtures("app")
71 72 class TestAdminRepos(object):
72 73
73 def test_repo_list(self, autologin_user, user_util):
74 def test_repo_list(self, autologin_user, user_util, xhr_header):
74 75 repo = user_util.create_repo()
75 76 repo_name = repo.repo_name
76 77 response = self.app.get(
77 route_path('repos'), status=200)
78 route_path('repos_data'), status=200,
79 extra_environ=xhr_header)
78 80
79 81 response.mustcontain(repo_name)
80 82
81 83 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
82 84 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
83 85 response = self.app.get(route_path('repo_new'), status=200)
84 86 assert_response = response.assert_response()
85 87 element = assert_response.get_element('#repo_type')
86 88 assert element.text_content() == '\ngit\n'
87 89
88 90 def test_create_page_non_restricted_backends(self, autologin_user, backend):
89 91 response = self.app.get(route_path('repo_new'), status=200)
90 92 assert_response = response.assert_response()
91 93 assert_response.element_contains('#repo_type', 'git')
92 94 assert_response.element_contains('#repo_type', 'svn')
93 95 assert_response.element_contains('#repo_type', 'hg')
94 96
95 97 @pytest.mark.parametrize(
96 98 "suffix", [u'', u'xxa'], ids=['', 'non-ascii'])
97 99 def test_create(self, autologin_user, backend, suffix, csrf_token):
98 100 repo_name_unicode = backend.new_repo_name(suffix=suffix)
99 101 repo_name = repo_name_unicode.encode('utf8')
100 102 description_unicode = u'description for newly created repo' + suffix
101 103 description = description_unicode.encode('utf8')
102 104 response = self.app.post(
103 105 route_path('repo_create'),
104 106 fixture._get_repo_create_params(
105 107 repo_private=False,
106 108 repo_name=repo_name,
107 109 repo_type=backend.alias,
108 110 repo_description=description,
109 111 csrf_token=csrf_token),
110 112 status=302)
111 113
112 114 self.assert_repository_is_created_correctly(
113 115 repo_name, description, backend)
114 116
115 117 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
116 118 numeric_repo = '1234'
117 119 repo_name = numeric_repo
118 120 description = 'description for newly created repo' + numeric_repo
119 121 self.app.post(
120 122 route_path('repo_create'),
121 123 fixture._get_repo_create_params(
122 124 repo_private=False,
123 125 repo_name=repo_name,
124 126 repo_type=backend.alias,
125 127 repo_description=description,
126 128 csrf_token=csrf_token))
127 129
128 130 self.assert_repository_is_created_correctly(
129 131 repo_name, description, backend)
130 132
131 133 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
132 134 def test_create_in_group(
133 135 self, autologin_user, backend, suffix, csrf_token):
134 136 # create GROUP
135 137 group_name = 'sometest_%s' % backend.alias
136 138 gr = RepoGroupModel().create(group_name=group_name,
137 139 group_description='test',
138 140 owner=TEST_USER_ADMIN_LOGIN)
139 141 Session().commit()
140 142
141 143 repo_name = u'ingroup' + suffix
142 144 repo_name_full = RepoGroup.url_sep().join(
143 145 [group_name, repo_name])
144 146 description = u'description for newly created repo'
145 147 self.app.post(
146 148 route_path('repo_create'),
147 149 fixture._get_repo_create_params(
148 150 repo_private=False,
149 151 repo_name=safe_str(repo_name),
150 152 repo_type=backend.alias,
151 153 repo_description=description,
152 154 repo_group=gr.group_id,
153 155 csrf_token=csrf_token))
154 156
155 157 # TODO: johbo: Cleanup work to fixture
156 158 try:
157 159 self.assert_repository_is_created_correctly(
158 160 repo_name_full, description, backend)
159 161
160 162 new_repo = RepoModel().get_by_repo_name(repo_name_full)
161 163 inherited_perms = UserRepoToPerm.query().filter(
162 164 UserRepoToPerm.repository_id == new_repo.repo_id).all()
163 165 assert len(inherited_perms) == 1
164 166 finally:
165 167 RepoModel().delete(repo_name_full)
166 168 RepoGroupModel().delete(group_name)
167 169 Session().commit()
168 170
169 171 def test_create_in_group_numeric_name(
170 172 self, autologin_user, backend, csrf_token):
171 173 # create GROUP
172 174 group_name = 'sometest_%s' % backend.alias
173 175 gr = RepoGroupModel().create(group_name=group_name,
174 176 group_description='test',
175 177 owner=TEST_USER_ADMIN_LOGIN)
176 178 Session().commit()
177 179
178 180 repo_name = '12345'
179 181 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
180 182 description = 'description for newly created repo'
181 183 self.app.post(
182 184 route_path('repo_create'),
183 185 fixture._get_repo_create_params(
184 186 repo_private=False,
185 187 repo_name=repo_name,
186 188 repo_type=backend.alias,
187 189 repo_description=description,
188 190 repo_group=gr.group_id,
189 191 csrf_token=csrf_token))
190 192
191 193 # TODO: johbo: Cleanup work to fixture
192 194 try:
193 195 self.assert_repository_is_created_correctly(
194 196 repo_name_full, description, backend)
195 197
196 198 new_repo = RepoModel().get_by_repo_name(repo_name_full)
197 199 inherited_perms = UserRepoToPerm.query()\
198 200 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
199 201 assert len(inherited_perms) == 1
200 202 finally:
201 203 RepoModel().delete(repo_name_full)
202 204 RepoGroupModel().delete(group_name)
203 205 Session().commit()
204 206
205 207 def test_create_in_group_without_needed_permissions(self, backend):
206 208 session = login_user_session(
207 209 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
208 210 csrf_token = auth.get_csrf_token(session)
209 211 # revoke
210 212 user_model = UserModel()
211 213 # disable fork and create on default user
212 214 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
213 215 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
214 216 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
215 217 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
216 218
217 219 # disable on regular user
218 220 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
219 221 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
220 222 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
221 223 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
222 224 Session().commit()
223 225
224 226 # create GROUP
225 227 group_name = 'reg_sometest_%s' % backend.alias
226 228 gr = RepoGroupModel().create(group_name=group_name,
227 229 group_description='test',
228 230 owner=TEST_USER_ADMIN_LOGIN)
229 231 Session().commit()
230 232 repo_group_id = gr.group_id
231 233
232 234 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
233 235 gr_allowed = RepoGroupModel().create(
234 236 group_name=group_name_allowed,
235 237 group_description='test',
236 238 owner=TEST_USER_REGULAR_LOGIN)
237 239 allowed_repo_group_id = gr_allowed.group_id
238 240 Session().commit()
239 241
240 242 repo_name = 'ingroup'
241 243 description = 'description for newly created repo'
242 244 response = self.app.post(
243 245 route_path('repo_create'),
244 246 fixture._get_repo_create_params(
245 247 repo_private=False,
246 248 repo_name=repo_name,
247 249 repo_type=backend.alias,
248 250 repo_description=description,
249 251 repo_group=repo_group_id,
250 252 csrf_token=csrf_token))
251 253
252 254 response.mustcontain('Invalid value')
253 255
254 256 # user is allowed to create in this group
255 257 repo_name = 'ingroup'
256 258 repo_name_full = RepoGroup.url_sep().join(
257 259 [group_name_allowed, repo_name])
258 260 description = 'description for newly created repo'
259 261 response = self.app.post(
260 262 route_path('repo_create'),
261 263 fixture._get_repo_create_params(
262 264 repo_private=False,
263 265 repo_name=repo_name,
264 266 repo_type=backend.alias,
265 267 repo_description=description,
266 268 repo_group=allowed_repo_group_id,
267 269 csrf_token=csrf_token))
268 270
269 271 # TODO: johbo: Cleanup in pytest fixture
270 272 try:
271 273 self.assert_repository_is_created_correctly(
272 274 repo_name_full, description, backend)
273 275
274 276 new_repo = RepoModel().get_by_repo_name(repo_name_full)
275 277 inherited_perms = UserRepoToPerm.query().filter(
276 278 UserRepoToPerm.repository_id == new_repo.repo_id).all()
277 279 assert len(inherited_perms) == 1
278 280
279 281 assert repo_on_filesystem(repo_name_full)
280 282 finally:
281 283 RepoModel().delete(repo_name_full)
282 284 RepoGroupModel().delete(group_name)
283 285 RepoGroupModel().delete(group_name_allowed)
284 286 Session().commit()
285 287
286 288 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
287 289 csrf_token):
288 290 # create GROUP
289 291 group_name = 'sometest_%s' % backend.alias
290 292 gr = RepoGroupModel().create(group_name=group_name,
291 293 group_description='test',
292 294 owner=TEST_USER_ADMIN_LOGIN)
293 295 perm = Permission.get_by_key('repository.write')
294 296 RepoGroupModel().grant_user_permission(
295 297 gr, TEST_USER_REGULAR_LOGIN, perm)
296 298
297 299 # add repo permissions
298 300 Session().commit()
299 301 repo_group_id = gr.group_id
300 302 repo_name = 'ingroup_inherited_%s' % backend.alias
301 303 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
302 304 description = 'description for newly created repo'
303 305 self.app.post(
304 306 route_path('repo_create'),
305 307 fixture._get_repo_create_params(
306 308 repo_private=False,
307 309 repo_name=repo_name,
308 310 repo_type=backend.alias,
309 311 repo_description=description,
310 312 repo_group=repo_group_id,
311 313 repo_copy_permissions=True,
312 314 csrf_token=csrf_token))
313 315
314 316 # TODO: johbo: Cleanup to pytest fixture
315 317 try:
316 318 self.assert_repository_is_created_correctly(
317 319 repo_name_full, description, backend)
318 320 except Exception:
319 321 RepoGroupModel().delete(group_name)
320 322 Session().commit()
321 323 raise
322 324
323 325 # check if inherited permissions are applied
324 326 new_repo = RepoModel().get_by_repo_name(repo_name_full)
325 327 inherited_perms = UserRepoToPerm.query().filter(
326 328 UserRepoToPerm.repository_id == new_repo.repo_id).all()
327 329 assert len(inherited_perms) == 2
328 330
329 331 assert TEST_USER_REGULAR_LOGIN in [
330 332 x.user.username for x in inherited_perms]
331 333 assert 'repository.write' in [
332 334 x.permission.permission_name for x in inherited_perms]
333 335
334 336 RepoModel().delete(repo_name_full)
335 337 RepoGroupModel().delete(group_name)
336 338 Session().commit()
337 339
338 340 @pytest.mark.xfail_backends(
339 341 "git", "hg", reason="Missing reposerver support")
340 342 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
341 343 csrf_token):
342 344 source_repo = backend.create_repo(number_of_commits=2)
343 345 source_repo_name = source_repo.repo_name
344 346 reposerver.serve(source_repo.scm_instance())
345 347
346 348 repo_name = backend.new_repo_name()
347 349 response = self.app.post(
348 350 route_path('repo_create'),
349 351 fixture._get_repo_create_params(
350 352 repo_private=False,
351 353 repo_name=repo_name,
352 354 repo_type=backend.alias,
353 355 repo_description='',
354 356 clone_uri=reposerver.url,
355 357 csrf_token=csrf_token),
356 358 status=302)
357 359
358 360 # Should be redirected to the creating page
359 361 response.mustcontain('repo_creating')
360 362
361 363 # Expecting that both repositories have same history
362 364 source_repo = RepoModel().get_by_repo_name(source_repo_name)
363 365 source_vcs = source_repo.scm_instance()
364 366 repo = RepoModel().get_by_repo_name(repo_name)
365 367 repo_vcs = repo.scm_instance()
366 368 assert source_vcs[0].message == repo_vcs[0].message
367 369 assert source_vcs.count() == repo_vcs.count()
368 370 assert source_vcs.commit_ids == repo_vcs.commit_ids
369 371
370 372 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
371 373 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
372 374 csrf_token):
373 375 repo_name = backend.new_repo_name()
374 376 description = 'description for newly created repo'
375 377 response = self.app.post(
376 378 route_path('repo_create'),
377 379 fixture._get_repo_create_params(
378 380 repo_private=False,
379 381 repo_name=repo_name,
380 382 repo_type=backend.alias,
381 383 repo_description=description,
382 384 clone_uri='http://repo.invalid/repo',
383 385 csrf_token=csrf_token))
384 386 response.mustcontain('invalid clone url')
385 387
386 388 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
387 389 def test_create_remote_repo_wrong_clone_uri_hg_svn(
388 390 self, autologin_user, backend, csrf_token):
389 391 repo_name = backend.new_repo_name()
390 392 description = 'description for newly created repo'
391 393 response = self.app.post(
392 394 route_path('repo_create'),
393 395 fixture._get_repo_create_params(
394 396 repo_private=False,
395 397 repo_name=repo_name,
396 398 repo_type=backend.alias,
397 399 repo_description=description,
398 400 clone_uri='svn+http://svn.invalid/repo',
399 401 csrf_token=csrf_token))
400 402 response.mustcontain('invalid clone url')
401 403
402 404 def test_create_with_git_suffix(
403 405 self, autologin_user, backend, csrf_token):
404 406 repo_name = backend.new_repo_name() + ".git"
405 407 description = 'description for newly created repo'
406 408 response = self.app.post(
407 409 route_path('repo_create'),
408 410 fixture._get_repo_create_params(
409 411 repo_private=False,
410 412 repo_name=repo_name,
411 413 repo_type=backend.alias,
412 414 repo_description=description,
413 415 csrf_token=csrf_token))
414 416 response.mustcontain('Repository name cannot end with .git')
415 417
416 418 def test_default_user_cannot_access_private_repo_in_a_group(
417 419 self, autologin_user, user_util, backend):
418 420
419 421 group = user_util.create_repo_group()
420 422
421 423 repo = backend.create_repo(
422 424 repo_private=True, repo_group=group, repo_copy_permissions=True)
423 425
424 426 permissions = _get_permission_for_user(
425 427 user='default', repo=repo.repo_name)
426 428 assert len(permissions) == 1
427 429 assert permissions[0].permission.permission_name == 'repository.none'
428 430 assert permissions[0].repository.private is True
429 431
430 432 def test_create_on_top_level_without_permissions(self, backend):
431 433 session = login_user_session(
432 434 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
433 435 csrf_token = auth.get_csrf_token(session)
434 436
435 437 # revoke
436 438 user_model = UserModel()
437 439 # disable fork and create on default user
438 440 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
439 441 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
440 442 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
441 443 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
442 444
443 445 # disable on regular user
444 446 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
445 447 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
446 448 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
447 449 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
448 450 Session().commit()
449 451
450 452 repo_name = backend.new_repo_name()
451 453 description = 'description for newly created repo'
452 454 response = self.app.post(
453 455 route_path('repo_create'),
454 456 fixture._get_repo_create_params(
455 457 repo_private=False,
456 458 repo_name=repo_name,
457 459 repo_type=backend.alias,
458 460 repo_description=description,
459 461 csrf_token=csrf_token))
460 462
461 463 response.mustcontain(
462 464 u"You do not have the permission to store repositories in "
463 465 u"the root location.")
464 466
465 467 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
466 468 def test_create_repo_when_filesystem_op_fails(
467 469 self, autologin_user, backend, csrf_token):
468 470 repo_name = backend.new_repo_name()
469 471 description = 'description for newly created repo'
470 472
471 473 response = self.app.post(
472 474 route_path('repo_create'),
473 475 fixture._get_repo_create_params(
474 476 repo_private=False,
475 477 repo_name=repo_name,
476 478 repo_type=backend.alias,
477 479 repo_description=description,
478 480 csrf_token=csrf_token))
479 481
480 482 assert_session_flash(
481 483 response, 'Error creating repository %s' % repo_name)
482 484 # repo must not be in db
483 485 assert backend.repo is None
484 486 # repo must not be in filesystem !
485 487 assert not repo_on_filesystem(repo_name)
486 488
487 489 def assert_repository_is_created_correctly(
488 490 self, repo_name, description, backend):
489 491 repo_name_utf8 = safe_str(repo_name)
490 492
491 493 # run the check page that triggers the flash message
492 494 response = self.app.get(
493 495 route_path('repo_creating_check', repo_name=safe_str(repo_name)))
494 496 assert response.json == {u'result': True}
495 497
496 498 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
497 499 urllib.quote(repo_name_utf8), repo_name)
498 500 assert_session_flash(response, flash_msg)
499 501
500 502 # test if the repo was created in the database
501 503 new_repo = RepoModel().get_by_repo_name(repo_name)
502 504
503 505 assert new_repo.repo_name == repo_name
504 506 assert new_repo.description == description
505 507
506 508 # test if the repository is visible in the list ?
507 509 response = self.app.get(
508 510 h.route_path('repo_summary', repo_name=safe_str(repo_name)))
509 511 response.mustcontain(repo_name)
510 512 response.mustcontain(backend.alias)
511 513
512 514 assert repo_on_filesystem(repo_name)
@@ -1,194 +1,194 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import pytest
23 23
24 24 from rhodecode.apps._base import ADMIN_PREFIX
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model.db import Repository, UserRepoToPerm, User, RepoGroup
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.model.repo_group import RepoGroupModel
29 29 from rhodecode.tests import (
30 30 assert_session_flash, TEST_USER_REGULAR_LOGIN, TESTS_TMP_PATH)
31 31 from rhodecode.tests.fixture import Fixture
32 32
33 33 fixture = Fixture()
34 34
35 35
36 36 def route_path(name, params=None, **kwargs):
37 37 import urllib
38 38
39 39 base_url = {
40 40 'repo_groups': ADMIN_PREFIX + '/repo_groups',
41 41 'repo_groups_data': ADMIN_PREFIX + '/repo_groups_data',
42 42 'repo_group_new': ADMIN_PREFIX + '/repo_group/new',
43 43 'repo_group_create': ADMIN_PREFIX + '/repo_group/create',
44 44
45 45 }[name].format(**kwargs)
46 46
47 47 if params:
48 48 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
49 49 return base_url
50 50
51 51
52 52 def _get_permission_for_user(user, repo):
53 53 perm = UserRepoToPerm.query()\
54 54 .filter(UserRepoToPerm.repository ==
55 55 Repository.get_by_repo_name(repo))\
56 56 .filter(UserRepoToPerm.user == User.get_by_username(user))\
57 57 .all()
58 58 return perm
59 59
60 60
61 61 @pytest.mark.usefixtures("app")
62 62 class TestAdminRepositoryGroups(object):
63 63
64 64 def test_show_repo_groups(self, autologin_user):
65 65 self.app.get(route_path('repo_groups'))
66 66
67 67 def test_show_repo_groups_data(self, autologin_user, xhr_header):
68 68 response = self.app.get(route_path(
69 69 'repo_groups_data'), extra_environ=xhr_header)
70 70
71 71 all_repo_groups = RepoGroup.query().count()
72 72 assert response.json['recordsTotal'] == all_repo_groups
73 73
74 74 def test_show_repo_groups_data_filtered(self, autologin_user, xhr_header):
75 75 response = self.app.get(route_path(
76 76 'repo_groups_data', params={'search[value]': 'empty_search'}),
77 77 extra_environ=xhr_header)
78 78
79 79 all_repo_groups = RepoGroup.query().count()
80 80 assert response.json['recordsTotal'] == all_repo_groups
81 81 assert response.json['recordsFiltered'] == 0
82 82
83 83 def test_show_repo_groups_after_creating_group(self, autologin_user, xhr_header):
84 84 fixture.create_repo_group('test_repo_group')
85 85 response = self.app.get(route_path(
86 86 'repo_groups_data'), extra_environ=xhr_header)
87 response.mustcontain('"name_raw": "test_repo_group"')
87 response.mustcontain('<a href=\\"/{}/_edit\\" title=\\"Edit\\">Edit</a>'.format('test_repo_group'))
88 88 fixture.destroy_repo_group('test_repo_group')
89 89
90 90 def test_new(self, autologin_user):
91 91 self.app.get(route_path('repo_group_new'))
92 92
93 93 def test_new_with_parent_group(self, autologin_user, user_util):
94 94 gr = user_util.create_repo_group()
95 95
96 96 self.app.get(route_path('repo_group_new'),
97 97 params=dict(parent_group=gr.group_name))
98 98
99 99 def test_new_by_regular_user_no_permission(self, autologin_regular_user):
100 100 self.app.get(route_path('repo_group_new'), status=403)
101 101
102 102 @pytest.mark.parametrize('repo_group_name', [
103 103 'git_repo',
104 104 'git_repo_Δ…Δ‡',
105 105 'hg_repo',
106 106 '12345',
107 107 'hg_repo_Δ…Δ‡',
108 108 ])
109 109 def test_create(self, autologin_user, repo_group_name, csrf_token):
110 110 repo_group_name_unicode = repo_group_name.decode('utf8')
111 111 description = 'description for newly created repo group'
112 112
113 113 response = self.app.post(
114 114 route_path('repo_group_create'),
115 115 fixture._get_group_create_params(
116 116 group_name=repo_group_name,
117 117 group_description=description,
118 118 csrf_token=csrf_token))
119 119
120 120 # run the check page that triggers the flash message
121 121 repo_gr_url = h.route_path(
122 122 'repo_group_home', repo_group_name=repo_group_name)
123 123
124 124 assert_session_flash(
125 125 response,
126 126 'Created repository group <a href="%s">%s</a>' % (
127 127 repo_gr_url, repo_group_name_unicode))
128 128
129 129 # # test if the repo group was created in the database
130 130 new_repo_group = RepoGroupModel()._get_repo_group(
131 131 repo_group_name_unicode)
132 132 assert new_repo_group is not None
133 133
134 134 assert new_repo_group.group_name == repo_group_name_unicode
135 135 assert new_repo_group.group_description == description
136 136
137 137 # test if the repository is visible in the list ?
138 138 response = self.app.get(repo_gr_url)
139 139 response.mustcontain(repo_group_name)
140 140
141 141 # test if the repository group was created on filesystem
142 142 is_on_filesystem = os.path.isdir(
143 143 os.path.join(TESTS_TMP_PATH, repo_group_name))
144 144 if not is_on_filesystem:
145 145 self.fail('no repo group %s in filesystem' % repo_group_name)
146 146
147 147 RepoGroupModel().delete(repo_group_name_unicode)
148 148 Session().commit()
149 149
150 150 @pytest.mark.parametrize('repo_group_name', [
151 151 'git_repo',
152 152 'git_repo_Δ…Δ‡',
153 153 'hg_repo',
154 154 '12345',
155 155 'hg_repo_Δ…Δ‡',
156 156 ])
157 157 def test_create_subgroup(self, autologin_user, user_util, repo_group_name, csrf_token):
158 158 parent_group = user_util.create_repo_group()
159 159 parent_group_name = parent_group.group_name
160 160
161 161 expected_group_name = '{}/{}'.format(
162 162 parent_group_name, repo_group_name)
163 163 expected_group_name_unicode = expected_group_name.decode('utf8')
164 164
165 165 try:
166 166 response = self.app.post(
167 167 route_path('repo_group_create'),
168 168 fixture._get_group_create_params(
169 169 group_name=repo_group_name,
170 170 group_parent_id=parent_group.group_id,
171 171 group_description='Test desciption',
172 172 csrf_token=csrf_token))
173 173
174 174 assert_session_flash(
175 175 response,
176 176 u'Created repository group <a href="%s">%s</a>' % (
177 177 h.route_path('repo_group_home',
178 178 repo_group_name=expected_group_name),
179 179 expected_group_name_unicode))
180 180 finally:
181 181 RepoGroupModel().delete(expected_group_name_unicode)
182 182 Session().commit()
183 183
184 184 def test_user_with_creation_permissions_cannot_create_subgroups(
185 185 self, autologin_regular_user, user_util):
186 186
187 187 user_util.grant_user_permission(
188 188 TEST_USER_REGULAR_LOGIN, 'hg.repogroup.create.true')
189 189 parent_group = user_util.create_repo_group()
190 190 parent_group_id = parent_group.group_id
191 191 self.app.get(
192 192 route_path('repo_group_new',
193 193 params=dict(parent_group=parent_group_id), ),
194 194 status=403)
@@ -1,793 +1,794 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.orm.exc import NoResultFound
23 23
24 24 from rhodecode.lib import auth
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model.db import User, UserApiKeys, UserEmailMap, Repository
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.model.user import UserModel
29 29
30 30 from rhodecode.tests import (
31 31 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
32 32 from rhodecode.tests.fixture import Fixture
33 33
34 34 fixture = Fixture()
35 35
36 36
37 37 def route_path(name, params=None, **kwargs):
38 38 import urllib
39 39 from rhodecode.apps._base import ADMIN_PREFIX
40 40
41 41 base_url = {
42 42 'users':
43 43 ADMIN_PREFIX + '/users',
44 44 'users_data':
45 45 ADMIN_PREFIX + '/users_data',
46 46 'users_create':
47 47 ADMIN_PREFIX + '/users/create',
48 48 'users_new':
49 49 ADMIN_PREFIX + '/users/new',
50 50 'user_edit':
51 51 ADMIN_PREFIX + '/users/{user_id}/edit',
52 52 'user_edit_advanced':
53 53 ADMIN_PREFIX + '/users/{user_id}/edit/advanced',
54 54 'user_edit_global_perms':
55 55 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions',
56 56 'user_edit_global_perms_update':
57 57 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions/update',
58 58 'user_update':
59 59 ADMIN_PREFIX + '/users/{user_id}/update',
60 60 'user_delete':
61 61 ADMIN_PREFIX + '/users/{user_id}/delete',
62 62 'user_create_personal_repo_group':
63 63 ADMIN_PREFIX + '/users/{user_id}/create_repo_group',
64 64
65 65 'edit_user_auth_tokens':
66 66 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens',
67 67 'edit_user_auth_tokens_add':
68 68 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/new',
69 69 'edit_user_auth_tokens_delete':
70 70 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/delete',
71 71
72 72 'edit_user_emails':
73 73 ADMIN_PREFIX + '/users/{user_id}/edit/emails',
74 74 'edit_user_emails_add':
75 75 ADMIN_PREFIX + '/users/{user_id}/edit/emails/new',
76 76 'edit_user_emails_delete':
77 77 ADMIN_PREFIX + '/users/{user_id}/edit/emails/delete',
78 78
79 79 'edit_user_ips':
80 80 ADMIN_PREFIX + '/users/{user_id}/edit/ips',
81 81 'edit_user_ips_add':
82 82 ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
83 83 'edit_user_ips_delete':
84 84 ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
85 85
86 86 'edit_user_perms_summary':
87 87 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary',
88 88 'edit_user_perms_summary_json':
89 89 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary/json',
90 90
91 91 'edit_user_audit_logs':
92 92 ADMIN_PREFIX + '/users/{user_id}/edit/audit',
93 93
94 94 'edit_user_audit_logs_download':
95 95 ADMIN_PREFIX + '/users/{user_id}/edit/audit/download',
96 96
97 97 }[name].format(**kwargs)
98 98
99 99 if params:
100 100 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
101 101 return base_url
102 102
103 103
104 104 class TestAdminUsersView(TestController):
105 105
106 106 def test_show_users(self):
107 107 self.log_user()
108 108 self.app.get(route_path('users'))
109 109
110 110 def test_show_users_data(self, xhr_header):
111 111 self.log_user()
112 112 response = self.app.get(route_path(
113 113 'users_data'), extra_environ=xhr_header)
114 114
115 115 all_users = User.query().filter(
116 116 User.username != User.DEFAULT_USER).count()
117 117 assert response.json['recordsTotal'] == all_users
118 118
119 119 def test_show_users_data_filtered(self, xhr_header):
120 120 self.log_user()
121 121 response = self.app.get(route_path(
122 122 'users_data', params={'search[value]': 'empty_search'}),
123 123 extra_environ=xhr_header)
124 124
125 125 all_users = User.query().filter(
126 126 User.username != User.DEFAULT_USER).count()
127 127 assert response.json['recordsTotal'] == all_users
128 128 assert response.json['recordsFiltered'] == 0
129 129
130 130 def test_auth_tokens_default_user(self):
131 131 self.log_user()
132 132 user = User.get_default_user()
133 133 response = self.app.get(
134 134 route_path('edit_user_auth_tokens', user_id=user.user_id),
135 135 status=302)
136 136
137 137 def test_auth_tokens(self):
138 138 self.log_user()
139 139
140 140 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
141 141 user_id = user.user_id
142 142 auth_tokens = user.auth_tokens
143 143 response = self.app.get(
144 144 route_path('edit_user_auth_tokens', user_id=user_id))
145 145 for token in auth_tokens:
146 146 response.mustcontain(token)
147 147 response.mustcontain('never')
148 148
149 149 @pytest.mark.parametrize("desc, lifetime", [
150 150 ('forever', -1),
151 151 ('5mins', 60*5),
152 152 ('30days', 60*60*24*30),
153 153 ])
154 154 def test_add_auth_token(self, desc, lifetime, user_util):
155 155 self.log_user()
156 156 user = user_util.create_user()
157 157 user_id = user.user_id
158 158
159 159 response = self.app.post(
160 160 route_path('edit_user_auth_tokens_add', user_id=user_id),
161 161 {'description': desc, 'lifetime': lifetime,
162 162 'csrf_token': self.csrf_token})
163 163 assert_session_flash(response, 'Auth token successfully created')
164 164
165 165 response = response.follow()
166 166 user = User.get(user_id)
167 167 for auth_token in user.auth_tokens:
168 168 response.mustcontain(auth_token)
169 169
170 170 def test_delete_auth_token(self, user_util):
171 171 self.log_user()
172 172 user = user_util.create_user()
173 173 user_id = user.user_id
174 174 keys = user.auth_tokens
175 175 assert 2 == len(keys)
176 176
177 177 response = self.app.post(
178 178 route_path('edit_user_auth_tokens_add', user_id=user_id),
179 179 {'description': 'desc', 'lifetime': -1,
180 180 'csrf_token': self.csrf_token})
181 181 assert_session_flash(response, 'Auth token successfully created')
182 182 response.follow()
183 183
184 184 # now delete our key
185 185 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
186 186 assert 3 == len(keys)
187 187
188 188 response = self.app.post(
189 189 route_path('edit_user_auth_tokens_delete', user_id=user_id),
190 190 {'del_auth_token': keys[0].user_api_key_id,
191 191 'csrf_token': self.csrf_token})
192 192
193 193 assert_session_flash(response, 'Auth token successfully deleted')
194 194 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
195 195 assert 2 == len(keys)
196 196
197 197 def test_ips(self):
198 198 self.log_user()
199 199 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
200 200 response = self.app.get(route_path('edit_user_ips', user_id=user.user_id))
201 201 response.mustcontain('All IP addresses are allowed')
202 202
203 203 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
204 204 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
205 205 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
206 206 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
207 207 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
208 208 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
209 209 ('127_bad_ip', 'foobar', 'foobar', True),
210 210 ])
211 211 def test_ips_add(self, user_util, test_name, ip, ip_range, failure):
212 212 self.log_user()
213 213 user = user_util.create_user(username=test_name)
214 214 user_id = user.user_id
215 215
216 216 response = self.app.post(
217 217 route_path('edit_user_ips_add', user_id=user_id),
218 218 params={'new_ip': ip, 'csrf_token': self.csrf_token})
219 219
220 220 if failure:
221 221 assert_session_flash(
222 222 response, 'Please enter a valid IPv4 or IpV6 address')
223 223 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
224 224
225 225 response.mustcontain(no=[ip])
226 226 response.mustcontain(no=[ip_range])
227 227
228 228 else:
229 229 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
230 230 response.mustcontain(ip)
231 231 response.mustcontain(ip_range)
232 232
233 233 def test_ips_delete(self, user_util):
234 234 self.log_user()
235 235 user = user_util.create_user()
236 236 user_id = user.user_id
237 237 ip = '127.0.0.1/32'
238 238 ip_range = '127.0.0.1 - 127.0.0.1'
239 239 new_ip = UserModel().add_extra_ip(user_id, ip)
240 240 Session().commit()
241 241 new_ip_id = new_ip.ip_id
242 242
243 243 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
244 244 response.mustcontain(ip)
245 245 response.mustcontain(ip_range)
246 246
247 247 self.app.post(
248 248 route_path('edit_user_ips_delete', user_id=user_id),
249 249 params={'del_ip_id': new_ip_id, 'csrf_token': self.csrf_token})
250 250
251 251 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
252 252 response.mustcontain('All IP addresses are allowed')
253 253 response.mustcontain(no=[ip])
254 254 response.mustcontain(no=[ip_range])
255 255
256 256 def test_emails(self):
257 257 self.log_user()
258 258 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
259 259 response = self.app.get(
260 260 route_path('edit_user_emails', user_id=user.user_id))
261 261 response.mustcontain('No additional emails specified')
262 262
263 263 def test_emails_add(self, user_util):
264 264 self.log_user()
265 265 user = user_util.create_user()
266 266 user_id = user.user_id
267 267
268 268 self.app.post(
269 269 route_path('edit_user_emails_add', user_id=user_id),
270 270 params={'new_email': 'example@rhodecode.com',
271 271 'csrf_token': self.csrf_token})
272 272
273 273 response = self.app.get(
274 274 route_path('edit_user_emails', user_id=user_id))
275 275 response.mustcontain('example@rhodecode.com')
276 276
277 277 def test_emails_add_existing_email(self, user_util, user_regular):
278 278 existing_email = user_regular.email
279 279
280 280 self.log_user()
281 281 user = user_util.create_user()
282 282 user_id = user.user_id
283 283
284 284 response = self.app.post(
285 285 route_path('edit_user_emails_add', user_id=user_id),
286 286 params={'new_email': existing_email,
287 287 'csrf_token': self.csrf_token})
288 288 assert_session_flash(
289 289 response, 'This e-mail address is already taken')
290 290
291 291 response = self.app.get(
292 292 route_path('edit_user_emails', user_id=user_id))
293 293 response.mustcontain(no=[existing_email])
294 294
295 295 def test_emails_delete(self, user_util):
296 296 self.log_user()
297 297 user = user_util.create_user()
298 298 user_id = user.user_id
299 299
300 300 self.app.post(
301 301 route_path('edit_user_emails_add', user_id=user_id),
302 302 params={'new_email': 'example@rhodecode.com',
303 303 'csrf_token': self.csrf_token})
304 304
305 305 response = self.app.get(
306 306 route_path('edit_user_emails', user_id=user_id))
307 307 response.mustcontain('example@rhodecode.com')
308 308
309 309 user_email = UserEmailMap.query()\
310 310 .filter(UserEmailMap.email == 'example@rhodecode.com') \
311 311 .filter(UserEmailMap.user_id == user_id)\
312 312 .one()
313 313
314 314 del_email_id = user_email.email_id
315 315 self.app.post(
316 316 route_path('edit_user_emails_delete', user_id=user_id),
317 317 params={'del_email_id': del_email_id,
318 318 'csrf_token': self.csrf_token})
319 319
320 320 response = self.app.get(
321 321 route_path('edit_user_emails', user_id=user_id))
322 322 response.mustcontain(no=['example@rhodecode.com'])
323 323
324 324 def test_create(self, request, xhr_header):
325 325 self.log_user()
326 326 username = 'newtestuser'
327 327 password = 'test12'
328 328 password_confirmation = password
329 329 name = 'name'
330 330 lastname = 'lastname'
331 331 email = 'mail@mail.com'
332 332
333 333 self.app.get(route_path('users_new'))
334 334
335 335 response = self.app.post(route_path('users_create'), params={
336 336 'username': username,
337 337 'password': password,
338 338 'description': 'mr CTO',
339 339 'password_confirmation': password_confirmation,
340 340 'firstname': name,
341 341 'active': True,
342 342 'lastname': lastname,
343 343 'extern_name': 'rhodecode',
344 344 'extern_type': 'rhodecode',
345 345 'email': email,
346 346 'csrf_token': self.csrf_token,
347 347 })
348 348 user_link = h.link_to(
349 349 username,
350 350 route_path(
351 351 'user_edit', user_id=User.get_by_username(username).user_id))
352 352 assert_session_flash(response, 'Created user %s' % (user_link,))
353 353
354 354 @request.addfinalizer
355 355 def cleanup():
356 356 fixture.destroy_user(username)
357 357 Session().commit()
358 358
359 359 new_user = User.query().filter(User.username == username).one()
360 360
361 361 assert new_user.username == username
362 362 assert auth.check_password(password, new_user.password)
363 363 assert new_user.name == name
364 364 assert new_user.lastname == lastname
365 365 assert new_user.email == email
366 366
367 367 response = self.app.get(route_path('users_data'),
368 368 extra_environ=xhr_header)
369 369 response.mustcontain(username)
370 370
371 371 def test_create_err(self):
372 372 self.log_user()
373 373 username = 'new_user'
374 374 password = ''
375 375 name = 'name'
376 376 lastname = 'lastname'
377 377 email = 'errmail.com'
378 378
379 379 self.app.get(route_path('users_new'))
380 380
381 381 response = self.app.post(route_path('users_create'), params={
382 382 'username': username,
383 383 'password': password,
384 384 'name': name,
385 385 'active': False,
386 386 'lastname': lastname,
387 387 'description': 'mr CTO',
388 388 'email': email,
389 389 'csrf_token': self.csrf_token,
390 390 })
391 391
392 392 msg = u'Username "%(username)s" is forbidden'
393 393 msg = h.html_escape(msg % {'username': 'new_user'})
394 394 response.mustcontain('<span class="error-message">%s</span>' % msg)
395 395 response.mustcontain(
396 396 '<span class="error-message">Please enter a value</span>')
397 397 response.mustcontain(
398 398 '<span class="error-message">An email address must contain a'
399 399 ' single @</span>')
400 400
401 401 def get_user():
402 402 Session().query(User).filter(User.username == username).one()
403 403
404 404 with pytest.raises(NoResultFound):
405 405 get_user()
406 406
407 407 def test_new(self):
408 408 self.log_user()
409 409 self.app.get(route_path('users_new'))
410 410
411 411 @pytest.mark.parametrize("name, attrs", [
412 412 ('firstname', {'firstname': 'new_username'}),
413 413 ('lastname', {'lastname': 'new_username'}),
414 414 ('admin', {'admin': True}),
415 415 ('admin', {'admin': False}),
416 416 ('extern_type', {'extern_type': 'ldap'}),
417 417 ('extern_type', {'extern_type': None}),
418 418 ('extern_name', {'extern_name': 'test'}),
419 419 ('extern_name', {'extern_name': None}),
420 420 ('active', {'active': False}),
421 421 ('active', {'active': True}),
422 422 ('email', {'email': 'some@email.com'}),
423 423 ('language', {'language': 'de'}),
424 424 ('language', {'language': 'en'}),
425 425 ('description', {'description': 'hello CTO'}),
426 426 # ('new_password', {'new_password': 'foobar123',
427 427 # 'password_confirmation': 'foobar123'})
428 428 ])
429 429 def test_update(self, name, attrs, user_util):
430 430 self.log_user()
431 431 usr = user_util.create_user(
432 432 password='qweqwe',
433 433 email='testme@rhodecode.org',
434 434 extern_type='rhodecode',
435 435 extern_name='xxx',
436 436 )
437 437 user_id = usr.user_id
438 438 Session().commit()
439 439
440 440 params = usr.get_api_data()
441 441 cur_lang = params['language'] or 'en'
442 442 params.update({
443 443 'password_confirmation': '',
444 444 'new_password': '',
445 445 'language': cur_lang,
446 446 'csrf_token': self.csrf_token,
447 447 })
448 448 params.update({'new_password': ''})
449 449 params.update(attrs)
450 450 if name == 'email':
451 451 params['emails'] = [attrs['email']]
452 452 elif name == 'extern_type':
453 453 # cannot update this via form, expected value is original one
454 454 params['extern_type'] = "rhodecode"
455 455 elif name == 'extern_name':
456 456 # cannot update this via form, expected value is original one
457 457 params['extern_name'] = 'xxx'
458 458 # special case since this user is not
459 459 # logged in yet his data is not filled
460 460 # so we use creation data
461 461
462 462 response = self.app.post(
463 463 route_path('user_update', user_id=usr.user_id), params)
464 464 assert response.status_int == 302
465 465 assert_session_flash(response, 'User updated successfully')
466 466
467 467 updated_user = User.get(user_id)
468 468 updated_params = updated_user.get_api_data()
469 469 updated_params.update({'password_confirmation': ''})
470 470 updated_params.update({'new_password': ''})
471 471
472 472 del params['csrf_token']
473 473 assert params == updated_params
474 474
475 475 def test_update_and_migrate_password(
476 476 self, autologin_user, real_crypto_backend, user_util):
477 477
478 478 user = user_util.create_user()
479 479 temp_user = user.username
480 480 user.password = auth._RhodeCodeCryptoSha256().hash_create(
481 481 b'test123')
482 482 Session().add(user)
483 483 Session().commit()
484 484
485 485 params = user.get_api_data()
486 486
487 487 params.update({
488 488 'password_confirmation': 'qweqwe123',
489 489 'new_password': 'qweqwe123',
490 490 'language': 'en',
491 491 'csrf_token': autologin_user.csrf_token,
492 492 })
493 493
494 494 response = self.app.post(
495 495 route_path('user_update', user_id=user.user_id), params)
496 496 assert response.status_int == 302
497 497 assert_session_flash(response, 'User updated successfully')
498 498
499 499 # new password should be bcrypted, after log-in and transfer
500 500 user = User.get_by_username(temp_user)
501 501 assert user.password.startswith('$')
502 502
503 503 updated_user = User.get_by_username(temp_user)
504 504 updated_params = updated_user.get_api_data()
505 505 updated_params.update({'password_confirmation': 'qweqwe123'})
506 506 updated_params.update({'new_password': 'qweqwe123'})
507 507
508 508 del params['csrf_token']
509 509 assert params == updated_params
510 510
511 511 def test_delete(self):
512 512 self.log_user()
513 513 username = 'newtestuserdeleteme'
514 514
515 515 fixture.create_user(name=username)
516 516
517 517 new_user = Session().query(User)\
518 518 .filter(User.username == username).one()
519 519 response = self.app.post(
520 520 route_path('user_delete', user_id=new_user.user_id),
521 521 params={'csrf_token': self.csrf_token})
522 522
523 523 assert_session_flash(response, 'Successfully deleted user `{}`'.format(username))
524 524
525 525 def test_delete_owner_of_repository(self, request, user_util):
526 526 self.log_user()
527 527 obj_name = 'test_repo'
528 528 usr = user_util.create_user()
529 529 username = usr.username
530 530 fixture.create_repo(obj_name, cur_user=usr.username)
531 531
532 532 new_user = Session().query(User)\
533 533 .filter(User.username == username).one()
534 534 response = self.app.post(
535 535 route_path('user_delete', user_id=new_user.user_id),
536 536 params={'csrf_token': self.csrf_token})
537 537
538 538 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
539 539 'Switch owners or remove those repositories:%s' % (username, obj_name)
540 540 assert_session_flash(response, msg)
541 541 fixture.destroy_repo(obj_name)
542 542
543 543 def test_delete_owner_of_repository_detaching(self, request, user_util):
544 544 self.log_user()
545 545 obj_name = 'test_repo'
546 546 usr = user_util.create_user(auto_cleanup=False)
547 547 username = usr.username
548 548 fixture.create_repo(obj_name, cur_user=usr.username)
549 Session().commit()
549 550
550 551 new_user = Session().query(User)\
551 552 .filter(User.username == username).one()
552 553 response = self.app.post(
553 554 route_path('user_delete', user_id=new_user.user_id),
554 555 params={'user_repos': 'detach', 'csrf_token': self.csrf_token})
555 556
556 557 msg = 'Detached 1 repositories'
557 558 assert_session_flash(response, msg)
558 559 fixture.destroy_repo(obj_name)
559 560
560 561 def test_delete_owner_of_repository_deleting(self, request, user_util):
561 562 self.log_user()
562 563 obj_name = 'test_repo'
563 564 usr = user_util.create_user(auto_cleanup=False)
564 565 username = usr.username
565 566 fixture.create_repo(obj_name, cur_user=usr.username)
566 567
567 568 new_user = Session().query(User)\
568 569 .filter(User.username == username).one()
569 570 response = self.app.post(
570 571 route_path('user_delete', user_id=new_user.user_id),
571 572 params={'user_repos': 'delete', 'csrf_token': self.csrf_token})
572 573
573 574 msg = 'Deleted 1 repositories'
574 575 assert_session_flash(response, msg)
575 576
576 577 def test_delete_owner_of_repository_group(self, request, user_util):
577 578 self.log_user()
578 579 obj_name = 'test_group'
579 580 usr = user_util.create_user()
580 581 username = usr.username
581 582 fixture.create_repo_group(obj_name, cur_user=usr.username)
582 583
583 584 new_user = Session().query(User)\
584 585 .filter(User.username == username).one()
585 586 response = self.app.post(
586 587 route_path('user_delete', user_id=new_user.user_id),
587 588 params={'csrf_token': self.csrf_token})
588 589
589 590 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
590 591 'Switch owners or remove those repository groups:%s' % (username, obj_name)
591 592 assert_session_flash(response, msg)
592 593 fixture.destroy_repo_group(obj_name)
593 594
594 595 def test_delete_owner_of_repository_group_detaching(self, request, user_util):
595 596 self.log_user()
596 597 obj_name = 'test_group'
597 598 usr = user_util.create_user(auto_cleanup=False)
598 599 username = usr.username
599 600 fixture.create_repo_group(obj_name, cur_user=usr.username)
600 601
601 602 new_user = Session().query(User)\
602 603 .filter(User.username == username).one()
603 604 response = self.app.post(
604 605 route_path('user_delete', user_id=new_user.user_id),
605 606 params={'user_repo_groups': 'delete', 'csrf_token': self.csrf_token})
606 607
607 608 msg = 'Deleted 1 repository groups'
608 609 assert_session_flash(response, msg)
609 610
610 611 def test_delete_owner_of_repository_group_deleting(self, request, user_util):
611 612 self.log_user()
612 613 obj_name = 'test_group'
613 614 usr = user_util.create_user(auto_cleanup=False)
614 615 username = usr.username
615 616 fixture.create_repo_group(obj_name, cur_user=usr.username)
616 617
617 618 new_user = Session().query(User)\
618 619 .filter(User.username == username).one()
619 620 response = self.app.post(
620 621 route_path('user_delete', user_id=new_user.user_id),
621 622 params={'user_repo_groups': 'detach', 'csrf_token': self.csrf_token})
622 623
623 624 msg = 'Detached 1 repository groups'
624 625 assert_session_flash(response, msg)
625 626 fixture.destroy_repo_group(obj_name)
626 627
627 628 def test_delete_owner_of_user_group(self, request, user_util):
628 629 self.log_user()
629 630 obj_name = 'test_user_group'
630 631 usr = user_util.create_user()
631 632 username = usr.username
632 633 fixture.create_user_group(obj_name, cur_user=usr.username)
633 634
634 635 new_user = Session().query(User)\
635 636 .filter(User.username == username).one()
636 637 response = self.app.post(
637 638 route_path('user_delete', user_id=new_user.user_id),
638 639 params={'csrf_token': self.csrf_token})
639 640
640 641 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
641 642 'Switch owners or remove those user groups:%s' % (username, obj_name)
642 643 assert_session_flash(response, msg)
643 644 fixture.destroy_user_group(obj_name)
644 645
645 646 def test_delete_owner_of_user_group_detaching(self, request, user_util):
646 647 self.log_user()
647 648 obj_name = 'test_user_group'
648 649 usr = user_util.create_user(auto_cleanup=False)
649 650 username = usr.username
650 651 fixture.create_user_group(obj_name, cur_user=usr.username)
651 652
652 653 new_user = Session().query(User)\
653 654 .filter(User.username == username).one()
654 655 try:
655 656 response = self.app.post(
656 657 route_path('user_delete', user_id=new_user.user_id),
657 658 params={'user_user_groups': 'detach',
658 659 'csrf_token': self.csrf_token})
659 660
660 661 msg = 'Detached 1 user groups'
661 662 assert_session_flash(response, msg)
662 663 finally:
663 664 fixture.destroy_user_group(obj_name)
664 665
665 666 def test_delete_owner_of_user_group_deleting(self, request, user_util):
666 667 self.log_user()
667 668 obj_name = 'test_user_group'
668 669 usr = user_util.create_user(auto_cleanup=False)
669 670 username = usr.username
670 671 fixture.create_user_group(obj_name, cur_user=usr.username)
671 672
672 673 new_user = Session().query(User)\
673 674 .filter(User.username == username).one()
674 675 response = self.app.post(
675 676 route_path('user_delete', user_id=new_user.user_id),
676 677 params={'user_user_groups': 'delete', 'csrf_token': self.csrf_token})
677 678
678 679 msg = 'Deleted 1 user groups'
679 680 assert_session_flash(response, msg)
680 681
681 682 def test_edit(self, user_util):
682 683 self.log_user()
683 684 user = user_util.create_user()
684 685 self.app.get(route_path('user_edit', user_id=user.user_id))
685 686
686 687 def test_edit_default_user_redirect(self):
687 688 self.log_user()
688 689 user = User.get_default_user()
689 690 self.app.get(route_path('user_edit', user_id=user.user_id), status=302)
690 691
691 692 @pytest.mark.parametrize(
692 693 'repo_create, repo_create_write, user_group_create, repo_group_create,'
693 694 'fork_create, inherit_default_permissions, expect_error,'
694 695 'expect_form_error', [
695 696 ('hg.create.none', 'hg.create.write_on_repogroup.false',
696 697 'hg.usergroup.create.false', 'hg.repogroup.create.false',
697 698 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
698 699 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
699 700 'hg.usergroup.create.false', 'hg.repogroup.create.false',
700 701 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
701 702 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
702 703 'hg.usergroup.create.true', 'hg.repogroup.create.true',
703 704 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
704 705 False),
705 706 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
706 707 'hg.usergroup.create.true', 'hg.repogroup.create.true',
707 708 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
708 709 True),
709 710 ('', '', '', '', '', '', True, False),
710 711 ])
711 712 def test_global_perms_on_user(
712 713 self, repo_create, repo_create_write, user_group_create,
713 714 repo_group_create, fork_create, expect_error, expect_form_error,
714 715 inherit_default_permissions, user_util):
715 716 self.log_user()
716 717 user = user_util.create_user()
717 718 uid = user.user_id
718 719
719 720 # ENABLE REPO CREATE ON A GROUP
720 721 perm_params = {
721 722 'inherit_default_permissions': False,
722 723 'default_repo_create': repo_create,
723 724 'default_repo_create_on_write': repo_create_write,
724 725 'default_user_group_create': user_group_create,
725 726 'default_repo_group_create': repo_group_create,
726 727 'default_fork_create': fork_create,
727 728 'default_inherit_default_permissions': inherit_default_permissions,
728 729 'csrf_token': self.csrf_token,
729 730 }
730 731 response = self.app.post(
731 732 route_path('user_edit_global_perms_update', user_id=uid),
732 733 params=perm_params)
733 734
734 735 if expect_form_error:
735 736 assert response.status_int == 200
736 737 response.mustcontain('Value must be one of')
737 738 else:
738 739 if expect_error:
739 740 msg = 'An error occurred during permissions saving'
740 741 else:
741 742 msg = 'User global permissions updated successfully'
742 743 ug = User.get(uid)
743 744 del perm_params['inherit_default_permissions']
744 745 del perm_params['csrf_token']
745 746 assert perm_params == ug.get_default_perms()
746 747 assert_session_flash(response, msg)
747 748
748 749 def test_global_permissions_initial_values(self, user_util):
749 750 self.log_user()
750 751 user = user_util.create_user()
751 752 uid = user.user_id
752 753 response = self.app.get(
753 754 route_path('user_edit_global_perms', user_id=uid))
754 755 default_user = User.get_default_user()
755 756 default_permissions = default_user.get_default_perms()
756 757 assert_response = response.assert_response()
757 758 expected_permissions = (
758 759 'default_repo_create', 'default_repo_create_on_write',
759 760 'default_fork_create', 'default_repo_group_create',
760 761 'default_user_group_create', 'default_inherit_default_permissions')
761 762 for permission in expected_permissions:
762 763 css_selector = '[name={}][checked=checked]'.format(permission)
763 764 element = assert_response.get_element(css_selector)
764 765 assert element.value == default_permissions[permission]
765 766
766 767 def test_perms_summary_page(self):
767 768 user = self.log_user()
768 769 response = self.app.get(
769 770 route_path('edit_user_perms_summary', user_id=user['user_id']))
770 771 for repo in Repository.query().all():
771 772 response.mustcontain(repo.repo_name)
772 773
773 774 def test_perms_summary_page_json(self):
774 775 user = self.log_user()
775 776 response = self.app.get(
776 777 route_path('edit_user_perms_summary_json', user_id=user['user_id']))
777 778 for repo in Repository.query().all():
778 779 response.mustcontain(repo.repo_name)
779 780
780 781 def test_audit_log_page(self):
781 782 user = self.log_user()
782 783 self.app.get(
783 784 route_path('edit_user_audit_logs', user_id=user['user_id']))
784 785
785 786 def test_audit_log_page_download(self):
786 787 user = self.log_user()
787 788 user_id = user['user_id']
788 789 response = self.app.get(
789 790 route_path('edit_user_audit_logs_download', user_id=user_id))
790 791
791 792 assert response.content_disposition == \
792 793 'attachment; filename=user_{}_audit_logs.json'.format(user_id)
793 794 assert response.content_type == "application/json"
@@ -1,127 +1,179 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import pytest
23 23
24 24 import rhodecode
25 from rhodecode.model.db import Repository
25 from rhodecode.model.db import Repository, RepoGroup, User
26 26 from rhodecode.model.meta import Session
27 27 from rhodecode.model.repo import RepoModel
28 28 from rhodecode.model.repo_group import RepoGroupModel
29 29 from rhodecode.model.settings import SettingsModel
30 30 from rhodecode.tests import TestController
31 31 from rhodecode.tests.fixture import Fixture
32 32 from rhodecode.lib import helpers as h
33 33
34 34 fixture = Fixture()
35 35
36 36
37 37 def route_path(name, **kwargs):
38 38 return {
39 39 'home': '/',
40 'main_page_repos_data': '/_home_repos',
41 'main_page_repo_groups_data': '/_home_repo_groups',
40 42 'repo_group_home': '/{repo_group_name}'
41 43 }[name].format(**kwargs)
42 44
43 45
44 46 class TestHomeController(TestController):
45 47
46 48 def test_index(self):
47 49 self.log_user()
48 50 response = self.app.get(route_path('home'))
49 51 # if global permission is set
50 52 response.mustcontain('New Repository')
51 53
54 def test_index_grid_repos(self, xhr_header):
55 self.log_user()
56 response = self.app.get(route_path('main_page_repos_data'), extra_environ=xhr_header)
52 57 # search for objects inside the JavaScript JSON
53 for repo in Repository.getAll():
54 response.mustcontain('"name_raw": "%s"' % repo.repo_name)
58 for obj in Repository.getAll():
59 response.mustcontain('<a href=\\"/{}\\">'.format(obj.repo_name))
60
61 def test_index_grid_repo_groups(self, xhr_header):
62 self.log_user()
63 response = self.app.get(route_path('main_page_repo_groups_data'),
64 extra_environ=xhr_header,)
65
66 # search for objects inside the JavaScript JSON
67 for obj in RepoGroup.getAll():
68 response.mustcontain('<a href=\\"/{}\\">'.format(obj.group_name))
69
70 def test_index_grid_repo_groups_without_access(self, xhr_header, user_util):
71 user = user_util.create_user(password='qweqwe')
72 group_ok = user_util.create_repo_group(owner=user)
73 group_id_ok = group_ok.group_id
74
75 group_forbidden = user_util.create_repo_group(owner=User.get_first_super_admin())
76 group_id_forbidden = group_forbidden.group_id
77
78 user_util.grant_user_permission_to_repo_group(group_forbidden, user, 'group.none')
79 self.log_user(user.username, 'qweqwe')
80
81 self.app.get(route_path('main_page_repo_groups_data'),
82 extra_environ=xhr_header,
83 params={'repo_group_id': group_id_ok}, status=200)
84
85 self.app.get(route_path('main_page_repo_groups_data'),
86 extra_environ=xhr_header,
87 params={'repo_group_id': group_id_forbidden}, status=404)
55 88
56 89 def test_index_contains_statics_with_ver(self):
57 90 from rhodecode.lib.base import calculate_version_hash
58 91
59 92 self.log_user()
60 93 response = self.app.get(route_path('home'))
61 94
62 95 rhodecode_version_hash = calculate_version_hash(
63 96 {'beaker.session.secret': 'test-rc-uytcxaz'})
64 97 response.mustcontain('style.css?ver={0}'.format(rhodecode_version_hash))
65 98 response.mustcontain('scripts.min.js?ver={0}'.format(rhodecode_version_hash))
66 99
67 def test_index_contains_backend_specific_details(self, backend):
100 def test_index_contains_backend_specific_details(self, backend, xhr_header):
68 101 self.log_user()
69 response = self.app.get(route_path('home'))
102 response = self.app.get(route_path('main_page_repos_data'), extra_environ=xhr_header)
70 103 tip = backend.repo.get_commit().raw_id
71 104
72 105 # html in javascript variable:
73 106 response.mustcontain(r'<i class=\"icon-%s\"' % (backend.alias, ))
74 107 response.mustcontain(r'href=\"/%s\"' % (backend.repo_name, ))
75 108
76 109 response.mustcontain("""/%s/changeset/%s""" % (backend.repo_name, tip))
77 110 response.mustcontain("""Added a symlink""")
78 111
79 112 def test_index_with_anonymous_access_disabled(self):
80 113 with fixture.anon_access(False):
81 114 response = self.app.get(route_path('home'), status=302)
82 115 assert 'login' in response.location
83 116
84 def test_index_page_on_groups(self, autologin_user, repo_group):
85 response = self.app.get(route_path('repo_group_home', repo_group_name='gr1'))
86 response.mustcontain("gr1/repo_in_group")
117 def test_index_page_on_groups_with_wrong_group_id(self, autologin_user, xhr_header):
118 group_id = 918123
119 self.app.get(
120 route_path('main_page_repo_groups_data'),
121 params={'repo_group_id': group_id},
122 status=404, extra_environ=xhr_header)
87 123
88 def test_index_page_on_group_with_trailing_slash(
89 self, autologin_user, repo_group):
90 response = self.app.get(route_path('repo_group_home', repo_group_name='gr1') + '/')
91 response.mustcontain("gr1/repo_in_group")
124 def test_index_page_on_groups(self, autologin_user, user_util, xhr_header):
125 gr = user_util.create_repo_group()
126 repo = user_util.create_repo(parent=gr)
127 repo_name = repo.repo_name
128 group_id = gr.group_id
129
130 response = self.app.get(route_path(
131 'repo_group_home', repo_group_name=gr.group_name))
132 response.mustcontain('d.repo_group_id = {}'.format(group_id))
92 133
93 @pytest.fixture(scope='class')
94 def repo_group(self, request):
95 gr = fixture.create_repo_group('gr1')
96 fixture.create_repo(name='gr1/repo_in_group', repo_group=gr)
134 response = self.app.get(
135 route_path('main_page_repos_data'),
136 params={'repo_group_id': group_id},
137 extra_environ=xhr_header,)
138 response.mustcontain(repo_name)
97 139
98 @request.addfinalizer
99 def cleanup():
100 RepoModel().delete('gr1/repo_in_group')
101 RepoGroupModel().delete(repo_group='gr1', force_delete=True)
102 Session().commit()
140 def test_index_page_on_group_with_trailing_slash(self, autologin_user, user_util, xhr_header):
141 gr = user_util.create_repo_group()
142 repo = user_util.create_repo(parent=gr)
143 repo_name = repo.repo_name
144 group_id = gr.group_id
145
146 response = self.app.get(route_path(
147 'repo_group_home', repo_group_name=gr.group_name+'/'))
148 response.mustcontain('d.repo_group_id = {}'.format(group_id))
149
150 response = self.app.get(
151 route_path('main_page_repos_data'),
152 params={'repo_group_id': group_id},
153 extra_environ=xhr_header, )
154 response.mustcontain(repo_name)
103 155
104 156 @pytest.mark.parametrize("name, state", [
105 157 ('Disabled', False),
106 158 ('Enabled', True),
107 159 ])
108 160 def test_index_show_version(self, autologin_user, name, state):
109 161 version_string = 'RhodeCode Enterprise %s' % rhodecode.__version__
110 162
111 163 sett = SettingsModel().create_or_update_setting(
112 164 'show_version', state, 'bool')
113 165 Session().add(sett)
114 166 Session().commit()
115 167 SettingsModel().invalidate_settings_cache()
116 168
117 169 response = self.app.get(route_path('home'))
118 170 if state is True:
119 171 response.mustcontain(version_string)
120 172 if state is False:
121 173 response.mustcontain(no=[version_string])
122 174
123 175 def test_logout_form_contains_csrf(self, autologin_user, csrf_token):
124 176 response = self.app.get(route_path('home'))
125 177 assert_response = response.assert_response()
126 178 element = assert_response.get_element('.logout [name=csrf_token]')
127 179 assert element.value == csrf_token
@@ -1,823 +1,822 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import re
22 22 import logging
23 23 import collections
24 24
25 25 from pyramid.httpexceptions import HTTPNotFound
26 26 from pyramid.view import view_config
27 27
28 28 from rhodecode.apps._base import BaseAppView, DataGridAppView
29 29 from rhodecode.lib import helpers as h
30 30 from rhodecode.lib.auth import (
31 31 LoginRequired, NotAnonymous, HasRepoGroupPermissionAnyDecorator, CSRFRequired,
32 HasRepoGroupPermissionAny)
32 HasRepoGroupPermissionAny, AuthUser)
33 33 from rhodecode.lib.codeblocks import filenode_as_lines_tokens
34 34 from rhodecode.lib.index import searcher_from_config
35 35 from rhodecode.lib.utils2 import safe_unicode, str2bool, safe_int
36 36 from rhodecode.lib.vcs.nodes import FileNode
37 37 from rhodecode.model.db import (
38 38 func, true, or_, case, in_filter_generator, Session,
39 39 Repository, RepoGroup, User, UserGroup)
40 40 from rhodecode.model.repo import RepoModel
41 41 from rhodecode.model.repo_group import RepoGroupModel
42 42 from rhodecode.model.user import UserModel
43 43 from rhodecode.model.user_group import UserGroupModel
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class HomeView(BaseAppView, DataGridAppView):
49 49
50 50 def load_default_context(self):
51 51 c = self._get_local_tmpl_context()
52 52 c.user = c.auth_user.get_instance()
53 53
54 54 return c
55 55
56 56 @LoginRequired()
57 57 @view_config(
58 58 route_name='user_autocomplete_data', request_method='GET',
59 59 renderer='json_ext', xhr=True)
60 60 def user_autocomplete_data(self):
61 61 self.load_default_context()
62 62 query = self.request.GET.get('query')
63 63 active = str2bool(self.request.GET.get('active') or True)
64 64 include_groups = str2bool(self.request.GET.get('user_groups'))
65 65 expand_groups = str2bool(self.request.GET.get('user_groups_expand'))
66 66 skip_default_user = str2bool(self.request.GET.get('skip_default_user'))
67 67
68 68 log.debug('generating user list, query:%s, active:%s, with_groups:%s',
69 69 query, active, include_groups)
70 70
71 71 _users = UserModel().get_users(
72 72 name_contains=query, only_active=active)
73 73
74 74 def maybe_skip_default_user(usr):
75 75 if skip_default_user and usr['username'] == UserModel.cls.DEFAULT_USER:
76 76 return False
77 77 return True
78 78 _users = filter(maybe_skip_default_user, _users)
79 79
80 80 if include_groups:
81 81 # extend with user groups
82 82 _user_groups = UserGroupModel().get_user_groups(
83 83 name_contains=query, only_active=active,
84 84 expand_groups=expand_groups)
85 85 _users = _users + _user_groups
86 86
87 87 return {'suggestions': _users}
88 88
89 89 @LoginRequired()
90 90 @NotAnonymous()
91 91 @view_config(
92 92 route_name='user_group_autocomplete_data', request_method='GET',
93 93 renderer='json_ext', xhr=True)
94 94 def user_group_autocomplete_data(self):
95 95 self.load_default_context()
96 96 query = self.request.GET.get('query')
97 97 active = str2bool(self.request.GET.get('active') or True)
98 98 expand_groups = str2bool(self.request.GET.get('user_groups_expand'))
99 99
100 100 log.debug('generating user group list, query:%s, active:%s',
101 101 query, active)
102 102
103 103 _user_groups = UserGroupModel().get_user_groups(
104 104 name_contains=query, only_active=active,
105 105 expand_groups=expand_groups)
106 106 _user_groups = _user_groups
107 107
108 108 return {'suggestions': _user_groups}
109 109
110 110 def _get_repo_list(self, name_contains=None, repo_type=None, repo_group_name='', limit=20):
111 111 org_query = name_contains
112 112 allowed_ids = self._rhodecode_user.repo_acl_ids(
113 113 ['repository.read', 'repository.write', 'repository.admin'],
114 114 cache=False, name_filter=name_contains) or [-1]
115 115
116 116 query = Repository.query()\
117 117 .filter(Repository.archived.isnot(true()))\
118 118 .filter(or_(
119 119 # generate multiple IN to fix limitation problems
120 120 *in_filter_generator(Repository.repo_id, allowed_ids)
121 121 ))
122 122
123 123 query = query.order_by(case(
124 124 [
125 125 (Repository.repo_name.startswith(repo_group_name), repo_group_name+'/'),
126 126 ],
127 127 ))
128 128 query = query.order_by(func.length(Repository.repo_name))
129 129 query = query.order_by(Repository.repo_name)
130 130
131 131 if repo_type:
132 132 query = query.filter(Repository.repo_type == repo_type)
133 133
134 134 if name_contains:
135 135 ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
136 136 query = query.filter(
137 137 Repository.repo_name.ilike(ilike_expression))
138 138 query = query.limit(limit)
139 139
140 140 acl_iter = query
141 141
142 142 return [
143 143 {
144 144 'id': obj.repo_name,
145 145 'value': org_query,
146 146 'value_display': obj.repo_name,
147 147 'text': obj.repo_name,
148 148 'type': 'repo',
149 149 'repo_id': obj.repo_id,
150 150 'repo_type': obj.repo_type,
151 151 'private': obj.private,
152 152 'url': h.route_path('repo_summary', repo_name=obj.repo_name)
153 153 }
154 154 for obj in acl_iter]
155 155
156 156 def _get_repo_group_list(self, name_contains=None, repo_group_name='', limit=20):
157 157 org_query = name_contains
158 158 allowed_ids = self._rhodecode_user.repo_group_acl_ids(
159 159 ['group.read', 'group.write', 'group.admin'],
160 160 cache=False, name_filter=name_contains) or [-1]
161 161
162 162 query = RepoGroup.query()\
163 163 .filter(or_(
164 164 # generate multiple IN to fix limitation problems
165 165 *in_filter_generator(RepoGroup.group_id, allowed_ids)
166 166 ))
167 167
168 168 query = query.order_by(case(
169 169 [
170 170 (RepoGroup.group_name.startswith(repo_group_name), repo_group_name+'/'),
171 171 ],
172 172 ))
173 173 query = query.order_by(func.length(RepoGroup.group_name))
174 174 query = query.order_by(RepoGroup.group_name)
175 175
176 176 if name_contains:
177 177 ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
178 178 query = query.filter(
179 179 RepoGroup.group_name.ilike(ilike_expression))
180 180 query = query.limit(limit)
181 181
182 182 acl_iter = query
183 183
184 184 return [
185 185 {
186 186 'id': obj.group_name,
187 187 'value': org_query,
188 188 'value_display': obj.group_name,
189 189 'text': obj.group_name,
190 190 'type': 'repo_group',
191 191 'repo_group_id': obj.group_id,
192 192 'url': h.route_path(
193 193 'repo_group_home', repo_group_name=obj.group_name)
194 194 }
195 195 for obj in acl_iter]
196 196
197 197 def _get_user_list(self, name_contains=None, limit=20):
198 198 org_query = name_contains
199 199 if not name_contains:
200 200 return [], False
201 201
202 202 # TODO(marcink): should all logged in users be allowed to search others?
203 203 allowed_user_search = self._rhodecode_user.username != User.DEFAULT_USER
204 204 if not allowed_user_search:
205 205 return [], False
206 206
207 207 name_contains = re.compile('(?:user:[ ]?)(.+)').findall(name_contains)
208 208 if len(name_contains) != 1:
209 209 return [], False
210 210
211 211 name_contains = name_contains[0]
212 212
213 213 query = User.query()\
214 214 .order_by(func.length(User.username))\
215 215 .order_by(User.username) \
216 216 .filter(User.username != User.DEFAULT_USER)
217 217
218 218 if name_contains:
219 219 ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
220 220 query = query.filter(
221 221 User.username.ilike(ilike_expression))
222 222 query = query.limit(limit)
223 223
224 224 acl_iter = query
225 225
226 226 return [
227 227 {
228 228 'id': obj.user_id,
229 229 'value': org_query,
230 230 'value_display': 'user: `{}`'.format(obj.username),
231 231 'type': 'user',
232 232 'icon_link': h.gravatar_url(obj.email, 30),
233 233 'url': h.route_path(
234 234 'user_profile', username=obj.username)
235 235 }
236 236 for obj in acl_iter], True
237 237
238 238 def _get_user_groups_list(self, name_contains=None, limit=20):
239 239 org_query = name_contains
240 240 if not name_contains:
241 241 return [], False
242 242
243 243 # TODO(marcink): should all logged in users be allowed to search others?
244 244 allowed_user_search = self._rhodecode_user.username != User.DEFAULT_USER
245 245 if not allowed_user_search:
246 246 return [], False
247 247
248 248 name_contains = re.compile('(?:user_group:[ ]?)(.+)').findall(name_contains)
249 249 if len(name_contains) != 1:
250 250 return [], False
251 251
252 252 name_contains = name_contains[0]
253 253
254 254 query = UserGroup.query()\
255 255 .order_by(func.length(UserGroup.users_group_name))\
256 256 .order_by(UserGroup.users_group_name)
257 257
258 258 if name_contains:
259 259 ilike_expression = u'%{}%'.format(safe_unicode(name_contains))
260 260 query = query.filter(
261 261 UserGroup.users_group_name.ilike(ilike_expression))
262 262 query = query.limit(limit)
263 263
264 264 acl_iter = query
265 265
266 266 return [
267 267 {
268 268 'id': obj.users_group_id,
269 269 'value': org_query,
270 270 'value_display': 'user_group: `{}`'.format(obj.users_group_name),
271 271 'type': 'user_group',
272 272 'url': h.route_path(
273 273 'user_group_profile', user_group_name=obj.users_group_name)
274 274 }
275 275 for obj in acl_iter], True
276 276
277 277 def _get_hash_commit_list(self, auth_user, searcher, query, repo=None, repo_group=None):
278 278 repo_name = repo_group_name = None
279 279 if repo:
280 280 repo_name = repo.repo_name
281 281 if repo_group:
282 282 repo_group_name = repo_group.group_name
283 283
284 284 org_query = query
285 285 if not query or len(query) < 3 or not searcher:
286 286 return [], False
287 287
288 288 commit_hashes = re.compile('(?:commit:[ ]?)([0-9a-f]{2,40})').findall(query)
289 289
290 290 if len(commit_hashes) != 1:
291 291 return [], False
292 292
293 293 commit_hash = commit_hashes[0]
294 294
295 295 result = searcher.search(
296 296 'commit_id:{}*'.format(commit_hash), 'commit', auth_user,
297 297 repo_name, repo_group_name, raise_on_exc=False)
298 298
299 299 commits = []
300 300 for entry in result['results']:
301 301 repo_data = {
302 302 'repository_id': entry.get('repository_id'),
303 303 'repository_type': entry.get('repo_type'),
304 304 'repository_name': entry.get('repository'),
305 305 }
306 306
307 307 commit_entry = {
308 308 'id': entry['commit_id'],
309 309 'value': org_query,
310 310 'value_display': '`{}` commit: {}'.format(
311 311 entry['repository'], entry['commit_id']),
312 312 'type': 'commit',
313 313 'repo': entry['repository'],
314 314 'repo_data': repo_data,
315 315
316 316 'url': h.route_path(
317 317 'repo_commit',
318 318 repo_name=entry['repository'], commit_id=entry['commit_id'])
319 319 }
320 320
321 321 commits.append(commit_entry)
322 322 return commits, True
323 323
324 324 def _get_path_list(self, auth_user, searcher, query, repo=None, repo_group=None):
325 325 repo_name = repo_group_name = None
326 326 if repo:
327 327 repo_name = repo.repo_name
328 328 if repo_group:
329 329 repo_group_name = repo_group.group_name
330 330
331 331 org_query = query
332 332 if not query or len(query) < 3 or not searcher:
333 333 return [], False
334 334
335 335 paths_re = re.compile('(?:file:[ ]?)(.+)').findall(query)
336 336 if len(paths_re) != 1:
337 337 return [], False
338 338
339 339 file_path = paths_re[0]
340 340
341 341 search_path = searcher.escape_specials(file_path)
342 342 result = searcher.search(
343 343 'file.raw:*{}*'.format(search_path), 'path', auth_user,
344 344 repo_name, repo_group_name, raise_on_exc=False)
345 345
346 346 files = []
347 347 for entry in result['results']:
348 348 repo_data = {
349 349 'repository_id': entry.get('repository_id'),
350 350 'repository_type': entry.get('repo_type'),
351 351 'repository_name': entry.get('repository'),
352 352 }
353 353
354 354 file_entry = {
355 355 'id': entry['commit_id'],
356 356 'value': org_query,
357 357 'value_display': '`{}` file: {}'.format(
358 358 entry['repository'], entry['file']),
359 359 'type': 'file',
360 360 'repo': entry['repository'],
361 361 'repo_data': repo_data,
362 362
363 363 'url': h.route_path(
364 364 'repo_files',
365 365 repo_name=entry['repository'], commit_id=entry['commit_id'],
366 366 f_path=entry['file'])
367 367 }
368 368
369 369 files.append(file_entry)
370 370 return files, True
371 371
372 372 @LoginRequired()
373 373 @view_config(
374 374 route_name='repo_list_data', request_method='GET',
375 375 renderer='json_ext', xhr=True)
376 376 def repo_list_data(self):
377 377 _ = self.request.translate
378 378 self.load_default_context()
379 379
380 380 query = self.request.GET.get('query')
381 381 repo_type = self.request.GET.get('repo_type')
382 382 log.debug('generating repo list, query:%s, repo_type:%s',
383 383 query, repo_type)
384 384
385 385 res = []
386 386 repos = self._get_repo_list(query, repo_type=repo_type)
387 387 if repos:
388 388 res.append({
389 389 'text': _('Repositories'),
390 390 'children': repos
391 391 })
392 392
393 393 data = {
394 394 'more': False,
395 395 'results': res
396 396 }
397 397 return data
398 398
399 399 @LoginRequired()
400 400 @view_config(
401 401 route_name='repo_group_list_data', request_method='GET',
402 402 renderer='json_ext', xhr=True)
403 403 def repo_group_list_data(self):
404 404 _ = self.request.translate
405 405 self.load_default_context()
406 406
407 407 query = self.request.GET.get('query')
408 408
409 409 log.debug('generating repo group list, query:%s',
410 410 query)
411 411
412 412 res = []
413 413 repo_groups = self._get_repo_group_list(query)
414 414 if repo_groups:
415 415 res.append({
416 416 'text': _('Repository Groups'),
417 417 'children': repo_groups
418 418 })
419 419
420 420 data = {
421 421 'more': False,
422 422 'results': res
423 423 }
424 424 return data
425 425
426 426 def _get_default_search_queries(self, search_context, searcher, query):
427 427 if not searcher:
428 428 return []
429 429
430 430 is_es_6 = searcher.is_es_6
431 431
432 432 queries = []
433 433 repo_group_name, repo_name, repo_context = None, None, None
434 434
435 435 # repo group context
436 436 if search_context.get('search_context[repo_group_name]'):
437 437 repo_group_name = search_context.get('search_context[repo_group_name]')
438 438 if search_context.get('search_context[repo_name]'):
439 439 repo_name = search_context.get('search_context[repo_name]')
440 440 repo_context = search_context.get('search_context[repo_view_type]')
441 441
442 442 if is_es_6 and repo_name:
443 443 # files
444 444 def query_modifier():
445 445 qry = query
446 446 return {'q': qry, 'type': 'content'}
447 447
448 448 label = u'File search for `{}`'.format(h.escape(query))
449 449 file_qry = {
450 450 'id': -10,
451 451 'value': query,
452 452 'value_display': label,
453 453 'value_icon': '<i class="icon-code"></i>',
454 454 'type': 'search',
455 455 'subtype': 'repo',
456 456 'url': h.route_path('search_repo',
457 457 repo_name=repo_name,
458 458 _query=query_modifier())
459 459 }
460 460
461 461 # commits
462 462 def query_modifier():
463 463 qry = query
464 464 return {'q': qry, 'type': 'commit'}
465 465
466 466 label = u'Commit search for `{}`'.format(h.escape(query))
467 467 commit_qry = {
468 468 'id': -20,
469 469 'value': query,
470 470 'value_display': label,
471 471 'value_icon': '<i class="icon-history"></i>',
472 472 'type': 'search',
473 473 'subtype': 'repo',
474 474 'url': h.route_path('search_repo',
475 475 repo_name=repo_name,
476 476 _query=query_modifier())
477 477 }
478 478
479 479 if repo_context in ['commit', 'commits']:
480 480 queries.extend([commit_qry, file_qry])
481 481 elif repo_context in ['files', 'summary']:
482 482 queries.extend([file_qry, commit_qry])
483 483 else:
484 484 queries.extend([commit_qry, file_qry])
485 485
486 486 elif is_es_6 and repo_group_name:
487 487 # files
488 488 def query_modifier():
489 489 qry = query
490 490 return {'q': qry, 'type': 'content'}
491 491
492 492 label = u'File search for `{}`'.format(query)
493 493 file_qry = {
494 494 'id': -30,
495 495 'value': query,
496 496 'value_display': label,
497 497 'value_icon': '<i class="icon-code"></i>',
498 498 'type': 'search',
499 499 'subtype': 'repo_group',
500 500 'url': h.route_path('search_repo_group',
501 501 repo_group_name=repo_group_name,
502 502 _query=query_modifier())
503 503 }
504 504
505 505 # commits
506 506 def query_modifier():
507 507 qry = query
508 508 return {'q': qry, 'type': 'commit'}
509 509
510 510 label = u'Commit search for `{}`'.format(query)
511 511 commit_qry = {
512 512 'id': -40,
513 513 'value': query,
514 514 'value_display': label,
515 515 'value_icon': '<i class="icon-history"></i>',
516 516 'type': 'search',
517 517 'subtype': 'repo_group',
518 518 'url': h.route_path('search_repo_group',
519 519 repo_group_name=repo_group_name,
520 520 _query=query_modifier())
521 521 }
522 522
523 523 if repo_context in ['commit', 'commits']:
524 524 queries.extend([commit_qry, file_qry])
525 525 elif repo_context in ['files', 'summary']:
526 526 queries.extend([file_qry, commit_qry])
527 527 else:
528 528 queries.extend([commit_qry, file_qry])
529 529
530 530 # Global, not scoped
531 531 if not queries:
532 532 queries.append(
533 533 {
534 534 'id': -1,
535 535 'value': query,
536 536 'value_display': u'File search for: `{}`'.format(query),
537 537 'value_icon': '<i class="icon-code"></i>',
538 538 'type': 'search',
539 539 'subtype': 'global',
540 540 'url': h.route_path('search',
541 541 _query={'q': query, 'type': 'content'})
542 542 })
543 543 queries.append(
544 544 {
545 545 'id': -2,
546 546 'value': query,
547 547 'value_display': u'Commit search for: `{}`'.format(query),
548 548 'value_icon': '<i class="icon-history"></i>',
549 549 'type': 'search',
550 550 'subtype': 'global',
551 551 'url': h.route_path('search',
552 552 _query={'q': query, 'type': 'commit'})
553 553 })
554 554
555 555 return queries
556 556
557 557 @LoginRequired()
558 558 @view_config(
559 559 route_name='goto_switcher_data', request_method='GET',
560 560 renderer='json_ext', xhr=True)
561 561 def goto_switcher_data(self):
562 562 c = self.load_default_context()
563 563
564 564 _ = self.request.translate
565 565
566 566 query = self.request.GET.get('query')
567 567 log.debug('generating main filter data, query %s', query)
568 568
569 569 res = []
570 570 if not query:
571 571 return {'suggestions': res}
572 572
573 573 def no_match(name):
574 574 return {
575 575 'id': -1,
576 576 'value': "",
577 577 'value_display': name,
578 578 'type': 'text',
579 579 'url': ""
580 580 }
581 581 searcher = searcher_from_config(self.request.registry.settings)
582 582 has_specialized_search = False
583 583
584 584 # set repo context
585 585 repo = None
586 586 repo_id = safe_int(self.request.GET.get('search_context[repo_id]'))
587 587 if repo_id:
588 588 repo = Repository.get(repo_id)
589 589
590 590 # set group context
591 591 repo_group = None
592 592 repo_group_id = safe_int(self.request.GET.get('search_context[repo_group_id]'))
593 593 if repo_group_id:
594 594 repo_group = RepoGroup.get(repo_group_id)
595 595 prefix_match = False
596 596
597 597 # user: type search
598 598 if not prefix_match:
599 599 users, prefix_match = self._get_user_list(query)
600 600 if users:
601 601 has_specialized_search = True
602 602 for serialized_user in users:
603 603 res.append(serialized_user)
604 604 elif prefix_match:
605 605 has_specialized_search = True
606 606 res.append(no_match('No matching users found'))
607 607
608 608 # user_group: type search
609 609 if not prefix_match:
610 610 user_groups, prefix_match = self._get_user_groups_list(query)
611 611 if user_groups:
612 612 has_specialized_search = True
613 613 for serialized_user_group in user_groups:
614 614 res.append(serialized_user_group)
615 615 elif prefix_match:
616 616 has_specialized_search = True
617 617 res.append(no_match('No matching user groups found'))
618 618
619 619 # FTS commit: type search
620 620 if not prefix_match:
621 621 commits, prefix_match = self._get_hash_commit_list(
622 622 c.auth_user, searcher, query, repo, repo_group)
623 623 if commits:
624 624 has_specialized_search = True
625 625 unique_repos = collections.OrderedDict()
626 626 for commit in commits:
627 627 repo_name = commit['repo']
628 628 unique_repos.setdefault(repo_name, []).append(commit)
629 629
630 630 for _repo, commits in unique_repos.items():
631 631 for commit in commits:
632 632 res.append(commit)
633 633 elif prefix_match:
634 634 has_specialized_search = True
635 635 res.append(no_match('No matching commits found'))
636 636
637 637 # FTS file: type search
638 638 if not prefix_match:
639 639 paths, prefix_match = self._get_path_list(
640 640 c.auth_user, searcher, query, repo, repo_group)
641 641 if paths:
642 642 has_specialized_search = True
643 643 unique_repos = collections.OrderedDict()
644 644 for path in paths:
645 645 repo_name = path['repo']
646 646 unique_repos.setdefault(repo_name, []).append(path)
647 647
648 648 for repo, paths in unique_repos.items():
649 649 for path in paths:
650 650 res.append(path)
651 651 elif prefix_match:
652 652 has_specialized_search = True
653 653 res.append(no_match('No matching files found'))
654 654
655 655 # main suggestions
656 656 if not has_specialized_search:
657 657 repo_group_name = ''
658 658 if repo_group:
659 659 repo_group_name = repo_group.group_name
660 660
661 661 for _q in self._get_default_search_queries(self.request.GET, searcher, query):
662 662 res.append(_q)
663 663
664 664 repo_groups = self._get_repo_group_list(query, repo_group_name=repo_group_name)
665 665 for serialized_repo_group in repo_groups:
666 666 res.append(serialized_repo_group)
667 667
668 668 repos = self._get_repo_list(query, repo_group_name=repo_group_name)
669 669 for serialized_repo in repos:
670 670 res.append(serialized_repo)
671 671
672 672 if not repos and not repo_groups:
673 673 res.append(no_match('No matches found'))
674 674
675 675 return {'suggestions': res}
676 676
677 677 @LoginRequired()
678 678 @view_config(
679 679 route_name='home', request_method='GET',
680 680 renderer='rhodecode:templates/index.mako')
681 681 def main_page(self):
682 682 c = self.load_default_context()
683 683 c.repo_group = None
684 684 return self._get_template_context(c)
685 685
686 686 def _main_page_repo_groups_data(self, repo_group_id):
687 687 column_map = {
688 688 'name': 'group_name_hash',
689 689 'desc': 'group_description',
690 690 'last_change': 'updated_on',
691 691 'owner': 'user_username',
692 692 }
693 693 draw, start, limit = self._extract_chunk(self.request)
694 694 search_q, order_by, order_dir = self._extract_ordering(
695 695 self.request, column_map=column_map)
696 696 return RepoGroupModel().get_repo_groups_data_table(
697 697 draw, start, limit,
698 698 search_q, order_by, order_dir,
699 699 self._rhodecode_user, repo_group_id)
700 700
701 701 def _main_page_repos_data(self, repo_group_id):
702 702 column_map = {
703 703 'name': 'repo_name',
704 704 'desc': 'description',
705 705 'last_change': 'updated_on',
706 706 'owner': 'user_username',
707 707 }
708 708 draw, start, limit = self._extract_chunk(self.request)
709 709 search_q, order_by, order_dir = self._extract_ordering(
710 710 self.request, column_map=column_map)
711 711 return RepoModel().get_repos_data_table(
712 712 draw, start, limit,
713 713 search_q, order_by, order_dir,
714 714 self._rhodecode_user, repo_group_id)
715 715
716 716 @LoginRequired()
717 717 @view_config(
718 718 route_name='main_page_repo_groups_data',
719 719 request_method='GET', renderer='json_ext', xhr=True)
720 720 def main_page_repo_groups_data(self):
721 721 self.load_default_context()
722 722 repo_group_id = safe_int(self.request.GET.get('repo_group_id'))
723 723
724 724 if repo_group_id:
725 725 group = RepoGroup.get_or_404(repo_group_id)
726 _perms = ['group.read', 'group.write', 'group.admin']
726 _perms = AuthUser.repo_group_read_perms
727 727 if not HasRepoGroupPermissionAny(*_perms)(
728 728 group.group_name, 'user is allowed to list repo group children'):
729 729 raise HTTPNotFound()
730 730
731 731 return self._main_page_repo_groups_data(repo_group_id)
732 732
733 733 @LoginRequired()
734 734 @view_config(
735 735 route_name='main_page_repos_data',
736 736 request_method='GET', renderer='json_ext', xhr=True)
737 737 def main_page_repos_data(self):
738 738 self.load_default_context()
739 739 repo_group_id = safe_int(self.request.GET.get('repo_group_id'))
740 740
741 741 if repo_group_id:
742 742 group = RepoGroup.get_or_404(repo_group_id)
743 _perms = ['group.read', 'group.write', 'group.admin']
743 _perms = AuthUser.repo_group_read_perms
744 744 if not HasRepoGroupPermissionAny(*_perms)(
745 745 group.group_name, 'user is allowed to list repo group children'):
746 746 raise HTTPNotFound()
747 747
748 748 return self._main_page_repos_data(repo_group_id)
749 749
750 750 @LoginRequired()
751 @HasRepoGroupPermissionAnyDecorator(
752 'group.read', 'group.write', 'group.admin')
751 @HasRepoGroupPermissionAnyDecorator(*AuthUser.repo_group_read_perms)
753 752 @view_config(
754 753 route_name='repo_group_home', request_method='GET',
755 754 renderer='rhodecode:templates/index_repo_group.mako')
756 755 @view_config(
757 756 route_name='repo_group_home_slash', request_method='GET',
758 757 renderer='rhodecode:templates/index_repo_group.mako')
759 758 def repo_group_main_page(self):
760 759 c = self.load_default_context()
761 760 c.repo_group = self.request.db_repo_group
762 761 return self._get_template_context(c)
763 762
764 763 @LoginRequired()
765 764 @CSRFRequired()
766 765 @view_config(
767 766 route_name='markup_preview', request_method='POST',
768 767 renderer='string', xhr=True)
769 768 def markup_preview(self):
770 769 # Technically a CSRF token is not needed as no state changes with this
771 770 # call. However, as this is a POST is better to have it, so automated
772 771 # tools don't flag it as potential CSRF.
773 772 # Post is required because the payload could be bigger than the maximum
774 773 # allowed by GET.
775 774
776 775 text = self.request.POST.get('text')
777 776 renderer = self.request.POST.get('renderer') or 'rst'
778 777 if text:
779 778 return h.render(text, renderer=renderer, mentions=True)
780 779 return ''
781 780
782 781 @LoginRequired()
783 782 @CSRFRequired()
784 783 @view_config(
785 784 route_name='file_preview', request_method='POST',
786 785 renderer='string', xhr=True)
787 786 def file_preview(self):
788 787 # Technically a CSRF token is not needed as no state changes with this
789 788 # call. However, as this is a POST is better to have it, so automated
790 789 # tools don't flag it as potential CSRF.
791 790 # Post is required because the payload could be bigger than the maximum
792 791 # allowed by GET.
793 792
794 793 text = self.request.POST.get('text')
795 794 file_path = self.request.POST.get('file_path')
796 795
797 796 renderer = h.renderer_from_filename(file_path)
798 797
799 798 if renderer:
800 799 return h.render(text, renderer=renderer, mentions=True)
801 800 else:
802 801 self.load_default_context()
803 802 _render = self.request.get_partial_renderer(
804 803 'rhodecode:templates/files/file_content.mako')
805 804
806 805 lines = filenode_as_lines_tokens(FileNode(file_path, text))
807 806
808 807 return _render('render_lines', lines)
809 808
810 809 @LoginRequired()
811 810 @CSRFRequired()
812 811 @view_config(
813 812 route_name='store_user_session_value', request_method='POST',
814 813 renderer='string', xhr=True)
815 814 def store_user_session_attr(self):
816 815 key = self.request.POST.get('key')
817 816 val = self.request.POST.get('val')
818 817
819 818 existing_value = self.request.session.get(key)
820 819 if existing_value != val:
821 820 self.request.session[key] = val
822 821
823 822 return 'stored:{}:{}'.format(key, val)
@@ -1,579 +1,578 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.tests import (
27 27 assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN,
28 28 no_newline_id_generator)
29 29 from rhodecode.tests.fixture import Fixture
30 30 from rhodecode.lib.auth import check_password
31 31 from rhodecode.lib import helpers as h
32 32 from rhodecode.model.auth_token import AuthTokenModel
33 33 from rhodecode.model.db import User, Notification, UserApiKeys
34 34 from rhodecode.model.meta import Session
35 35
36 36 fixture = Fixture()
37 37
38 38 whitelist_view = ['RepoCommitsView:repo_commit_raw']
39 39
40 40
41 41 def route_path(name, params=None, **kwargs):
42 42 import urllib
43 43 from rhodecode.apps._base import ADMIN_PREFIX
44 44
45 45 base_url = {
46 46 'login': ADMIN_PREFIX + '/login',
47 47 'logout': ADMIN_PREFIX + '/logout',
48 48 'register': ADMIN_PREFIX + '/register',
49 49 'reset_password':
50 50 ADMIN_PREFIX + '/password_reset',
51 51 'reset_password_confirmation':
52 52 ADMIN_PREFIX + '/password_reset_confirmation',
53 53
54 54 'admin_permissions_application':
55 55 ADMIN_PREFIX + '/permissions/application',
56 56 'admin_permissions_application_update':
57 57 ADMIN_PREFIX + '/permissions/application/update',
58 58
59 59 'repo_commit_raw': '/{repo_name}/raw-changeset/{commit_id}'
60 60
61 61 }[name].format(**kwargs)
62 62
63 63 if params:
64 64 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
65 65 return base_url
66 66
67 67
68 68 @pytest.mark.usefixtures('app')
69 69 class TestLoginController(object):
70 70 destroy_users = set()
71 71
72 72 @classmethod
73 73 def teardown_class(cls):
74 74 fixture.destroy_users(cls.destroy_users)
75 75
76 76 def teardown_method(self, method):
77 77 for n in Notification.query().all():
78 78 Session().delete(n)
79 79
80 80 Session().commit()
81 81 assert Notification.query().all() == []
82 82
83 83 def test_index(self):
84 84 response = self.app.get(route_path('login'))
85 85 assert response.status == '200 OK'
86 86 # Test response...
87 87
88 88 def test_login_admin_ok(self):
89 89 response = self.app.post(route_path('login'),
90 90 {'username': 'test_admin',
91 91 'password': 'test12'}, status=302)
92 92 response = response.follow()
93 93 session = response.get_session_from_response()
94 94 username = session['rhodecode_user'].get('username')
95 95 assert username == 'test_admin'
96 response.mustcontain('/%s' % HG_REPO)
96 response.mustcontain('logout')
97 97
98 98 def test_login_regular_ok(self):
99 99 response = self.app.post(route_path('login'),
100 100 {'username': 'test_regular',
101 101 'password': 'test12'}, status=302)
102 102
103 103 response = response.follow()
104 104 session = response.get_session_from_response()
105 105 username = session['rhodecode_user'].get('username')
106 106 assert username == 'test_regular'
107
108 response.mustcontain('/%s' % HG_REPO)
107 response.mustcontain('logout')
109 108
110 109 def test_login_regular_forbidden_when_super_admin_restriction(self):
111 110 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
112 111 with fixture.auth_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN):
113 112 response = self.app.post(route_path('login'),
114 113 {'username': 'test_regular',
115 114 'password': 'test12'})
116 115
117 116 response.mustcontain('invalid user name')
118 117 response.mustcontain('invalid password')
119 118
120 119 def test_login_regular_forbidden_when_scope_restriction(self):
121 120 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
122 121 with fixture.scope_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS):
123 122 response = self.app.post(route_path('login'),
124 123 {'username': 'test_regular',
125 124 'password': 'test12'})
126 125
127 126 response.mustcontain('invalid user name')
128 127 response.mustcontain('invalid password')
129 128
130 129 def test_login_ok_came_from(self):
131 130 test_came_from = '/_admin/users?branch=stable'
132 131 _url = '{}?came_from={}'.format(route_path('login'), test_came_from)
133 132 response = self.app.post(
134 133 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
135 134
136 135 assert 'branch=stable' in response.location
137 136 response = response.follow()
138 137
139 138 assert response.status == '200 OK'
140 139 response.mustcontain('Users administration')
141 140
142 141 def test_redirect_to_login_with_get_args(self):
143 142 with fixture.anon_access(False):
144 143 kwargs = {'branch': 'stable'}
145 144 response = self.app.get(
146 145 h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs),
147 146 status=302)
148 147
149 148 response_query = urlparse.parse_qsl(response.location)
150 149 assert 'branch=stable' in response_query[0][1]
151 150
152 151 def test_login_form_with_get_args(self):
153 152 _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login'))
154 153 response = self.app.get(_url)
155 154 assert 'branch%3Dstable' in response.form.action
156 155
157 156 @pytest.mark.parametrize("url_came_from", [
158 157 'data:text/html,<script>window.alert("xss")</script>',
159 158 'mailto:test@rhodecode.org',
160 159 'file:///etc/passwd',
161 160 'ftp://some.ftp.server',
162 161 'http://other.domain',
163 162 '/\r\nX-Forwarded-Host: http://example.org',
164 163 ], ids=no_newline_id_generator)
165 164 def test_login_bad_came_froms(self, url_came_from):
166 165 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
167 166 response = self.app.post(
168 167 _url,
169 168 {'username': 'test_admin', 'password': 'test12'})
170 169 assert response.status == '302 Found'
171 170 response = response.follow()
172 171 assert response.status == '200 OK'
173 172 assert response.request.path == '/'
174 173
175 174 def test_login_short_password(self):
176 175 response = self.app.post(route_path('login'),
177 176 {'username': 'test_admin',
178 177 'password': 'as'})
179 178 assert response.status == '200 OK'
180 179
181 180 response.mustcontain('Enter 3 characters or more')
182 181
183 182 def test_login_wrong_non_ascii_password(self, user_regular):
184 183 response = self.app.post(
185 184 route_path('login'),
186 185 {'username': user_regular.username,
187 186 'password': u'invalid-non-asci\xe4'.encode('utf8')})
188 187
189 188 response.mustcontain('invalid user name')
190 189 response.mustcontain('invalid password')
191 190
192 191 def test_login_with_non_ascii_password(self, user_util):
193 192 password = u'valid-non-ascii\xe4'
194 193 user = user_util.create_user(password=password)
195 194 response = self.app.post(
196 195 route_path('login'),
197 196 {'username': user.username,
198 197 'password': password.encode('utf-8')})
199 198 assert response.status_code == 302
200 199
201 200 def test_login_wrong_username_password(self):
202 201 response = self.app.post(route_path('login'),
203 202 {'username': 'error',
204 203 'password': 'test12'})
205 204
206 205 response.mustcontain('invalid user name')
207 206 response.mustcontain('invalid password')
208 207
209 208 def test_login_admin_ok_password_migration(self, real_crypto_backend):
210 209 from rhodecode.lib import auth
211 210
212 211 # create new user, with sha256 password
213 212 temp_user = 'test_admin_sha256'
214 213 user = fixture.create_user(temp_user)
215 214 user.password = auth._RhodeCodeCryptoSha256().hash_create(
216 215 b'test123')
217 216 Session().add(user)
218 217 Session().commit()
219 218 self.destroy_users.add(temp_user)
220 219 response = self.app.post(route_path('login'),
221 220 {'username': temp_user,
222 221 'password': 'test123'}, status=302)
223 222
224 223 response = response.follow()
225 224 session = response.get_session_from_response()
226 225 username = session['rhodecode_user'].get('username')
227 226 assert username == temp_user
228 response.mustcontain('/%s' % HG_REPO)
227 response.mustcontain('logout')
229 228
230 229 # new password should be bcrypted, after log-in and transfer
231 230 user = User.get_by_username(temp_user)
232 231 assert user.password.startswith('$')
233 232
234 233 # REGISTRATIONS
235 234 def test_register(self):
236 235 response = self.app.get(route_path('register'))
237 236 response.mustcontain('Create an Account')
238 237
239 238 def test_register_err_same_username(self):
240 239 uname = 'test_admin'
241 240 response = self.app.post(
242 241 route_path('register'),
243 242 {
244 243 'username': uname,
245 244 'password': 'test12',
246 245 'password_confirmation': 'test12',
247 246 'email': 'goodmail@domain.com',
248 247 'firstname': 'test',
249 248 'lastname': 'test'
250 249 }
251 250 )
252 251
253 252 assertr = response.assert_response()
254 253 msg = 'Username "%(username)s" already exists'
255 254 msg = msg % {'username': uname}
256 255 assertr.element_contains('#username+.error-message', msg)
257 256
258 257 def test_register_err_same_email(self):
259 258 response = self.app.post(
260 259 route_path('register'),
261 260 {
262 261 'username': 'test_admin_0',
263 262 'password': 'test12',
264 263 'password_confirmation': 'test12',
265 264 'email': 'test_admin@mail.com',
266 265 'firstname': 'test',
267 266 'lastname': 'test'
268 267 }
269 268 )
270 269
271 270 assertr = response.assert_response()
272 271 msg = u'This e-mail address is already taken'
273 272 assertr.element_contains('#email+.error-message', msg)
274 273
275 274 def test_register_err_same_email_case_sensitive(self):
276 275 response = self.app.post(
277 276 route_path('register'),
278 277 {
279 278 'username': 'test_admin_1',
280 279 'password': 'test12',
281 280 'password_confirmation': 'test12',
282 281 'email': 'TesT_Admin@mail.COM',
283 282 'firstname': 'test',
284 283 'lastname': 'test'
285 284 }
286 285 )
287 286 assertr = response.assert_response()
288 287 msg = u'This e-mail address is already taken'
289 288 assertr.element_contains('#email+.error-message', msg)
290 289
291 290 def test_register_err_wrong_data(self):
292 291 response = self.app.post(
293 292 route_path('register'),
294 293 {
295 294 'username': 'xs',
296 295 'password': 'test',
297 296 'password_confirmation': 'test',
298 297 'email': 'goodmailm',
299 298 'firstname': 'test',
300 299 'lastname': 'test'
301 300 }
302 301 )
303 302 assert response.status == '200 OK'
304 303 response.mustcontain('An email address must contain a single @')
305 304 response.mustcontain('Enter a value 6 characters long or more')
306 305
307 306 def test_register_err_username(self):
308 307 response = self.app.post(
309 308 route_path('register'),
310 309 {
311 310 'username': 'error user',
312 311 'password': 'test12',
313 312 'password_confirmation': 'test12',
314 313 'email': 'goodmailm',
315 314 'firstname': 'test',
316 315 'lastname': 'test'
317 316 }
318 317 )
319 318
320 319 response.mustcontain('An email address must contain a single @')
321 320 response.mustcontain(
322 321 'Username may only contain '
323 322 'alphanumeric characters underscores, '
324 323 'periods or dashes and must begin with '
325 324 'alphanumeric character')
326 325
327 326 def test_register_err_case_sensitive(self):
328 327 usr = 'Test_Admin'
329 328 response = self.app.post(
330 329 route_path('register'),
331 330 {
332 331 'username': usr,
333 332 'password': 'test12',
334 333 'password_confirmation': 'test12',
335 334 'email': 'goodmailm',
336 335 'firstname': 'test',
337 336 'lastname': 'test'
338 337 }
339 338 )
340 339
341 340 assertr = response.assert_response()
342 341 msg = u'Username "%(username)s" already exists'
343 342 msg = msg % {'username': usr}
344 343 assertr.element_contains('#username+.error-message', msg)
345 344
346 345 def test_register_special_chars(self):
347 346 response = self.app.post(
348 347 route_path('register'),
349 348 {
350 349 'username': 'xxxaxn',
351 350 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
352 351 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
353 352 'email': 'goodmailm@test.plx',
354 353 'firstname': 'test',
355 354 'lastname': 'test'
356 355 }
357 356 )
358 357
359 358 msg = u'Invalid characters (non-ascii) in password'
360 359 response.mustcontain(msg)
361 360
362 361 def test_register_password_mismatch(self):
363 362 response = self.app.post(
364 363 route_path('register'),
365 364 {
366 365 'username': 'xs',
367 366 'password': '123qwe',
368 367 'password_confirmation': 'qwe123',
369 368 'email': 'goodmailm@test.plxa',
370 369 'firstname': 'test',
371 370 'lastname': 'test'
372 371 }
373 372 )
374 373 msg = u'Passwords do not match'
375 374 response.mustcontain(msg)
376 375
377 376 def test_register_ok(self):
378 377 username = 'test_regular4'
379 378 password = 'qweqwe'
380 379 email = 'marcin@test.com'
381 380 name = 'testname'
382 381 lastname = 'testlastname'
383 382
384 383 # this initializes a session
385 384 response = self.app.get(route_path('register'))
386 385 response.mustcontain('Create an Account')
387 386
388 387
389 388 response = self.app.post(
390 389 route_path('register'),
391 390 {
392 391 'username': username,
393 392 'password': password,
394 393 'password_confirmation': password,
395 394 'email': email,
396 395 'firstname': name,
397 396 'lastname': lastname,
398 397 'admin': True
399 398 },
400 399 status=302
401 400 ) # This should be overridden
402 401
403 402 assert_session_flash(
404 403 response, 'You have successfully registered with RhodeCode. You can log-in now.')
405 404
406 405 ret = Session().query(User).filter(
407 406 User.username == 'test_regular4').one()
408 407 assert ret.username == username
409 408 assert check_password(password, ret.password)
410 409 assert ret.email == email
411 410 assert ret.name == name
412 411 assert ret.lastname == lastname
413 412 assert ret.auth_tokens is not None
414 413 assert not ret.admin
415 414
416 415 def test_forgot_password_wrong_mail(self):
417 416 bad_email = 'marcin@wrongmail.org'
418 417 # this initializes a session
419 418 self.app.get(route_path('reset_password'))
420 419
421 420 response = self.app.post(
422 421 route_path('reset_password'), {'email': bad_email, }
423 422 )
424 423 assert_session_flash(response,
425 424 'If such email exists, a password reset link was sent to it.')
426 425
427 426 def test_forgot_password(self, user_util):
428 427 # this initializes a session
429 428 self.app.get(route_path('reset_password'))
430 429
431 430 user = user_util.create_user()
432 431 user_id = user.user_id
433 432 email = user.email
434 433
435 434 response = self.app.post(route_path('reset_password'), {'email': email, })
436 435
437 436 assert_session_flash(response,
438 437 'If such email exists, a password reset link was sent to it.')
439 438
440 439 # BAD KEY
441 440 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), 'badkey')
442 441 response = self.app.get(confirm_url, status=302)
443 442 assert response.location.endswith(route_path('reset_password'))
444 443 assert_session_flash(response, 'Given reset token is invalid')
445 444
446 445 response.follow() # cleanup flash
447 446
448 447 # GOOD KEY
449 448 key = UserApiKeys.query()\
450 449 .filter(UserApiKeys.user_id == user_id)\
451 450 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
452 451 .first()
453 452
454 453 assert key
455 454
456 455 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key)
457 456 response = self.app.get(confirm_url)
458 457 assert response.status == '302 Found'
459 458 assert response.location.endswith(route_path('login'))
460 459
461 460 assert_session_flash(
462 461 response,
463 462 'Your password reset was successful, '
464 463 'a new password has been sent to your email')
465 464
466 465 response.follow()
467 466
468 467 def _get_api_whitelist(self, values=None):
469 468 config = {'api_access_controllers_whitelist': values or []}
470 469 return config
471 470
472 471 @pytest.mark.parametrize("test_name, auth_token", [
473 472 ('none', None),
474 473 ('empty_string', ''),
475 474 ('fake_number', '123456'),
476 475 ('proper_auth_token', None)
477 476 ])
478 477 def test_access_not_whitelisted_page_via_auth_token(
479 478 self, test_name, auth_token, user_admin):
480 479
481 480 whitelist = self._get_api_whitelist([])
482 481 with mock.patch.dict('rhodecode.CONFIG', whitelist):
483 482 assert [] == whitelist['api_access_controllers_whitelist']
484 483 if test_name == 'proper_auth_token':
485 484 # use builtin if api_key is None
486 485 auth_token = user_admin.api_key
487 486
488 487 with fixture.anon_access(False):
489 488 self.app.get(
490 489 route_path('repo_commit_raw',
491 490 repo_name=HG_REPO, commit_id='tip',
492 491 params=dict(api_key=auth_token)),
493 492 status=302)
494 493
495 494 @pytest.mark.parametrize("test_name, auth_token, code", [
496 495 ('none', None, 302),
497 496 ('empty_string', '', 302),
498 497 ('fake_number', '123456', 302),
499 498 ('proper_auth_token', None, 200)
500 499 ])
501 500 def test_access_whitelisted_page_via_auth_token(
502 501 self, test_name, auth_token, code, user_admin):
503 502
504 503 whitelist = self._get_api_whitelist(whitelist_view)
505 504
506 505 with mock.patch.dict('rhodecode.CONFIG', whitelist):
507 506 assert whitelist_view == whitelist['api_access_controllers_whitelist']
508 507
509 508 if test_name == 'proper_auth_token':
510 509 auth_token = user_admin.api_key
511 510 assert auth_token
512 511
513 512 with fixture.anon_access(False):
514 513 self.app.get(
515 514 route_path('repo_commit_raw',
516 515 repo_name=HG_REPO, commit_id='tip',
517 516 params=dict(api_key=auth_token)),
518 517 status=code)
519 518
520 519 @pytest.mark.parametrize("test_name, auth_token, code", [
521 520 ('proper_auth_token', None, 200),
522 521 ('wrong_auth_token', '123456', 302),
523 522 ])
524 523 def test_access_whitelisted_page_via_auth_token_bound_to_token(
525 524 self, test_name, auth_token, code, user_admin):
526 525
527 526 expected_token = auth_token
528 527 if test_name == 'proper_auth_token':
529 528 auth_token = user_admin.api_key
530 529 expected_token = auth_token
531 530 assert auth_token
532 531
533 532 whitelist = self._get_api_whitelist([
534 533 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)])
535 534
536 535 with mock.patch.dict('rhodecode.CONFIG', whitelist):
537 536
538 537 with fixture.anon_access(False):
539 538 self.app.get(
540 539 route_path('repo_commit_raw',
541 540 repo_name=HG_REPO, commit_id='tip',
542 541 params=dict(api_key=auth_token)),
543 542 status=code)
544 543
545 544 def test_access_page_via_extra_auth_token(self):
546 545 whitelist = self._get_api_whitelist(whitelist_view)
547 546 with mock.patch.dict('rhodecode.CONFIG', whitelist):
548 547 assert whitelist_view == \
549 548 whitelist['api_access_controllers_whitelist']
550 549
551 550 new_auth_token = AuthTokenModel().create(
552 551 TEST_USER_ADMIN_LOGIN, 'test')
553 552 Session().commit()
554 553 with fixture.anon_access(False):
555 554 self.app.get(
556 555 route_path('repo_commit_raw',
557 556 repo_name=HG_REPO, commit_id='tip',
558 557 params=dict(api_key=new_auth_token.api_key)),
559 558 status=200)
560 559
561 560 def test_access_page_via_expired_auth_token(self):
562 561 whitelist = self._get_api_whitelist(whitelist_view)
563 562 with mock.patch.dict('rhodecode.CONFIG', whitelist):
564 563 assert whitelist_view == \
565 564 whitelist['api_access_controllers_whitelist']
566 565
567 566 new_auth_token = AuthTokenModel().create(
568 567 TEST_USER_ADMIN_LOGIN, 'test')
569 568 Session().commit()
570 569 # patch the api key and make it expired
571 570 new_auth_token.expires = 0
572 571 Session().add(new_auth_token)
573 572 Session().commit()
574 573 with fixture.anon_access(False):
575 574 self.app.get(
576 575 route_path('repo_commit_raw',
577 576 repo_name=HG_REPO, commit_id='tip',
578 577 params=dict(api_key=new_auth_token.api_key)),
579 578 status=302)
@@ -1,152 +1,157 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests import TestController
24 24 from rhodecode.tests.fixture import Fixture
25 25
26 26
27 27 def route_path(name, params=None, **kwargs):
28 28 import urllib
29 29 from rhodecode.apps._base import ADMIN_PREFIX
30 30
31 31 base_url = {
32 32 'home': '/',
33 33 'admin_home': ADMIN_PREFIX,
34 34 'repos':
35 35 ADMIN_PREFIX + '/repos',
36 'repos_data':
37 ADMIN_PREFIX + '/repos_data',
36 38 'repo_groups':
37 39 ADMIN_PREFIX + '/repo_groups',
38 40 'repo_groups_data':
39 41 ADMIN_PREFIX + '/repo_groups_data',
40 42 'user_groups':
41 43 ADMIN_PREFIX + '/user_groups',
42 44 'user_groups_data':
43 45 ADMIN_PREFIX + '/user_groups_data',
44 46 }[name].format(**kwargs)
45 47
46 48 if params:
47 49 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
48 50 return base_url
49 51
50 52
51 53 fixture = Fixture()
52 54
53 55
54 56 class TestAdminDelegatedUser(TestController):
55 57
56 58 def test_regular_user_cannot_see_admin_interfaces(self, user_util, xhr_header):
57 59 user = user_util.create_user(password='qweqwe')
58 60 user_util.inherit_default_user_permissions(user.username, False)
59 61
60 62 self.log_user(user.username, 'qweqwe')
61 63
62 64 # user doesn't have any access to resources so main admin page should 404
63 65 self.app.get(route_path('admin_home'), status=404)
64 66
65 response = self.app.get(route_path('repos'), status=200)
66 response.mustcontain('data: []')
67 response = self.app.get(route_path('repos_data'),
68 status=200, extra_environ=xhr_header)
69 assert response.json['data'] == []
67 70
68 71 response = self.app.get(route_path('repo_groups_data'),
69 72 status=200, extra_environ=xhr_header)
70 73 assert response.json['data'] == []
71 74
72 75 response = self.app.get(route_path('user_groups_data'),
73 76 status=200, extra_environ=xhr_header)
74 77 assert response.json['data'] == []
75 78
76 79 def test_regular_user_can_see_admin_interfaces_if_owner(self, user_util, xhr_header):
77 80 user = user_util.create_user(password='qweqwe')
78 81 username = user.username
79 82
80 83 repo = user_util.create_repo(owner=username)
81 84 repo_name = repo.repo_name
82 85
83 86 repo_group = user_util.create_repo_group(owner=username)
84 87 repo_group_name = repo_group.group_name
85 88
86 89 user_group = user_util.create_user_group(owner=username)
87 90 user_group_name = user_group.users_group_name
88 91
89 92 self.log_user(username, 'qweqwe')
90 93
91 94 response = self.app.get(route_path('admin_home'))
92 95
93 96 assert_response = response.assert_response()
94 97
95 98 assert_response.element_contains('td.delegated-admin-repos', '1')
96 99 assert_response.element_contains('td.delegated-admin-repo-groups', '1')
97 100 assert_response.element_contains('td.delegated-admin-user-groups', '1')
98 101
99 102 # admin interfaces have visible elements
100 response = self.app.get(route_path('repos'), status=200)
101 response.mustcontain('"name_raw": "{}"'.format(repo_name))
103 response = self.app.get(route_path('repos_data'),
104 extra_environ=xhr_header, status=200)
105 response.mustcontain('<a href=\\"/{}\\">'.format(repo_name))
102 106
103 107 response = self.app.get(route_path('repo_groups_data'),
104 108 extra_environ=xhr_header, status=200)
105 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
109 response.mustcontain('<a href=\\"/{}\\">'.format(repo_group_name))
106 110
107 111 response = self.app.get(route_path('user_groups_data'),
108 112 extra_environ=xhr_header, status=200)
109 response.mustcontain('"name_raw": "{}"'.format(user_group_name))
113 response.mustcontain('<a href=\\"/_profile_user_group/{}\\">'.format(user_group_name))
110 114
111 115 def test_regular_user_can_see_admin_interfaces_if_admin_perm(
112 116 self, user_util, xhr_header):
113 117 user = user_util.create_user(password='qweqwe')
114 118 username = user.username
115 119
116 120 repo = user_util.create_repo()
117 121 repo_name = repo.repo_name
118 122
119 123 repo_group = user_util.create_repo_group()
120 124 repo_group_name = repo_group.group_name
121 125
122 126 user_group = user_util.create_user_group()
123 127 user_group_name = user_group.users_group_name
124 128
125 129 user_util.grant_user_permission_to_repo(
126 130 repo, user, 'repository.admin')
127 131 user_util.grant_user_permission_to_repo_group(
128 132 repo_group, user, 'group.admin')
129 133 user_util.grant_user_permission_to_user_group(
130 134 user_group, user, 'usergroup.admin')
131 135
132 136 self.log_user(username, 'qweqwe')
133 137 # check if in home view, such user doesn't see the "admin" menus
134 138 response = self.app.get(route_path('admin_home'))
135 139
136 140 assert_response = response.assert_response()
137 141
138 142 assert_response.element_contains('td.delegated-admin-repos', '1')
139 143 assert_response.element_contains('td.delegated-admin-repo-groups', '1')
140 144 assert_response.element_contains('td.delegated-admin-user-groups', '1')
141 145
142 146 # admin interfaces have visible elements
143 response = self.app.get(route_path('repos'), status=200)
144 response.mustcontain('"name_raw": "{}"'.format(repo_name))
147 response = self.app.get(route_path('repos_data'),
148 extra_environ=xhr_header, status=200)
149 response.mustcontain('<a href=\\"/{}\\">'.format(repo_name))
145 150
146 151 response = self.app.get(route_path('repo_groups_data'),
147 152 extra_environ=xhr_header, status=200)
148 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
153 response.mustcontain('<a href=\\"/{}\\">'.format(repo_group_name))
149 154
150 155 response = self.app.get(route_path('user_groups_data'),
151 156 extra_environ=xhr_header, status=200)
152 response.mustcontain('"name_raw": "{}"'.format(user_group_name))
157 response.mustcontain('<a href=\\"/_profile_user_group/{}\\">'.format(user_group_name))
General Comments 0
You need to be logged in to leave comments. Login now