##// END OF EJS Templates
get-perms-api-helper: show nicer message when prefix is specified to help in API usage.
marcink -
r4194:22366957 stable
parent child Browse files
Show More
@@ -1,90 +1,90 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.repo import RepoModel
25 25 from rhodecode.api.tests.utils import (
26 26 build_data, api_call, assert_error, assert_ok, crash)
27 27
28 28
29 29 @pytest.mark.usefixtures("testuser_api", "app")
30 30 class TestGrantUserGroupPermission(object):
31 31 @pytest.mark.parametrize("name, perm", [
32 32 ('none', 'repository.none'),
33 33 ('read', 'repository.read'),
34 34 ('write', 'repository.write'),
35 35 ('admin', 'repository.admin')
36 36 ])
37 37 def test_api_grant_user_group_permission(
38 38 self, name, perm, backend, user_util):
39 39 user_group = user_util.create_user_group()
40 40 id_, params = build_data(
41 41 self.apikey,
42 42 'grant_user_group_permission',
43 43 repoid=backend.repo_name,
44 44 usergroupid=user_group.users_group_name,
45 45 perm=perm)
46 46 response = api_call(self.app, params)
47 47
48 48 ret = {
49 49 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
50 50 perm, user_group.users_group_name, backend.repo_name
51 51 ),
52 52 'success': True
53 53 }
54 54 expected = ret
55 55 assert_ok(id_, expected, given=response.body)
56 56
57 57 def test_api_grant_user_group_permission_wrong_permission(
58 58 self, backend, user_util):
59 59 perm = 'haha.no.permission'
60 60 user_group = user_util.create_user_group()
61 61 id_, params = build_data(
62 62 self.apikey,
63 63 'grant_user_group_permission',
64 64 repoid=backend.repo_name,
65 65 usergroupid=user_group.users_group_name,
66 66 perm=perm)
67 67 response = api_call(self.app, params)
68 68
69 expected = 'permission `%s` does not exist' % (perm,)
69 expected = 'permission `%s` does not exist.' % (perm,)
70 70 assert_error(id_, expected, given=response.body)
71 71
72 72 @mock.patch.object(RepoModel, 'grant_user_group_permission', crash)
73 73 def test_api_grant_user_group_permission_exception_when_adding(
74 74 self, backend, user_util):
75 75 perm = 'repository.read'
76 76 user_group = user_util.create_user_group()
77 77 id_, params = build_data(
78 78 self.apikey,
79 79 'grant_user_group_permission',
80 80 repoid=backend.repo_name,
81 81 usergroupid=user_group.users_group_name,
82 82 perm=perm)
83 83 response = api_call(self.app, params)
84 84
85 85 expected = (
86 86 'failed to edit permission for user group: `%s` in repo: `%s`' % (
87 87 user_group.users_group_name, backend.repo_name
88 88 )
89 89 )
90 90 assert_error(id_, expected, given=response.body)
@@ -1,174 +1,173 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.user import UserModel
25 25 from rhodecode.model.repo_group import RepoGroupModel
26 26 from rhodecode.api.tests.utils import (
27 27 build_data, api_call, assert_error, assert_ok, crash)
28 28
29 29
30 30 @pytest.mark.usefixtures("testuser_api", "app")
31 31 class TestGrantUserGroupPermissionFromRepoGroup(object):
32 32 @pytest.mark.parametrize("name, perm, apply_to_children", [
33 33 ('none', 'group.none', 'none'),
34 34 ('read', 'group.read', 'none'),
35 35 ('write', 'group.write', 'none'),
36 36 ('admin', 'group.admin', 'none'),
37 37
38 38 ('none', 'group.none', 'all'),
39 39 ('read', 'group.read', 'all'),
40 40 ('write', 'group.write', 'all'),
41 41 ('admin', 'group.admin', 'all'),
42 42
43 43 ('none', 'group.none', 'repos'),
44 44 ('read', 'group.read', 'repos'),
45 45 ('write', 'group.write', 'repos'),
46 46 ('admin', 'group.admin', 'repos'),
47 47
48 48 ('none', 'group.none', 'groups'),
49 49 ('read', 'group.read', 'groups'),
50 50 ('write', 'group.write', 'groups'),
51 51 ('admin', 'group.admin', 'groups'),
52 52 ])
53 53 def test_api_grant_user_group_permission_to_repo_group(
54 54 self, name, perm, apply_to_children, user_util):
55 55 user_group = user_util.create_user_group()
56 56 repo_group = user_util.create_repo_group()
57 57 user_util.create_repo(parent=repo_group)
58 58
59 59 id_, params = build_data(
60 60 self.apikey,
61 61 'grant_user_group_permission_to_repo_group',
62 62 repogroupid=repo_group.name,
63 63 usergroupid=user_group.users_group_name,
64 64 perm=perm,
65 65 apply_to_children=apply_to_children,)
66 66 response = api_call(self.app, params)
67 67
68 68 ret = {
69 69 'msg': (
70 70 'Granted perm: `%s` (recursive:%s) for user group: `%s`'
71 71 ' in repo group: `%s`' % (
72 72 perm, apply_to_children, user_group.users_group_name,
73 73 repo_group.name
74 74 )
75 75 ),
76 76 'success': True
77 77 }
78 78 expected = ret
79 79 try:
80 80 assert_ok(id_, expected, given=response.body)
81 81 finally:
82 82 RepoGroupModel().revoke_user_group_permission(
83 83 repo_group.group_id, user_group.users_group_id)
84 84
85 85 @pytest.mark.parametrize(
86 86 "name, perm, apply_to_children, grant_admin, access_ok", [
87 87 ('none_fails', 'group.none', 'none', False, False),
88 88 ('read_fails', 'group.read', 'none', False, False),
89 89 ('write_fails', 'group.write', 'none', False, False),
90 90 ('admin_fails', 'group.admin', 'none', False, False),
91 91
92 92 # with granted perms
93 93 ('none_ok', 'group.none', 'none', True, True),
94 94 ('read_ok', 'group.read', 'none', True, True),
95 95 ('write_ok', 'group.write', 'none', True, True),
96 96 ('admin_ok', 'group.admin', 'none', True, True),
97 97 ]
98 98 )
99 99 def test_api_grant_user_group_permission_to_repo_group_by_regular_user(
100 100 self, name, perm, apply_to_children, grant_admin, access_ok,
101 101 user_util):
102 102 user = UserModel().get_by_username(self.TEST_USER_LOGIN)
103 103 user_group = user_util.create_user_group()
104 104 repo_group = user_util.create_repo_group()
105 105 if grant_admin:
106 106 user_util.grant_user_permission_to_repo_group(
107 107 repo_group, user, 'group.admin')
108 108
109 109 id_, params = build_data(
110 110 self.apikey_regular,
111 111 'grant_user_group_permission_to_repo_group',
112 112 repogroupid=repo_group.name,
113 113 usergroupid=user_group.users_group_name,
114 114 perm=perm,
115 115 apply_to_children=apply_to_children,)
116 116 response = api_call(self.app, params)
117 117 if access_ok:
118 118 ret = {
119 119 'msg': (
120 120 'Granted perm: `%s` (recursive:%s) for user group: `%s`'
121 121 ' in repo group: `%s`' % (
122 122 perm, apply_to_children, user_group.users_group_name,
123 123 repo_group.name
124 124 )
125 125 ),
126 126 'success': True
127 127 }
128 128 expected = ret
129 129 try:
130 130 assert_ok(id_, expected, given=response.body)
131 131 finally:
132 132 RepoGroupModel().revoke_user_group_permission(
133 133 repo_group.group_id, user_group.users_group_id)
134 134 else:
135 expected = 'repository group `%s` does not exist' % (
136 repo_group.name,)
135 expected = 'repository group `%s` does not exist' % (repo_group.name,)
137 136 assert_error(id_, expected, given=response.body)
138 137
139 138 def test_api_grant_user_group_permission_to_repo_group_wrong_permission(
140 139 self, user_util):
141 140 user_group = user_util.create_user_group()
142 141 repo_group = user_util.create_repo_group()
143 142 perm = 'haha.no.permission'
144 143 id_, params = build_data(
145 144 self.apikey,
146 145 'grant_user_group_permission_to_repo_group',
147 146 repogroupid=repo_group.name,
148 147 usergroupid=user_group.users_group_name,
149 148 perm=perm)
150 149 response = api_call(self.app, params)
151 150
152 expected = 'permission `%s` does not exist' % (perm,)
151 expected = 'permission `%s` does not exist. Permission should start with prefix: `group.`' % (perm,)
153 152 assert_error(id_, expected, given=response.body)
154 153
155 154 @mock.patch.object(RepoGroupModel, 'grant_user_group_permission', crash)
156 155 def test_api_grant_user_group_permission_exception_when_adding_2(
157 156 self, user_util):
158 157 user_group = user_util.create_user_group()
159 158 repo_group = user_util.create_repo_group()
160 159 perm = 'group.read'
161 160 id_, params = build_data(
162 161 self.apikey,
163 162 'grant_user_group_permission_to_repo_group',
164 163 repogroupid=repo_group.name,
165 164 usergroupid=user_group.users_group_name,
166 165 perm=perm)
167 166 response = api_call(self.app, params)
168 167
169 168 expected = (
170 169 'failed to edit permission for user group: `%s`'
171 170 ' in repo group: `%s`' % (
172 171 user_group.users_group_name, repo_group.name)
173 172 )
174 173 assert_error(id_, expected, given=response.body)
@@ -1,87 +1,87 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.repo import RepoModel
25 25 from rhodecode.api.tests.utils import (
26 26 build_data, api_call, assert_error, assert_ok, crash)
27 27
28 28
29 29 @pytest.mark.usefixtures("testuser_api", "app")
30 30 class TestGrantUserPermission(object):
31 31 @pytest.mark.parametrize("name, perm", [
32 32 ('none', 'repository.none'),
33 33 ('read', 'repository.read'),
34 34 ('write', 'repository.write'),
35 35 ('admin', 'repository.admin')
36 36 ])
37 37 def test_api_grant_user_permission(self, name, perm, backend, user_util):
38 38 user = user_util.create_user()
39 39 id_, params = build_data(
40 40 self.apikey,
41 41 'grant_user_permission',
42 42 repoid=backend.repo_name,
43 43 userid=user.username,
44 44 perm=perm)
45 45 response = api_call(self.app, params)
46 46
47 47 ret = {
48 48 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
49 49 perm, user.username, backend.repo_name
50 50 ),
51 51 'success': True
52 52 }
53 53 expected = ret
54 54 assert_ok(id_, expected, given=response.body)
55 55
56 56 def test_api_grant_user_permission_wrong_permission(
57 57 self, backend, user_util):
58 58 user = user_util.create_user()
59 59 perm = 'haha.no.permission'
60 60 id_, params = build_data(
61 61 self.apikey,
62 62 'grant_user_permission',
63 63 repoid=backend.repo_name,
64 64 userid=user.username,
65 65 perm=perm)
66 66 response = api_call(self.app, params)
67 67
68 expected = 'permission `%s` does not exist' % (perm,)
68 expected = 'permission `%s` does not exist.' % (perm,)
69 69 assert_error(id_, expected, given=response.body)
70 70
71 71 @mock.patch.object(RepoModel, 'grant_user_permission', crash)
72 72 def test_api_grant_user_permission_exception_when_adding(
73 73 self, backend, user_util):
74 74 user = user_util.create_user()
75 75 perm = 'repository.read'
76 76 id_, params = build_data(
77 77 self.apikey,
78 78 'grant_user_permission',
79 79 repoid=backend.repo_name,
80 80 userid=user.username,
81 81 perm=perm)
82 82 response = api_call(self.app, params)
83 83
84 84 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
85 85 user.username, backend.repo_name
86 86 )
87 87 assert_error(id_, expected, given=response.body)
@@ -1,157 +1,157 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.user import UserModel
25 25 from rhodecode.model.repo_group import RepoGroupModel
26 26 from rhodecode.api.tests.utils import (
27 27 build_data, api_call, assert_error, assert_ok, crash)
28 28
29 29
30 30 @pytest.mark.usefixtures("testuser_api", "app")
31 31 class TestGrantUserPermissionFromRepoGroup(object):
32 32 @pytest.mark.parametrize("name, perm, apply_to_children", [
33 33 ('none', 'group.none', 'none'),
34 34 ('read', 'group.read', 'none'),
35 35 ('write', 'group.write', 'none'),
36 36 ('admin', 'group.admin', 'none'),
37 37
38 38 ('none', 'group.none', 'all'),
39 39 ('read', 'group.read', 'all'),
40 40 ('write', 'group.write', 'all'),
41 41 ('admin', 'group.admin', 'all'),
42 42
43 43 ('none', 'group.none', 'repos'),
44 44 ('read', 'group.read', 'repos'),
45 45 ('write', 'group.write', 'repos'),
46 46 ('admin', 'group.admin', 'repos'),
47 47
48 48 ('none', 'group.none', 'groups'),
49 49 ('read', 'group.read', 'groups'),
50 50 ('write', 'group.write', 'groups'),
51 51 ('admin', 'group.admin', 'groups'),
52 52 ])
53 53 def test_api_grant_user_permission_to_repo_group(
54 54 self, name, perm, apply_to_children, user_util):
55 55 user = user_util.create_user()
56 56 repo_group = user_util.create_repo_group()
57 57 id_, params = build_data(
58 58 self.apikey, 'grant_user_permission_to_repo_group',
59 59 repogroupid=repo_group.name, userid=user.username,
60 60 perm=perm, apply_to_children=apply_to_children)
61 61 response = api_call(self.app, params)
62 62
63 63 ret = {
64 64 'msg': (
65 65 'Granted perm: `%s` (recursive:%s) for user: `%s`'
66 66 ' in repo group: `%s`' % (
67 67 perm, apply_to_children, user.username, repo_group.name
68 68 )
69 69 ),
70 70 'success': True
71 71 }
72 72 expected = ret
73 73 assert_ok(id_, expected, given=response.body)
74 74
75 75 @pytest.mark.parametrize(
76 76 "name, perm, apply_to_children, grant_admin, access_ok", [
77 77 ('none_fails', 'group.none', 'none', False, False),
78 78 ('read_fails', 'group.read', 'none', False, False),
79 79 ('write_fails', 'group.write', 'none', False, False),
80 80 ('admin_fails', 'group.admin', 'none', False, False),
81 81
82 82 # with granted perms
83 83 ('none_ok', 'group.none', 'none', True, True),
84 84 ('read_ok', 'group.read', 'none', True, True),
85 85 ('write_ok', 'group.write', 'none', True, True),
86 86 ('admin_ok', 'group.admin', 'none', True, True),
87 87 ]
88 88 )
89 89 def test_api_grant_user_permission_to_repo_group_by_regular_user(
90 90 self, name, perm, apply_to_children, grant_admin, access_ok,
91 91 user_util):
92 92 user = user_util.create_user()
93 93 repo_group = user_util.create_repo_group()
94 94
95 95 if grant_admin:
96 96 test_user = UserModel().get_by_username(self.TEST_USER_LOGIN)
97 97 user_util.grant_user_permission_to_repo_group(
98 98 repo_group, test_user, 'group.admin')
99 99
100 100 id_, params = build_data(
101 101 self.apikey_regular, 'grant_user_permission_to_repo_group',
102 102 repogroupid=repo_group.name, userid=user.username,
103 103 perm=perm, apply_to_children=apply_to_children)
104 104 response = api_call(self.app, params)
105 105 if access_ok:
106 106 ret = {
107 107 'msg': (
108 108 'Granted perm: `%s` (recursive:%s) for user: `%s`'
109 109 ' in repo group: `%s`' % (
110 110 perm, apply_to_children, user.username, repo_group.name
111 111 )
112 112 ),
113 113 'success': True
114 114 }
115 115 expected = ret
116 116 assert_ok(id_, expected, given=response.body)
117 117 else:
118 118 expected = 'repository group `%s` does not exist' % (
119 119 repo_group.name, )
120 120 assert_error(id_, expected, given=response.body)
121 121
122 122 def test_api_grant_user_permission_to_repo_group_wrong_permission(
123 123 self, user_util):
124 124 user = user_util.create_user()
125 125 repo_group = user_util.create_repo_group()
126 126 perm = 'haha.no.permission'
127 127 id_, params = build_data(
128 128 self.apikey,
129 129 'grant_user_permission_to_repo_group',
130 130 repogroupid=repo_group.name,
131 131 userid=user.username,
132 132 perm=perm)
133 133 response = api_call(self.app, params)
134 134
135 expected = 'permission `%s` does not exist' % (perm,)
135 expected = 'permission `%s` does not exist. Permission should start with prefix: `group.`' % (perm,)
136 136 assert_error(id_, expected, given=response.body)
137 137
138 138 @mock.patch.object(RepoGroupModel, 'grant_user_permission', crash)
139 139 def test_api_grant_user_permission_to_repo_group_exception_when_adding(
140 140 self, user_util):
141 141 user = user_util.create_user()
142 142 repo_group = user_util.create_repo_group()
143 143 perm = 'group.read'
144 144 id_, params = build_data(
145 145 self.apikey,
146 146 'grant_user_permission_to_repo_group',
147 147 repogroupid=repo_group.name,
148 148 userid=user.username,
149 149 perm=perm)
150 150 response = api_call(self.app, params)
151 151
152 152 expected = (
153 153 'failed to edit permission for user: `%s` in repo group: `%s`' % (
154 154 user.username, repo_group.name
155 155 )
156 156 )
157 157 assert_error(id_, expected, given=response.body)
@@ -1,156 +1,156 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.user import UserModel
25 25 from rhodecode.model.user_group import UserGroupModel
26 26 from rhodecode.api.tests.utils import (
27 27 build_data, api_call, assert_error, assert_ok, crash)
28 28
29 29
30 30 @pytest.mark.usefixtures("testuser_api", "app")
31 31 class TestGrantUserPermissionFromUserGroup(object):
32 32 @pytest.mark.parametrize("name, perm", [
33 33 ('none', 'usergroup.none'),
34 34 ('read', 'usergroup.read'),
35 35 ('write', 'usergroup.write'),
36 36 ('admin', 'usergroup.admin'),
37 37
38 38 ('none', 'usergroup.none'),
39 39 ('read', 'usergroup.read'),
40 40 ('write', 'usergroup.write'),
41 41 ('admin', 'usergroup.admin'),
42 42
43 43 ('none', 'usergroup.none'),
44 44 ('read', 'usergroup.read'),
45 45 ('write', 'usergroup.write'),
46 46 ('admin', 'usergroup.admin'),
47 47
48 48 ('none', 'usergroup.none'),
49 49 ('read', 'usergroup.read'),
50 50 ('write', 'usergroup.write'),
51 51 ('admin', 'usergroup.admin'),
52 52 ])
53 53 def test_api_grant_user_permission_to_user_group(
54 54 self, name, perm, user_util):
55 55 user = user_util.create_user()
56 56 group = user_util.create_user_group()
57 57 id_, params = build_data(
58 58 self.apikey,
59 59 'grant_user_permission_to_user_group',
60 60 usergroupid=group.users_group_name,
61 61 userid=user.username,
62 62 perm=perm)
63 63 response = api_call(self.app, params)
64 64
65 65 ret = {
66 66 'msg': 'Granted perm: `%s` for user: `%s` in user group: `%s`' % (
67 67 perm, user.username, group.users_group_name
68 68 ),
69 69 'success': True
70 70 }
71 71 expected = ret
72 72 assert_ok(id_, expected, given=response.body)
73 73
74 74 @pytest.mark.parametrize("name, perm, grant_admin, access_ok", [
75 75 ('none_fails', 'usergroup.none', False, False),
76 76 ('read_fails', 'usergroup.read', False, False),
77 77 ('write_fails', 'usergroup.write', False, False),
78 78 ('admin_fails', 'usergroup.admin', False, False),
79 79
80 80 # with granted perms
81 81 ('none_ok', 'usergroup.none', True, True),
82 82 ('read_ok', 'usergroup.read', True, True),
83 83 ('write_ok', 'usergroup.write', True, True),
84 84 ('admin_ok', 'usergroup.admin', True, True),
85 85 ])
86 86 def test_api_grant_user_permission_to_user_group_by_regular_user(
87 87 self, name, perm, grant_admin, access_ok, user_util):
88 88 api_user = UserModel().get_by_username(self.TEST_USER_LOGIN)
89 89 user = user_util.create_user()
90 90 group = user_util.create_user_group()
91 91 # grant the user ability to at least read the group
92 92 permission = 'usergroup.admin' if grant_admin else 'usergroup.read'
93 93 user_util.grant_user_permission_to_user_group(
94 94 group, api_user, permission)
95 95
96 96 id_, params = build_data(
97 97 self.apikey_regular,
98 98 'grant_user_permission_to_user_group',
99 99 usergroupid=group.users_group_name,
100 100 userid=user.username,
101 101 perm=perm)
102 102 response = api_call(self.app, params)
103 103
104 104 if access_ok:
105 105 ret = {
106 106 'msg': (
107 107 'Granted perm: `%s` for user: `%s` in user group: `%s`' % (
108 108 perm, user.username, group.users_group_name
109 109 )
110 110 ),
111 111 'success': True
112 112 }
113 113 expected = ret
114 114 assert_ok(id_, expected, given=response.body)
115 115 else:
116 116 expected = 'user group `%s` does not exist' % (
117 117 group.users_group_name)
118 118 assert_error(id_, expected, given=response.body)
119 119
120 120 def test_api_grant_user_permission_to_user_group_wrong_permission(
121 121 self, user_util):
122 122 user = user_util.create_user()
123 123 group = user_util.create_user_group()
124 124 perm = 'haha.no.permission'
125 125 id_, params = build_data(
126 126 self.apikey,
127 127 'grant_user_permission_to_user_group',
128 128 usergroupid=group.users_group_name,
129 129 userid=user.username,
130 130 perm=perm)
131 131 response = api_call(self.app, params)
132 132
133 expected = 'permission `%s` does not exist' % perm
133 expected = 'permission `%s` does not exist. Permission should start with prefix: `usergroup.`' % perm
134 134 assert_error(id_, expected, given=response.body)
135 135
136 136 def test_api_grant_user_permission_to_user_group_exception_when_adding(
137 137 self, user_util):
138 138 user = user_util.create_user()
139 139 group = user_util.create_user_group()
140 140
141 141 perm = 'usergroup.read'
142 142 id_, params = build_data(
143 143 self.apikey,
144 144 'grant_user_permission_to_user_group',
145 145 usergroupid=group.users_group_name,
146 146 userid=user.username,
147 147 perm=perm)
148 148 with mock.patch.object(UserGroupModel, 'grant_user_permission', crash):
149 149 response = api_call(self.app, params)
150 150
151 151 expected = (
152 152 'failed to edit permission for user: `%s` in user group: `%s`' % (
153 153 user.username, group.users_group_name
154 154 )
155 155 )
156 156 assert_error(id_, expected, given=response.body)
@@ -1,449 +1,453 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 JSON RPC utils
23 23 """
24 24
25 25 import collections
26 26 import logging
27 27
28 28 from rhodecode.api.exc import JSONRPCError
29 29 from rhodecode.lib.auth import (
30 30 HasPermissionAnyApi, HasRepoPermissionAnyApi, HasRepoGroupPermissionAnyApi)
31 31 from rhodecode.lib.utils import safe_unicode
32 32 from rhodecode.lib.vcs.exceptions import RepositoryError
33 33 from rhodecode.lib.view_utils import get_commit_from_ref_name
34 34 from rhodecode.lib.utils2 import str2bool
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38
39 39 class OAttr(object):
40 40 """
41 41 Special Option that defines other attribute, and can default to them
42 42
43 43 Example::
44 44
45 45 def test(apiuser, userid=Optional(OAttr('apiuser')):
46 46 user = Optional.extract(userid, evaluate_locals=local())
47 47 #if we pass in userid, we get it, else it will default to apiuser
48 48 #attribute
49 49 """
50 50
51 51 def __init__(self, attr_name):
52 52 self.attr_name = attr_name
53 53
54 54 def __repr__(self):
55 55 return '<OptionalAttr:%s>' % self.attr_name
56 56
57 57 def __call__(self):
58 58 return self
59 59
60 60
61 61 class Optional(object):
62 62 """
63 63 Defines an optional parameter::
64 64
65 65 param = param.getval() if isinstance(param, Optional) else param
66 66 param = param() if isinstance(param, Optional) else param
67 67
68 68 is equivalent of::
69 69
70 70 param = Optional.extract(param)
71 71
72 72 """
73 73
74 74 def __init__(self, type_):
75 75 self.type_ = type_
76 76
77 77 def __repr__(self):
78 78 return '<Optional:%s>' % self.type_.__repr__()
79 79
80 80 def __call__(self):
81 81 return self.getval()
82 82
83 83 def getval(self, evaluate_locals=None):
84 84 """
85 85 returns value from this Optional instance
86 86 """
87 87 if isinstance(self.type_, OAttr):
88 88 param_name = self.type_.attr_name
89 89 if evaluate_locals:
90 90 return evaluate_locals[param_name]
91 91 # use params name
92 92 return param_name
93 93 return self.type_
94 94
95 95 @classmethod
96 96 def extract(cls, val, evaluate_locals=None, binary=None):
97 97 """
98 98 Extracts value from Optional() instance
99 99
100 100 :param val:
101 101 :return: original value if it's not Optional instance else
102 102 value of instance
103 103 """
104 104 if isinstance(val, cls):
105 105 val = val.getval(evaluate_locals)
106 106
107 107 if binary:
108 108 val = str2bool(val)
109 109
110 110 return val
111 111
112 112
113 113 def parse_args(cli_args, key_prefix=''):
114 114 from rhodecode.lib.utils2 import (escape_split)
115 115 kwargs = collections.defaultdict(dict)
116 116 for el in escape_split(cli_args, ','):
117 117 kv = escape_split(el, '=', 1)
118 118 if len(kv) == 2:
119 119 k, v = kv
120 120 kwargs[key_prefix + k] = v
121 121 return kwargs
122 122
123 123
124 124 def get_origin(obj):
125 125 """
126 126 Get origin of permission from object.
127 127
128 128 :param obj:
129 129 """
130 130 origin = 'permission'
131 131
132 132 if getattr(obj, 'owner_row', '') and getattr(obj, 'admin_row', ''):
133 133 # admin and owner case, maybe we should use dual string ?
134 134 origin = 'owner'
135 135 elif getattr(obj, 'owner_row', ''):
136 136 origin = 'owner'
137 137 elif getattr(obj, 'admin_row', ''):
138 138 origin = 'super-admin'
139 139 return origin
140 140
141 141
142 142 def store_update(updates, attr, name):
143 143 """
144 144 Stores param in updates dict if it's not instance of Optional
145 145 allows easy updates of passed in params
146 146 """
147 147 if not isinstance(attr, Optional):
148 148 updates[name] = attr
149 149
150 150
151 151 def has_superadmin_permission(apiuser):
152 152 """
153 153 Return True if apiuser is admin or return False
154 154
155 155 :param apiuser:
156 156 """
157 157 if HasPermissionAnyApi('hg.admin')(user=apiuser):
158 158 return True
159 159 return False
160 160
161 161
162 162 def validate_repo_permissions(apiuser, repoid, repo, perms):
163 163 """
164 164 Raise JsonRPCError if apiuser is not authorized or return True
165 165
166 166 :param apiuser:
167 167 :param repoid:
168 168 :param repo:
169 169 :param perms:
170 170 """
171 171 if not HasRepoPermissionAnyApi(*perms)(
172 172 user=apiuser, repo_name=repo.repo_name):
173 173 raise JSONRPCError(
174 174 'repository `%s` does not exist' % repoid)
175 175
176 176 return True
177 177
178 178
179 179 def validate_repo_group_permissions(apiuser, repogroupid, repo_group, perms):
180 180 """
181 181 Raise JsonRPCError if apiuser is not authorized or return True
182 182
183 183 :param apiuser:
184 184 :param repogroupid: just the id of repository group
185 185 :param repo_group: instance of repo_group
186 186 :param perms:
187 187 """
188 188 if not HasRepoGroupPermissionAnyApi(*perms)(
189 189 user=apiuser, group_name=repo_group.group_name):
190 190 raise JSONRPCError(
191 191 'repository group `%s` does not exist' % repogroupid)
192 192
193 193 return True
194 194
195 195
196 196 def validate_set_owner_permissions(apiuser, owner):
197 197 if isinstance(owner, Optional):
198 198 owner = get_user_or_error(apiuser.user_id)
199 199 else:
200 200 if has_superadmin_permission(apiuser):
201 201 owner = get_user_or_error(owner)
202 202 else:
203 203 # forbid setting owner for non-admins
204 204 raise JSONRPCError(
205 205 'Only RhodeCode super-admin can specify `owner` param')
206 206 return owner
207 207
208 208
209 209 def get_user_or_error(userid):
210 210 """
211 211 Get user by id or name or return JsonRPCError if not found
212 212
213 213 :param userid:
214 214 """
215 215 from rhodecode.model.user import UserModel
216 216 user_model = UserModel()
217 217
218 218 if isinstance(userid, (int, long)):
219 219 try:
220 220 user = user_model.get_user(userid)
221 221 except ValueError:
222 222 user = None
223 223 else:
224 224 user = user_model.get_by_username(userid)
225 225
226 226 if user is None:
227 227 raise JSONRPCError(
228 228 'user `%s` does not exist' % (userid,))
229 229 return user
230 230
231 231
232 232 def get_repo_or_error(repoid):
233 233 """
234 234 Get repo by id or name or return JsonRPCError if not found
235 235
236 236 :param repoid:
237 237 """
238 238 from rhodecode.model.repo import RepoModel
239 239 repo_model = RepoModel()
240 240
241 241 if isinstance(repoid, (int, long)):
242 242 try:
243 243 repo = repo_model.get_repo(repoid)
244 244 except ValueError:
245 245 repo = None
246 246 else:
247 247 repo = repo_model.get_by_repo_name(repoid)
248 248
249 249 if repo is None:
250 250 raise JSONRPCError(
251 251 'repository `%s` does not exist' % (repoid,))
252 252 return repo
253 253
254 254
255 255 def get_repo_group_or_error(repogroupid):
256 256 """
257 257 Get repo group by id or name or return JsonRPCError if not found
258 258
259 259 :param repogroupid:
260 260 """
261 261 from rhodecode.model.repo_group import RepoGroupModel
262 262 repo_group_model = RepoGroupModel()
263 263
264 264 if isinstance(repogroupid, (int, long)):
265 265 try:
266 266 repo_group = repo_group_model._get_repo_group(repogroupid)
267 267 except ValueError:
268 268 repo_group = None
269 269 else:
270 270 repo_group = repo_group_model.get_by_group_name(repogroupid)
271 271
272 272 if repo_group is None:
273 273 raise JSONRPCError(
274 274 'repository group `%s` does not exist' % (repogroupid,))
275 275 return repo_group
276 276
277 277
278 278 def get_user_group_or_error(usergroupid):
279 279 """
280 280 Get user group by id or name or return JsonRPCError if not found
281 281
282 282 :param usergroupid:
283 283 """
284 284 from rhodecode.model.user_group import UserGroupModel
285 285 user_group_model = UserGroupModel()
286 286
287 287 if isinstance(usergroupid, (int, long)):
288 288 try:
289 289 user_group = user_group_model.get_group(usergroupid)
290 290 except ValueError:
291 291 user_group = None
292 292 else:
293 293 user_group = user_group_model.get_by_name(usergroupid)
294 294
295 295 if user_group is None:
296 296 raise JSONRPCError(
297 297 'user group `%s` does not exist' % (usergroupid,))
298 298 return user_group
299 299
300 300
301 301 def get_perm_or_error(permid, prefix=None):
302 302 """
303 303 Get permission by id or name or return JsonRPCError if not found
304 304
305 305 :param permid:
306 306 """
307 307 from rhodecode.model.permission import PermissionModel
308 308
309 309 perm = PermissionModel.cls.get_by_key(permid)
310 310 if perm is None:
311 raise JSONRPCError('permission `%s` does not exist' % (permid,))
311 msg = 'permission `{}` does not exist.'.format(permid)
312 if prefix:
313 msg += ' Permission should start with prefix: `{}`'.format(prefix)
314 raise JSONRPCError(msg)
315
312 316 if prefix:
313 317 if not perm.permission_name.startswith(prefix):
314 318 raise JSONRPCError('permission `%s` is invalid, '
315 319 'should start with %s' % (permid, prefix))
316 320 return perm
317 321
318 322
319 323 def get_gist_or_error(gistid):
320 324 """
321 325 Get gist by id or gist_access_id or return JsonRPCError if not found
322 326
323 327 :param gistid:
324 328 """
325 329 from rhodecode.model.gist import GistModel
326 330
327 331 gist = GistModel.cls.get_by_access_id(gistid)
328 332 if gist is None:
329 333 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
330 334 return gist
331 335
332 336
333 337 def get_pull_request_or_error(pullrequestid):
334 338 """
335 339 Get pull request by id or return JsonRPCError if not found
336 340
337 341 :param pullrequestid:
338 342 """
339 343 from rhodecode.model.pull_request import PullRequestModel
340 344
341 345 try:
342 346 pull_request = PullRequestModel().get(int(pullrequestid))
343 347 except ValueError:
344 348 raise JSONRPCError('pullrequestid must be an integer')
345 349 if not pull_request:
346 350 raise JSONRPCError('pull request `%s` does not exist' % (
347 351 pullrequestid,))
348 352 return pull_request
349 353
350 354
351 355 def build_commit_data(commit, detail_level):
352 356 parsed_diff = []
353 357 if detail_level == 'extended':
354 358 for f in commit.added:
355 359 parsed_diff.append(_get_commit_dict(filename=f.path, op='A'))
356 360 for f in commit.changed:
357 361 parsed_diff.append(_get_commit_dict(filename=f.path, op='M'))
358 362 for f in commit.removed:
359 363 parsed_diff.append(_get_commit_dict(filename=f.path, op='D'))
360 364
361 365 elif detail_level == 'full':
362 366 from rhodecode.lib.diffs import DiffProcessor
363 367 diff_processor = DiffProcessor(commit.diff())
364 368 for dp in diff_processor.prepare():
365 369 del dp['stats']['ops']
366 370 _stats = dp['stats']
367 371 parsed_diff.append(_get_commit_dict(
368 372 filename=dp['filename'], op=dp['operation'],
369 373 new_revision=dp['new_revision'],
370 374 old_revision=dp['old_revision'],
371 375 raw_diff=dp['raw_diff'], stats=_stats))
372 376
373 377 return parsed_diff
374 378
375 379
376 380 def get_commit_or_error(ref, repo):
377 381 try:
378 382 ref_type, _, ref_hash = ref.split(':')
379 383 except ValueError:
380 384 raise JSONRPCError(
381 385 'Ref `{ref}` given in a wrong format. Please check the API'
382 386 ' documentation for more details'.format(ref=ref))
383 387 try:
384 388 # TODO: dan: refactor this to use repo.scm_instance().get_commit()
385 389 # once get_commit supports ref_types
386 390 return get_commit_from_ref_name(repo, ref_hash)
387 391 except RepositoryError:
388 392 raise JSONRPCError('Ref `{ref}` does not exist'.format(ref=ref))
389 393
390 394
391 395 def _get_ref_hash(repo, type_, name):
392 396 vcs_repo = repo.scm_instance()
393 397 if type_ in ['branch'] and vcs_repo.alias in ('hg', 'git'):
394 398 return vcs_repo.branches[name]
395 399 elif type_ in ['bookmark', 'book'] and vcs_repo.alias == 'hg':
396 400 return vcs_repo.bookmarks[name]
397 401 else:
398 402 raise ValueError()
399 403
400 404
401 405 def resolve_ref_or_error(ref, repo, allowed_ref_types=None):
402 406 allowed_ref_types = allowed_ref_types or ['bookmark', 'book', 'tag', 'branch']
403 407
404 408 def _parse_ref(type_, name, hash_=None):
405 409 return type_, name, hash_
406 410
407 411 try:
408 412 ref_type, ref_name, ref_hash = _parse_ref(*ref.split(':'))
409 413 except TypeError:
410 414 raise JSONRPCError(
411 415 'Ref `{ref}` given in a wrong format. Please check the API'
412 416 ' documentation for more details'.format(ref=ref))
413 417
414 418 if ref_type not in allowed_ref_types:
415 419 raise JSONRPCError(
416 420 'Ref `{ref}` type is not allowed. '
417 421 'Only:{allowed_refs} are possible.'.format(
418 422 ref=ref, allowed_refs=allowed_ref_types))
419 423
420 424 try:
421 425 ref_hash = ref_hash or _get_ref_hash(repo, ref_type, ref_name)
422 426 except (KeyError, ValueError):
423 427 raise JSONRPCError(
424 428 'The specified value:{type}:`{name}` does not exist, or is not allowed.'.format(
425 429 type=ref_type, name=ref_name))
426 430
427 431 return ':'.join([ref_type, ref_name, ref_hash])
428 432
429 433
430 434 def _get_commit_dict(
431 435 filename, op, new_revision=None, old_revision=None,
432 436 raw_diff=None, stats=None):
433 437 if stats is None:
434 438 stats = {
435 439 "added": None,
436 440 "binary": None,
437 441 "deleted": None
438 442 }
439 443 return {
440 444 "filename": safe_unicode(filename),
441 445 "op": op,
442 446
443 447 # extra details
444 448 "new_revision": new_revision,
445 449 "old_revision": old_revision,
446 450
447 451 "raw_diff": raw_diff,
448 452 "stats": stats
449 453 }
General Comments 0
You need to be logged in to leave comments. Login now