Show More
@@ -0,0 +1,77 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2018 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | ||
|
21 | import pytest | |
|
22 | ||
|
23 | from rhodecode.tests.utils import permission_update_data_generator | |
|
24 | ||
|
25 | ||
|
26 | def route_path(name, params=None, **kwargs): | |
|
27 | import urllib | |
|
28 | ||
|
29 | base_url = { | |
|
30 | 'edit_repo_perms': '/{repo_name}/settings/permissions' | |
|
31 | # update is the same url | |
|
32 | }[name].format(**kwargs) | |
|
33 | ||
|
34 | if params: | |
|
35 | base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) | |
|
36 | return base_url | |
|
37 | ||
|
38 | ||
|
39 | @pytest.mark.usefixtures("app") | |
|
40 | class TestRepoPermissionsView(object): | |
|
41 | ||
|
42 | def test_edit_perms_view(self, user_util, autologin_user): | |
|
43 | repo = user_util.create_repo() | |
|
44 | self.app.get( | |
|
45 | route_path('edit_repo_perms', | |
|
46 | repo_name=repo.repo_name), status=200) | |
|
47 | ||
|
48 | def test_update_permissions(self, csrf_token, user_util): | |
|
49 | repo = user_util.create_repo() | |
|
50 | repo_name = repo.repo_name | |
|
51 | user = user_util.create_user() | |
|
52 | user_id = user.user_id | |
|
53 | username = user.username | |
|
54 | ||
|
55 | # grant new | |
|
56 | form_data = permission_update_data_generator( | |
|
57 | csrf_token, | |
|
58 | default='repository.write', | |
|
59 | grant=[(user_id, 'repository.write', username, 'user')]) | |
|
60 | ||
|
61 | response = self.app.post( | |
|
62 | route_path('edit_repo_perms', | |
|
63 | repo_name=repo_name), form_data).follow() | |
|
64 | ||
|
65 | assert 'Repository permissions updated' in response | |
|
66 | ||
|
67 | # revoke given | |
|
68 | form_data = permission_update_data_generator( | |
|
69 | csrf_token, | |
|
70 | default='repository.read', | |
|
71 | revoke=[(user_id, 'user')]) | |
|
72 | ||
|
73 | response = self.app.post( | |
|
74 | route_path('edit_repo_perms', | |
|
75 | repo_name=repo_name), form_data).follow() | |
|
76 | ||
|
77 | assert 'Repository permissions updated' in response |
@@ -0,0 +1,80 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2018 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | ||
|
21 | import pytest | |
|
22 | ||
|
23 | from rhodecode.tests.utils import permission_update_data_generator | |
|
24 | ||
|
25 | ||
|
26 | def route_path(name, params=None, **kwargs): | |
|
27 | import urllib | |
|
28 | from rhodecode.apps._base import ADMIN_PREFIX | |
|
29 | ||
|
30 | base_url = { | |
|
31 | 'edit_user_group_perms': | |
|
32 | ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/permissions', | |
|
33 | 'edit_user_group_perms_update': | |
|
34 | ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/permissions/update', | |
|
35 | }[name].format(**kwargs) | |
|
36 | ||
|
37 | if params: | |
|
38 | base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) | |
|
39 | return base_url | |
|
40 | ||
|
41 | ||
|
42 | @pytest.mark.usefixtures("app") | |
|
43 | class TestUserGroupPermissionsView(object): | |
|
44 | ||
|
45 | def test_edit_perms_view(self, user_util, autologin_user): | |
|
46 | user_group = user_util.create_user_group() | |
|
47 | self.app.get( | |
|
48 | route_path('edit_user_group_perms', | |
|
49 | user_group_id=user_group.users_group_id), status=200) | |
|
50 | ||
|
51 | def test_update_permissions(self, csrf_token, user_util): | |
|
52 | user_group = user_util.create_user_group() | |
|
53 | user_group_id = user_group.users_group_id | |
|
54 | user = user_util.create_user() | |
|
55 | user_id = user.user_id | |
|
56 | username = user.username | |
|
57 | ||
|
58 | # grant new | |
|
59 | form_data = permission_update_data_generator( | |
|
60 | csrf_token, | |
|
61 | default='usergroup.write', | |
|
62 | grant=[(user_id, 'usergroup.write', username, 'user')]) | |
|
63 | ||
|
64 | response = self.app.post( | |
|
65 | route_path('edit_user_group_perms_update', | |
|
66 | user_group_id=user_group_id), form_data).follow() | |
|
67 | ||
|
68 | assert 'User Group permissions updated' in response | |
|
69 | ||
|
70 | # revoke given | |
|
71 | form_data = permission_update_data_generator( | |
|
72 | csrf_token, | |
|
73 | default='usergroup.read', | |
|
74 | revoke=[(user_id, 'user')]) | |
|
75 | ||
|
76 | response = self.app.post( | |
|
77 | route_path('edit_user_group_perms_update', | |
|
78 | user_group_id=user_group_id), form_data).follow() | |
|
79 | ||
|
80 | assert 'User Group permissions updated' in response |
@@ -20,6 +20,8 b'' | |||
|
20 | 20 | |
|
21 | 21 | import pytest |
|
22 | 22 | |
|
23 | from rhodecode.tests.utils import permission_update_data_generator | |
|
24 | ||
|
23 | 25 | |
|
24 | 26 | def route_path(name, params=None, **kwargs): |
|
25 | 27 | import urllib |
@@ -37,13 +39,48 b' def route_path(name, params=None, **kwar' | |||
|
37 | 39 | |
|
38 | 40 | |
|
39 | 41 | @pytest.mark.usefixtures("app") |
|
40 |
class TestRepoGroup |
|
|
42 | class TestRepoGroupPermissionsView(object): | |
|
41 | 43 | |
|
42 |
def test_edit_ |
|
|
44 | def test_edit_perms_view(self, user_util, autologin_user): | |
|
43 | 45 | repo_group = user_util.create_repo_group() |
|
46 | ||
|
44 | 47 | self.app.get( |
|
45 | 48 | route_path('edit_repo_group_perms', |
|
46 | 49 | repo_group_name=repo_group.group_name), status=200) |
|
47 | 50 | |
|
48 | def test_update_permissions(self): | |
|
49 | pass | |
|
51 | def test_update_permissions(self, csrf_token, user_util): | |
|
52 | repo_group = user_util.create_repo_group() | |
|
53 | repo_group_name = repo_group.group_name | |
|
54 | user = user_util.create_user() | |
|
55 | user_id = user.user_id | |
|
56 | username = user.username | |
|
57 | ||
|
58 | # grant new | |
|
59 | form_data = permission_update_data_generator( | |
|
60 | csrf_token, | |
|
61 | default='group.write', | |
|
62 | grant=[(user_id, 'group.write', username, 'user')]) | |
|
63 | ||
|
64 | # recursive flag required for repo groups | |
|
65 | form_data.extend([('recursive', u'none')]) | |
|
66 | ||
|
67 | response = self.app.post( | |
|
68 | route_path('edit_repo_group_perms_update', | |
|
69 | repo_group_name=repo_group_name), form_data).follow() | |
|
70 | ||
|
71 | assert 'Repository Group permissions updated' in response | |
|
72 | ||
|
73 | # revoke given | |
|
74 | form_data = permission_update_data_generator( | |
|
75 | csrf_token, | |
|
76 | default='group.read', | |
|
77 | revoke=[(user_id, 'user')]) | |
|
78 | ||
|
79 | # recursive flag required for repo groups | |
|
80 | form_data.extend([('recursive', u'none')]) | |
|
81 | ||
|
82 | response = self.app.post( | |
|
83 | route_path('edit_repo_group_perms_update', | |
|
84 | repo_group_name=repo_group_name), form_data).follow() | |
|
85 | ||
|
86 | assert 'Repository Group permissions updated' in response |
@@ -547,14 +547,16 b' class RepoModel(BaseModel):' | |||
|
547 | 547 | # this updates also current one if found |
|
548 | 548 | self.grant_user_permission( |
|
549 | 549 | repo=repo, user=member_id, perm=perm) |
|
550 |
else: |
|
|
550 | elif member_type == 'user_group': | |
|
551 | 551 | # check if we have permissions to alter this usergroup |
|
552 | 552 | member_name = UserGroup.get(member_id).users_group_name |
|
553 | 553 | if not check_perms or HasUserGroupPermissionAny( |
|
554 | 554 | *req_perms)(member_name, user=cur_user): |
|
555 | 555 | self.grant_user_group_permission( |
|
556 | 556 | repo=repo, group_name=member_id, perm=perm) |
|
557 | ||
|
557 | else: | |
|
558 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
559 | "got {} instead".format(member_type)) | |
|
558 | 560 | changes['updated'].append({'type': member_type, 'id': member_id, |
|
559 | 561 | 'name': member_name, 'new_perm': perm}) |
|
560 | 562 | |
@@ -565,13 +567,17 b' class RepoModel(BaseModel):' | |||
|
565 | 567 | member_name = User.get(member_id).username |
|
566 | 568 | self.grant_user_permission( |
|
567 | 569 | repo=repo, user=member_id, perm=perm) |
|
568 |
else: |
|
|
570 | elif member_type == 'user_group': | |
|
569 | 571 | # check if we have permissions to alter this usergroup |
|
570 | 572 | member_name = UserGroup.get(member_id).users_group_name |
|
571 | 573 | if not check_perms or HasUserGroupPermissionAny( |
|
572 | 574 | *req_perms)(member_name, user=cur_user): |
|
573 | 575 | self.grant_user_group_permission( |
|
574 | 576 | repo=repo, group_name=member_id, perm=perm) |
|
577 | else: | |
|
578 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
579 | "got {} instead".format(member_type)) | |
|
580 | ||
|
575 | 581 | changes['added'].append({'type': member_type, 'id': member_id, |
|
576 | 582 | 'name': member_name, 'new_perm': perm}) |
|
577 | 583 | # delete permissions |
@@ -580,13 +586,16 b' class RepoModel(BaseModel):' | |||
|
580 | 586 | if member_type == 'user': |
|
581 | 587 | member_name = User.get(member_id).username |
|
582 | 588 | self.revoke_user_permission(repo=repo, user=member_id) |
|
583 |
else: |
|
|
589 | elif member_type == 'user_group': | |
|
584 | 590 | # check if we have permissions to alter this usergroup |
|
585 | 591 | member_name = UserGroup.get(member_id).users_group_name |
|
586 | 592 | if not check_perms or HasUserGroupPermissionAny( |
|
587 | 593 | *req_perms)(member_name, user=cur_user): |
|
588 | 594 | self.revoke_user_group_permission( |
|
589 | 595 | repo=repo, group_name=member_id) |
|
596 | else: | |
|
597 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
598 | "got {} instead".format(member_type)) | |
|
590 | 599 | |
|
591 | 600 | changes['deleted'].append({'type': member_type, 'id': member_id, |
|
592 | 601 | 'name': member_name, 'new_perm': perm}) |
@@ -425,11 +425,14 b' class RepoGroupModel(BaseModel):' | |||
|
425 | 425 | member_name = User.get(member_id).username |
|
426 | 426 | # this updates also current one if found |
|
427 | 427 | _set_perm_user(obj, user=member_id, perm=perm) |
|
428 |
else: |
|
|
428 | elif member_type == 'user_group': | |
|
429 | 429 | member_name = UserGroup.get(member_id).users_group_name |
|
430 | 430 | if not check_perms or has_group_perm(member_name, |
|
431 | 431 | user=cur_user): |
|
432 | 432 | _set_perm_group(obj, users_group=member_id, perm=perm) |
|
433 | else: | |
|
434 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
435 | "got {} instead".format(member_type)) | |
|
433 | 436 | |
|
434 | 437 | changes['updated'].append( |
|
435 | 438 | {'change_obj': change_obj, 'type': member_type, |
@@ -441,12 +444,15 b' class RepoGroupModel(BaseModel):' | |||
|
441 | 444 | if member_type == 'user': |
|
442 | 445 | member_name = User.get(member_id).username |
|
443 | 446 | _set_perm_user(obj, user=member_id, perm=perm) |
|
444 |
else: |
|
|
447 | elif member_type == 'user_group': | |
|
445 | 448 | # check if we have permissions to alter this usergroup |
|
446 | 449 | member_name = UserGroup.get(member_id).users_group_name |
|
447 | 450 | if not check_perms or has_group_perm(member_name, |
|
448 | 451 | user=cur_user): |
|
449 | 452 | _set_perm_group(obj, users_group=member_id, perm=perm) |
|
453 | else: | |
|
454 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
455 | "got {} instead".format(member_type)) | |
|
450 | 456 | |
|
451 | 457 | changes['added'].append( |
|
452 | 458 | {'change_obj': change_obj, 'type': member_type, |
@@ -458,12 +464,15 b' class RepoGroupModel(BaseModel):' | |||
|
458 | 464 | if member_type == 'user': |
|
459 | 465 | member_name = User.get(member_id).username |
|
460 | 466 | _revoke_perm_user(obj, user=member_id) |
|
461 |
else: |
|
|
467 | elif member_type == 'user_group': | |
|
462 | 468 | # check if we have permissions to alter this usergroup |
|
463 | 469 | member_name = UserGroup.get(member_id).users_group_name |
|
464 | 470 | if not check_perms or has_group_perm(member_name, |
|
465 | 471 | user=cur_user): |
|
466 | 472 | _revoke_perm_group(obj, user_group=member_id) |
|
473 | else: | |
|
474 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
475 | "got {} instead".format(member_type)) | |
|
467 | 476 | |
|
468 | 477 | changes['deleted'].append( |
|
469 | 478 | {'change_obj': change_obj, 'type': member_type, |
@@ -90,13 +90,16 b' class UserGroupModel(BaseModel):' | |||
|
90 | 90 | self.grant_user_permission( |
|
91 | 91 | user_group=user_group, user=member_id, perm=perm |
|
92 | 92 | ) |
|
93 | else: | |
|
93 | elif member_type == 'user_group': | |
|
94 | 94 | # check if we have permissions to alter this usergroup |
|
95 | 95 | member_name = UserGroup.get(member_id).users_group_name |
|
96 | 96 | if not check_perms or HasUserGroupPermissionAny( |
|
97 | 97 | *req_perms)(member_name, user=cur_user): |
|
98 | 98 | self.grant_user_group_permission( |
|
99 | 99 | target_user_group=user_group, user_group=member_id, perm=perm) |
|
100 | else: | |
|
101 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
102 | "got {} instead".format(member_type)) | |
|
100 | 103 | |
|
101 | 104 | changes['updated'].append({ |
|
102 | 105 | 'change_obj': change_obj, |
@@ -110,13 +113,16 b' class UserGroupModel(BaseModel):' | |||
|
110 | 113 | member_name = User.get(member_id).username |
|
111 | 114 | self.grant_user_permission( |
|
112 | 115 | user_group=user_group, user=member_id, perm=perm) |
|
113 | else: | |
|
116 | elif member_type == 'user_group': | |
|
114 | 117 | # check if we have permissions to alter this usergroup |
|
115 | 118 | member_name = UserGroup.get(member_id).users_group_name |
|
116 | 119 | if not check_perms or HasUserGroupPermissionAny( |
|
117 | 120 | *req_perms)(member_name, user=cur_user): |
|
118 | 121 | self.grant_user_group_permission( |
|
119 | 122 | target_user_group=user_group, user_group=member_id, perm=perm) |
|
123 | else: | |
|
124 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
125 | "got {} instead".format(member_type)) | |
|
120 | 126 | |
|
121 | 127 | changes['added'].append({ |
|
122 | 128 | 'change_obj': change_obj, |
@@ -129,13 +135,16 b' class UserGroupModel(BaseModel):' | |||
|
129 | 135 | if member_type == 'user': |
|
130 | 136 | member_name = User.get(member_id).username |
|
131 | 137 | self.revoke_user_permission(user_group=user_group, user=member_id) |
|
132 | else: | |
|
138 | elif member_type == 'user_group': | |
|
133 | 139 | # check if we have permissions to alter this usergroup |
|
134 | 140 | member_name = UserGroup.get(member_id).users_group_name |
|
135 | 141 | if not check_perms or HasUserGroupPermissionAny( |
|
136 | 142 | *req_perms)(member_name, user=cur_user): |
|
137 | 143 | self.revoke_user_group_permission( |
|
138 | 144 | target_user_group=user_group, user_group=member_id) |
|
145 | else: | |
|
146 | raise ValueError("member_type must be 'user' or 'user_group' " | |
|
147 | "got {} instead".format(member_type)) | |
|
139 | 148 | |
|
140 | 149 | changes['deleted'].append({ |
|
141 | 150 | 'change_obj': change_obj, |
@@ -797,7 +797,7 b" def ValidPerms(localizer, type_='repo'):" | |||
|
797 | 797 | obj_type = k[0] |
|
798 | 798 | obj_id = k[7:] |
|
799 | 799 | update_type = {'u': 'user', |
|
800 |
'g': 'user |
|
|
800 | 'g': 'user_group'}[obj_type] | |
|
801 | 801 | |
|
802 | 802 | if obj_type == 'u' and safe_int(obj_id) == default_user_id: |
|
803 | 803 | if str2bool(value.get('repo_private')): |
@@ -827,7 +827,7 b" def ValidPerms(localizer, type_='repo'):" | |||
|
827 | 827 | User.query()\ |
|
828 | 828 | .filter(User.active == true())\ |
|
829 | 829 | .filter(User.user_id == member_id).one() |
|
830 |
if member_type == 'user |
|
|
830 | if member_type == 'user_group': | |
|
831 | 831 | UserGroup.query()\ |
|
832 | 832 | .filter(UserGroup.users_group_active == true())\ |
|
833 | 833 | .filter(UserGroup.users_group_id == member_id)\ |
@@ -48,7 +48,7 b' def permissions_setup_func_orig(' | |||
|
48 | 48 | repo_group = RepoGroup.get_by_group_name(group_name=group_name) |
|
49 | 49 | if not repo_group: |
|
50 | 50 | raise Exception('Cannot get group %s' % group_name) |
|
51 |
perm_updates = [[test_u2_gr_id, perm, 'user |
|
|
51 | perm_updates = [[test_u2_gr_id, perm, 'user_group']] | |
|
52 | 52 | RepoGroupModel().update_permissions(repo_group, |
|
53 | 53 | perm_updates=perm_updates, |
|
54 | 54 | recursive=recursive, check_perms=False) |
@@ -427,3 +427,32 b' def commit_change(' | |||
|
427 | 427 | f_path=filename |
|
428 | 428 | ) |
|
429 | 429 | return commit |
|
430 | ||
|
431 | ||
|
432 | def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None): | |
|
433 | if not default: | |
|
434 | raise ValueError('Permission for default user must be given') | |
|
435 | form_data = [( | |
|
436 | 'csrf_token', csrf_token | |
|
437 | )] | |
|
438 | # add default | |
|
439 | form_data.extend([ | |
|
440 | ('u_perm_1', default) | |
|
441 | ]) | |
|
442 | ||
|
443 | if grant: | |
|
444 | for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1): | |
|
445 | form_data.extend([ | |
|
446 | ('perm_new_member_perm_new{}'.format(cnt), perm), | |
|
447 | ('perm_new_member_id_new{}'.format(cnt), obj_id), | |
|
448 | ('perm_new_member_name_new{}'.format(cnt), obj_name), | |
|
449 | ('perm_new_member_type_new{}'.format(cnt), obj_type), | |
|
450 | ||
|
451 | ]) | |
|
452 | if revoke: | |
|
453 | for obj_id, obj_type in revoke: | |
|
454 | form_data.extend([ | |
|
455 | ('perm_del_member_id_{}'.format(obj_id), obj_id), | |
|
456 | ('perm_del_member_type_{}'.format(obj_id), obj_type), | |
|
457 | ]) | |
|
458 | return form_data |
General Comments 0
You need to be logged in to leave comments.
Login now