##// END OF EJS Templates
permissions: allow users to update settings for repository groups they still own, or have admin perms, when they don't change their name....
dan -
r4421:73f70a03 default
parent child Browse files
Show More
@@ -1,289 +1,289 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23
23
24 from rhodecode.model.meta import Session
24 from rhodecode.model.meta import Session
25 from rhodecode.model.repo_group import RepoGroupModel
25 from rhodecode.model.repo_group import RepoGroupModel
26 from rhodecode.model.user import UserModel
26 from rhodecode.model.user import UserModel
27 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
27 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
28 from rhodecode.api.tests.utils import (
28 from rhodecode.api.tests.utils import (
29 build_data, api_call, assert_ok, assert_error, crash)
29 build_data, api_call, assert_ok, assert_error, crash)
30 from rhodecode.tests.fixture import Fixture
30 from rhodecode.tests.fixture import Fixture
31
31
32
32
33 fixture = Fixture()
33 fixture = Fixture()
34
34
35
35
36 @pytest.mark.usefixtures("testuser_api", "app")
36 @pytest.mark.usefixtures("testuser_api", "app")
37 class TestCreateRepoGroup(object):
37 class TestCreateRepoGroup(object):
38 def test_api_create_repo_group(self):
38 def test_api_create_repo_group(self):
39 repo_group_name = 'api-repo-group'
39 repo_group_name = 'api-repo-group'
40
40
41 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
41 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
42 assert repo_group is None
42 assert repo_group is None
43
43
44 id_, params = build_data(
44 id_, params = build_data(
45 self.apikey, 'create_repo_group',
45 self.apikey, 'create_repo_group',
46 group_name=repo_group_name,
46 group_name=repo_group_name,
47 owner=TEST_USER_ADMIN_LOGIN,)
47 owner=TEST_USER_ADMIN_LOGIN,)
48 response = api_call(self.app, params)
48 response = api_call(self.app, params)
49
49
50 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
50 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
51 assert repo_group is not None
51 assert repo_group is not None
52 ret = {
52 ret = {
53 'msg': 'Created new repo group `%s`' % (repo_group_name,),
53 'msg': 'Created new repo group `%s`' % (repo_group_name,),
54 'repo_group': repo_group.get_api_data()
54 'repo_group': repo_group.get_api_data()
55 }
55 }
56 expected = ret
56 expected = ret
57 try:
57 try:
58 assert_ok(id_, expected, given=response.body)
58 assert_ok(id_, expected, given=response.body)
59 finally:
59 finally:
60 fixture.destroy_repo_group(repo_group_name)
60 fixture.destroy_repo_group(repo_group_name)
61
61
62 def test_api_create_repo_group_in_another_group(self):
62 def test_api_create_repo_group_in_another_group(self):
63 repo_group_name = 'api-repo-group'
63 repo_group_name = 'api-repo-group'
64
64
65 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
65 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
66 assert repo_group is None
66 assert repo_group is None
67 # create the parent
67 # create the parent
68 fixture.create_repo_group(repo_group_name)
68 fixture.create_repo_group(repo_group_name)
69
69
70 full_repo_group_name = repo_group_name+'/'+repo_group_name
70 full_repo_group_name = repo_group_name+'/'+repo_group_name
71 id_, params = build_data(
71 id_, params = build_data(
72 self.apikey, 'create_repo_group',
72 self.apikey, 'create_repo_group',
73 group_name=full_repo_group_name,
73 group_name=full_repo_group_name,
74 owner=TEST_USER_ADMIN_LOGIN,
74 owner=TEST_USER_ADMIN_LOGIN,
75 copy_permissions=True)
75 copy_permissions=True)
76 response = api_call(self.app, params)
76 response = api_call(self.app, params)
77
77
78 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
78 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
79 assert repo_group is not None
79 assert repo_group is not None
80 ret = {
80 ret = {
81 'msg': 'Created new repo group `%s`' % (full_repo_group_name,),
81 'msg': 'Created new repo group `%s`' % (full_repo_group_name,),
82 'repo_group': repo_group.get_api_data()
82 'repo_group': repo_group.get_api_data()
83 }
83 }
84 expected = ret
84 expected = ret
85 try:
85 try:
86 assert_ok(id_, expected, given=response.body)
86 assert_ok(id_, expected, given=response.body)
87 finally:
87 finally:
88 fixture.destroy_repo_group(full_repo_group_name)
88 fixture.destroy_repo_group(full_repo_group_name)
89 fixture.destroy_repo_group(repo_group_name)
89 fixture.destroy_repo_group(repo_group_name)
90
90
91 def test_api_create_repo_group_in_another_group_not_existing(self):
91 def test_api_create_repo_group_in_another_group_not_existing(self):
92 repo_group_name = 'api-repo-group-no'
92 repo_group_name = 'api-repo-group-no'
93
93
94 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
94 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
95 assert repo_group is None
95 assert repo_group is None
96
96
97 full_repo_group_name = repo_group_name+'/'+repo_group_name
97 full_repo_group_name = repo_group_name+'/'+repo_group_name
98 id_, params = build_data(
98 id_, params = build_data(
99 self.apikey, 'create_repo_group',
99 self.apikey, 'create_repo_group',
100 group_name=full_repo_group_name,
100 group_name=full_repo_group_name,
101 owner=TEST_USER_ADMIN_LOGIN,
101 owner=TEST_USER_ADMIN_LOGIN,
102 copy_permissions=True)
102 copy_permissions=True)
103 response = api_call(self.app, params)
103 response = api_call(self.app, params)
104 expected = {
104 expected = {
105 'repo_group':
105 'repo_group':
106 'Parent repository group `{}` does not exist'.format(
106 'Parent repository group `{}` does not exist'.format(
107 repo_group_name)}
107 repo_group_name)}
108 assert_error(id_, expected, given=response.body)
108 assert_error(id_, expected, given=response.body)
109
109
110 def test_api_create_repo_group_that_exists(self):
110 def test_api_create_repo_group_that_exists(self):
111 repo_group_name = 'api-repo-group'
111 repo_group_name = 'api-repo-group'
112
112
113 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
113 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
114 assert repo_group is None
114 assert repo_group is None
115
115
116 fixture.create_repo_group(repo_group_name)
116 fixture.create_repo_group(repo_group_name)
117 id_, params = build_data(
117 id_, params = build_data(
118 self.apikey, 'create_repo_group',
118 self.apikey, 'create_repo_group',
119 group_name=repo_group_name,
119 group_name=repo_group_name,
120 owner=TEST_USER_ADMIN_LOGIN,)
120 owner=TEST_USER_ADMIN_LOGIN,)
121 response = api_call(self.app, params)
121 response = api_call(self.app, params)
122 expected = {
122 expected = {
123 'unique_repo_group_name':
123 'unique_repo_group_name':
124 'Repository group with name `{}` already exists'.format(
124 'Repository group with name `{}` already exists'.format(
125 repo_group_name)}
125 repo_group_name)}
126 try:
126 try:
127 assert_error(id_, expected, given=response.body)
127 assert_error(id_, expected, given=response.body)
128 finally:
128 finally:
129 fixture.destroy_repo_group(repo_group_name)
129 fixture.destroy_repo_group(repo_group_name)
130
130
131 def test_api_create_repo_group_regular_user_wit_root_location_perms(
131 def test_api_create_repo_group_regular_user_wit_root_location_perms(
132 self, user_util):
132 self, user_util):
133 regular_user = user_util.create_user()
133 regular_user = user_util.create_user()
134 regular_user_api_key = regular_user.api_key
134 regular_user_api_key = regular_user.api_key
135
135
136 repo_group_name = 'api-repo-group-by-regular-user'
136 repo_group_name = 'api-repo-group-by-regular-user'
137
137
138 usr = UserModel().get_by_username(regular_user.username)
138 usr = UserModel().get_by_username(regular_user.username)
139 usr.inherit_default_permissions = False
139 usr.inherit_default_permissions = False
140 Session().add(usr)
140 Session().add(usr)
141
141
142 UserModel().grant_perm(
142 UserModel().grant_perm(
143 regular_user.username, 'hg.repogroup.create.true')
143 regular_user.username, 'hg.repogroup.create.true')
144 Session().commit()
144 Session().commit()
145
145
146 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
146 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
147 assert repo_group is None
147 assert repo_group is None
148
148
149 id_, params = build_data(
149 id_, params = build_data(
150 regular_user_api_key, 'create_repo_group',
150 regular_user_api_key, 'create_repo_group',
151 group_name=repo_group_name)
151 group_name=repo_group_name)
152 response = api_call(self.app, params)
152 response = api_call(self.app, params)
153
153
154 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
154 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
155 assert repo_group is not None
155 assert repo_group is not None
156 expected = {
156 expected = {
157 'msg': 'Created new repo group `%s`' % (repo_group_name,),
157 'msg': 'Created new repo group `%s`' % (repo_group_name,),
158 'repo_group': repo_group.get_api_data()
158 'repo_group': repo_group.get_api_data()
159 }
159 }
160 try:
160 try:
161 assert_ok(id_, expected, given=response.body)
161 assert_ok(id_, expected, given=response.body)
162 finally:
162 finally:
163 fixture.destroy_repo_group(repo_group_name)
163 fixture.destroy_repo_group(repo_group_name)
164
164
165 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
165 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
166 self, user_util):
166 self, user_util):
167
167
168 repo_group_name = 'api-repo-group-parent'
168 repo_group_name = 'api-repo-group-parent'
169
169
170 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
170 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
171 assert repo_group is None
171 assert repo_group is None
172 # create the parent
172 # create the parent
173 fixture.create_repo_group(repo_group_name)
173 fixture.create_repo_group(repo_group_name)
174
174
175 # user perms
175 # user perms
176 regular_user = user_util.create_user()
176 regular_user = user_util.create_user()
177 regular_user_api_key = regular_user.api_key
177 regular_user_api_key = regular_user.api_key
178
178
179 usr = UserModel().get_by_username(regular_user.username)
179 usr = UserModel().get_by_username(regular_user.username)
180 usr.inherit_default_permissions = False
180 usr.inherit_default_permissions = False
181 Session().add(usr)
181 Session().add(usr)
182
182
183 RepoGroupModel().grant_user_permission(
183 RepoGroupModel().grant_user_permission(
184 repo_group_name, regular_user.username, 'group.admin')
184 repo_group_name, regular_user.username, 'group.admin')
185 Session().commit()
185 Session().commit()
186
186
187 full_repo_group_name = repo_group_name + '/' + repo_group_name
187 full_repo_group_name = repo_group_name + '/' + repo_group_name
188 id_, params = build_data(
188 id_, params = build_data(
189 regular_user_api_key, 'create_repo_group',
189 regular_user_api_key, 'create_repo_group',
190 group_name=full_repo_group_name)
190 group_name=full_repo_group_name)
191 response = api_call(self.app, params)
191 response = api_call(self.app, params)
192
192
193 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
193 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
194 assert repo_group is not None
194 assert repo_group is not None
195 expected = {
195 expected = {
196 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
196 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
197 'repo_group': repo_group.get_api_data()
197 'repo_group': repo_group.get_api_data()
198 }
198 }
199 try:
199 try:
200 assert_ok(id_, expected, given=response.body)
200 assert_ok(id_, expected, given=response.body)
201 finally:
201 finally:
202 fixture.destroy_repo_group(full_repo_group_name)
202 fixture.destroy_repo_group(full_repo_group_name)
203 fixture.destroy_repo_group(repo_group_name)
203 fixture.destroy_repo_group(repo_group_name)
204
204
205 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
205 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
206 repo_group_name = 'api-repo-group'
206 repo_group_name = 'api-repo-group'
207
207
208 id_, params = build_data(
208 id_, params = build_data(
209 self.apikey_regular, 'create_repo_group',
209 self.apikey_regular, 'create_repo_group',
210 group_name=repo_group_name)
210 group_name=repo_group_name)
211 response = api_call(self.app, params)
211 response = api_call(self.app, params)
212
212
213 expected = {
213 expected = {
214 'repo_group':
214 'repo_group':
215 u'You do not have the permission to store '
215 u'You do not have the permission to store '
216 u'repository groups in the root location.'}
216 u'repository groups in the root location.'}
217 assert_error(id_, expected, given=response.body)
217 assert_error(id_, expected, given=response.body)
218
218
219 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
219 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
220 repo_group_name = 'api-repo-group-regular-user'
220 repo_group_name = 'api-repo-group-regular-user'
221
221
222 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
222 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
223 assert repo_group is None
223 assert repo_group is None
224 # create the parent
224 # create the parent
225 fixture.create_repo_group(repo_group_name)
225 fixture.create_repo_group(repo_group_name)
226
226
227 full_repo_group_name = repo_group_name+'/'+repo_group_name
227 full_repo_group_name = repo_group_name+'/'+repo_group_name
228
228
229 id_, params = build_data(
229 id_, params = build_data(
230 self.apikey_regular, 'create_repo_group',
230 self.apikey_regular, 'create_repo_group',
231 group_name=full_repo_group_name)
231 group_name=full_repo_group_name)
232 response = api_call(self.app, params)
232 response = api_call(self.app, params)
233
233
234 expected = {
234 expected = {
235 'repo_group':
235 'repo_group':
236 'Parent repository group `{}` does not exist'.format(
236 u"You do not have the permissions to store "
237 repo_group_name)}
237 u"repository groups inside repository group `{}`".format(repo_group_name)}
238 try:
238 try:
239 assert_error(id_, expected, given=response.body)
239 assert_error(id_, expected, given=response.body)
240 finally:
240 finally:
241 fixture.destroy_repo_group(repo_group_name)
241 fixture.destroy_repo_group(repo_group_name)
242
242
243 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
243 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
244 self):
244 self):
245 repo_group_name = 'api-repo-group'
245 repo_group_name = 'api-repo-group'
246
246
247 id_, params = build_data(
247 id_, params = build_data(
248 self.apikey_regular, 'create_repo_group',
248 self.apikey_regular, 'create_repo_group',
249 group_name=repo_group_name,
249 group_name=repo_group_name,
250 owner=TEST_USER_ADMIN_LOGIN,)
250 owner=TEST_USER_ADMIN_LOGIN,)
251 response = api_call(self.app, params)
251 response = api_call(self.app, params)
252
252
253 expected = "Only RhodeCode super-admin can specify `owner` param"
253 expected = "Only RhodeCode super-admin can specify `owner` param"
254 assert_error(id_, expected, given=response.body)
254 assert_error(id_, expected, given=response.body)
255
255
256 @mock.patch.object(RepoGroupModel, 'create', crash)
256 @mock.patch.object(RepoGroupModel, 'create', crash)
257 def test_api_create_repo_group_exception_occurred(self):
257 def test_api_create_repo_group_exception_occurred(self):
258 repo_group_name = 'api-repo-group'
258 repo_group_name = 'api-repo-group'
259
259
260 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
260 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
261 assert repo_group is None
261 assert repo_group is None
262
262
263 id_, params = build_data(
263 id_, params = build_data(
264 self.apikey, 'create_repo_group',
264 self.apikey, 'create_repo_group',
265 group_name=repo_group_name,
265 group_name=repo_group_name,
266 owner=TEST_USER_ADMIN_LOGIN,)
266 owner=TEST_USER_ADMIN_LOGIN,)
267 response = api_call(self.app, params)
267 response = api_call(self.app, params)
268 expected = 'failed to create repo group `%s`' % (repo_group_name,)
268 expected = 'failed to create repo group `%s`' % (repo_group_name,)
269 assert_error(id_, expected, given=response.body)
269 assert_error(id_, expected, given=response.body)
270
270
271 def test_create_group_with_extra_slashes_in_name(self, user_util):
271 def test_create_group_with_extra_slashes_in_name(self, user_util):
272 existing_repo_group = user_util.create_repo_group()
272 existing_repo_group = user_util.create_repo_group()
273 dirty_group_name = '//{}//group2//'.format(
273 dirty_group_name = '//{}//group2//'.format(
274 existing_repo_group.group_name)
274 existing_repo_group.group_name)
275 cleaned_group_name = '{}/group2'.format(
275 cleaned_group_name = '{}/group2'.format(
276 existing_repo_group.group_name)
276 existing_repo_group.group_name)
277
277
278 id_, params = build_data(
278 id_, params = build_data(
279 self.apikey, 'create_repo_group',
279 self.apikey, 'create_repo_group',
280 group_name=dirty_group_name,
280 group_name=dirty_group_name,
281 owner=TEST_USER_ADMIN_LOGIN,)
281 owner=TEST_USER_ADMIN_LOGIN,)
282 response = api_call(self.app, params)
282 response = api_call(self.app, params)
283 repo_group = RepoGroupModel.cls.get_by_group_name(cleaned_group_name)
283 repo_group = RepoGroupModel.cls.get_by_group_name(cleaned_group_name)
284 expected = {
284 expected = {
285 'msg': 'Created new repo group `%s`' % (cleaned_group_name,),
285 'msg': 'Created new repo group `%s`' % (cleaned_group_name,),
286 'repo_group': repo_group.get_api_data()
286 'repo_group': repo_group.get_api_data()
287 }
287 }
288 assert_ok(id_, expected, given=response.body)
288 assert_ok(id_, expected, given=response.body)
289 fixture.destroy_repo_group(cleaned_group_name)
289 fixture.destroy_repo_group(cleaned_group_name)
@@ -1,298 +1,311 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21
21
22 import colander
22 import colander
23 import deform.widget
23 import deform.widget
24
24
25 from rhodecode.translation import _
25 from rhodecode.translation import _
26 from rhodecode.model.validation_schema import validators, preparers, types
26 from rhodecode.model.validation_schema import validators, preparers, types
27
27
28
28
29 def get_group_and_repo(repo_name):
29 def get_group_and_repo(repo_name):
30 from rhodecode.model.repo_group import RepoGroupModel
30 from rhodecode.model.repo_group import RepoGroupModel
31 return RepoGroupModel()._get_group_name_and_parent(
31 return RepoGroupModel()._get_group_name_and_parent(
32 repo_name, get_object=True)
32 repo_name, get_object=True)
33
33
34
34
35 def get_repo_group(repo_group_id):
35 def get_repo_group(repo_group_id):
36 from rhodecode.model.repo_group import RepoGroup
36 from rhodecode.model.repo_group import RepoGroup
37 return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
37 return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
38
38
39
39
40 @colander.deferred
40 @colander.deferred
41 def deferred_can_write_to_group_validator(node, kw):
41 def deferred_can_write_to_group_validator(node, kw):
42 old_values = kw.get('old_values') or {}
42 old_values = kw.get('old_values') or {}
43 request_user = kw.get('user')
43 request_user = kw.get('user')
44
44
45 def can_write_group_validator(node, value):
45 def can_write_group_validator(node, value):
46 from rhodecode.lib.auth import (
46 from rhodecode.lib.auth import (
47 HasPermissionAny, HasRepoGroupPermissionAny)
47 HasPermissionAny, HasRepoGroupPermissionAny)
48 from rhodecode.model.repo_group import RepoGroupModel
48 from rhodecode.model.repo_group import RepoGroupModel
49
49
50 messages = {
50 messages = {
51 'invalid_parent_repo_group':
51 'invalid_parent_repo_group':
52 _(u"Parent repository group `{}` does not exist"),
52 _(u"Parent repository group `{}` does not exist"),
53 # permissions denied we expose as not existing, to prevent
53 # permissions denied we expose as not existing, to prevent
54 # resource discovery
54 # resource discovery
55 'permission_denied_parent_group':
55 'permission_denied_parent_group':
56 _(u"Parent repository group `{}` does not exist"),
56 _(u"You do not have the permissions to store "
57 u"repository groups inside repository group `{}`"),
57 'permission_denied_root':
58 'permission_denied_root':
58 _(u"You do not have the permission to store "
59 _(u"You do not have the permission to store "
59 u"repository groups in the root location.")
60 u"repository groups in the root location.")
60 }
61 }
61
62
62 value = value['repo_group_name']
63 value = value['repo_group_name']
63 parent_group_name = value
64 parent_group_name = value
64
65
65 is_root_location = value is types.RootLocation
66 is_root_location = value is types.RootLocation
66
67
67 # NOT initialized validators, we must call them
68 # NOT initialized validators, we must call them
68 can_create_repo_groups_at_root = HasPermissionAny(
69 can_create_repo_groups_at_root = HasPermissionAny(
69 'hg.admin', 'hg.repogroup.create.true')
70 'hg.admin', 'hg.repogroup.create.true')
70
71
71 if is_root_location:
72 if is_root_location:
72 if can_create_repo_groups_at_root(user=request_user):
73 if can_create_repo_groups_at_root(user=request_user):
73 # we can create repo group inside tool-level. No more checks
74 # we can create repo group inside tool-level. No more checks
74 # are required
75 # are required
75 return
76 return
76 else:
77 else:
77 raise colander.Invalid(node, messages['permission_denied_root'])
78 raise colander.Invalid(node, messages['permission_denied_root'])
78
79
79 # check if the parent repo group actually exists
80 # check if the parent repo group actually exists
80 parent_group = None
81 parent_group = None
81 if parent_group_name:
82 if parent_group_name:
82 parent_group = RepoGroupModel().get_by_group_name(parent_group_name)
83 parent_group = RepoGroupModel().get_by_group_name(parent_group_name)
83 if value and not parent_group:
84 if value and not parent_group:
84 raise colander.Invalid(
85 raise colander.Invalid(
85 node, messages['invalid_parent_repo_group'].format(
86 node, messages['invalid_parent_repo_group'].format(
86 parent_group_name))
87 parent_group_name))
87
88
88 # check if we have permissions to create new groups under
89 # check if we have permissions to create new groups under
89 # parent repo group
90 # parent repo group
90 # create repositories with write permission on group is set to true
91 # create repositories with write permission on group is set to true
91 create_on_write = HasPermissionAny(
92 create_on_write = HasPermissionAny(
92 'hg.create.write_on_repogroup.true')(user=request_user)
93 'hg.create.write_on_repogroup.true')(user=request_user)
93
94
94 group_admin = HasRepoGroupPermissionAny('group.admin')(
95 group_admin = HasRepoGroupPermissionAny('group.admin')(
95 parent_group_name, 'can write into group validator', user=request_user)
96 parent_group_name, 'can write into group validator', user=request_user)
96 group_write = HasRepoGroupPermissionAny('group.write')(
97 group_write = HasRepoGroupPermissionAny('group.write')(
97 parent_group_name, 'can write into group validator', user=request_user)
98 parent_group_name, 'can write into group validator', user=request_user)
98
99
99 # creation by write access is currently disabled. Needs thinking if
100 # creation by write access is currently disabled. Needs thinking if
100 # we want to allow this...
101 # we want to allow this...
101 forbidden = not (group_admin or (group_write and create_on_write and 0))
102 forbidden = not (group_admin or (group_write and create_on_write and 0))
102
103
104 old_name = old_values.get('group_name')
105 if old_name and old_name == old_values.get('submitted_repo_group_name'):
106 # we're editing a repository group, we didn't change the name
107 # we skip the check for write into parent group now
108 # this allows changing settings for this repo group
109 return
110
103 if parent_group and forbidden:
111 if parent_group and forbidden:
104 msg = messages['permission_denied_parent_group'].format(
112 msg = messages['permission_denied_parent_group'].format(parent_group_name)
105 parent_group_name)
106 raise colander.Invalid(node, msg)
113 raise colander.Invalid(node, msg)
107
114
108 return can_write_group_validator
115 return can_write_group_validator
109
116
110
117
111 @colander.deferred
118 @colander.deferred
112 def deferred_repo_group_owner_validator(node, kw):
119 def deferred_repo_group_owner_validator(node, kw):
113
120
114 def repo_owner_validator(node, value):
121 def repo_owner_validator(node, value):
115 from rhodecode.model.db import User
122 from rhodecode.model.db import User
116 existing = User.get_by_username(value)
123 existing = User.get_by_username(value)
117 if not existing:
124 if not existing:
118 msg = _(u'Repo group owner with id `{}` does not exists').format(
125 msg = _(u'Repo group owner with id `{}` does not exists').format(
119 value)
126 value)
120 raise colander.Invalid(node, msg)
127 raise colander.Invalid(node, msg)
121
128
122 return repo_owner_validator
129 return repo_owner_validator
123
130
124
131
125 @colander.deferred
132 @colander.deferred
126 def deferred_unique_name_validator(node, kw):
133 def deferred_unique_name_validator(node, kw):
127 request_user = kw.get('user')
134 request_user = kw.get('user')
128 old_values = kw.get('old_values') or {}
135 old_values = kw.get('old_values') or {}
129
136
130 def unique_name_validator(node, value):
137 def unique_name_validator(node, value):
131 from rhodecode.model.db import Repository, RepoGroup
138 from rhodecode.model.db import Repository, RepoGroup
132 name_changed = value != old_values.get('group_name')
139 name_changed = value != old_values.get('group_name')
133
140
134 existing = Repository.get_by_repo_name(value)
141 existing = Repository.get_by_repo_name(value)
135 if name_changed and existing:
142 if name_changed and existing:
136 msg = _(u'Repository with name `{}` already exists').format(value)
143 msg = _(u'Repository with name `{}` already exists').format(value)
137 raise colander.Invalid(node, msg)
144 raise colander.Invalid(node, msg)
138
145
139 existing_group = RepoGroup.get_by_group_name(value)
146 existing_group = RepoGroup.get_by_group_name(value)
140 if name_changed and existing_group:
147 if name_changed and existing_group:
141 msg = _(u'Repository group with name `{}` already exists').format(
148 msg = _(u'Repository group with name `{}` already exists').format(
142 value)
149 value)
143 raise colander.Invalid(node, msg)
150 raise colander.Invalid(node, msg)
144 return unique_name_validator
151 return unique_name_validator
145
152
146
153
147 @colander.deferred
154 @colander.deferred
148 def deferred_repo_group_name_validator(node, kw):
155 def deferred_repo_group_name_validator(node, kw):
149 return validators.valid_name_validator
156 return validators.valid_name_validator
150
157
151
158
152 @colander.deferred
159 @colander.deferred
153 def deferred_repo_group_validator(node, kw):
160 def deferred_repo_group_validator(node, kw):
154 options = kw.get(
161 options = kw.get(
155 'repo_group_repo_group_options')
162 'repo_group_repo_group_options')
156 return colander.OneOf([x for x in options])
163 return colander.OneOf([x for x in options])
157
164
158
165
159 @colander.deferred
166 @colander.deferred
160 def deferred_repo_group_widget(node, kw):
167 def deferred_repo_group_widget(node, kw):
161 items = kw.get('repo_group_repo_group_items')
168 items = kw.get('repo_group_repo_group_items')
162 return deform.widget.Select2Widget(values=items)
169 return deform.widget.Select2Widget(values=items)
163
170
164
171
165 class GroupType(colander.Mapping):
172 class GroupType(colander.Mapping):
166 def _validate(self, node, value):
173 def _validate(self, node, value):
167 try:
174 try:
168 return dict(repo_group_name=value)
175 return dict(repo_group_name=value)
169 except Exception as e:
176 except Exception as e:
170 raise colander.Invalid(
177 raise colander.Invalid(
171 node, '"${val}" is not a mapping type: ${err}'.format(
178 node, '"${val}" is not a mapping type: ${err}'.format(
172 val=value, err=e))
179 val=value, err=e))
173
180
174 def deserialize(self, node, cstruct):
181 def deserialize(self, node, cstruct):
175 if cstruct is colander.null:
182 if cstruct is colander.null:
176 return cstruct
183 return cstruct
177
184
178 appstruct = super(GroupType, self).deserialize(node, cstruct)
185 appstruct = super(GroupType, self).deserialize(node, cstruct)
179 validated_name = appstruct['repo_group_name']
186 validated_name = appstruct['repo_group_name']
180
187
181 # inject group based on once deserialized data
188 # inject group based on once deserialized data
182 (repo_group_name_without_group,
189 (repo_group_name_without_group,
183 parent_group_name,
190 parent_group_name,
184 parent_group) = get_group_and_repo(validated_name)
191 parent_group) = get_group_and_repo(validated_name)
185
192
186 appstruct['repo_group_name_with_group'] = validated_name
193 appstruct['repo_group_name_with_group'] = validated_name
187 appstruct['repo_group_name_without_group'] = repo_group_name_without_group
194 appstruct['repo_group_name_without_group'] = repo_group_name_without_group
188 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
195 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
189 if parent_group:
196 if parent_group:
190 appstruct['repo_group_id'] = parent_group.group_id
197 appstruct['repo_group_id'] = parent_group.group_id
191
198
192 return appstruct
199 return appstruct
193
200
194
201
195 class GroupSchema(colander.SchemaNode):
202 class GroupSchema(colander.SchemaNode):
196 schema_type = GroupType
203 schema_type = GroupType
197 validator = deferred_can_write_to_group_validator
204 validator = deferred_can_write_to_group_validator
198 missing = colander.null
205 missing = colander.null
199
206
200
207
201 class RepoGroup(GroupSchema):
208 class RepoGroup(GroupSchema):
202 repo_group_name = colander.SchemaNode(
209 repo_group_name = colander.SchemaNode(
203 types.GroupNameType())
210 types.GroupNameType())
204 repo_group_id = colander.SchemaNode(
211 repo_group_id = colander.SchemaNode(
205 colander.String(), missing=None)
212 colander.String(), missing=None)
206 repo_group_name_without_group = colander.SchemaNode(
213 repo_group_name_without_group = colander.SchemaNode(
207 colander.String(), missing=None)
214 colander.String(), missing=None)
208
215
209
216
210 class RepoGroupAccessSchema(colander.MappingSchema):
217 class RepoGroupAccessSchema(colander.MappingSchema):
211 repo_group = RepoGroup()
218 repo_group = RepoGroup()
212
219
213
220
214 class RepoGroupNameUniqueSchema(colander.MappingSchema):
221 class RepoGroupNameUniqueSchema(colander.MappingSchema):
215 unique_repo_group_name = colander.SchemaNode(
222 unique_repo_group_name = colander.SchemaNode(
216 colander.String(),
223 colander.String(),
217 validator=deferred_unique_name_validator)
224 validator=deferred_unique_name_validator)
218
225
219
226
220 class RepoGroupSchema(colander.Schema):
227 class RepoGroupSchema(colander.Schema):
221
228
222 repo_group_name = colander.SchemaNode(
229 repo_group_name = colander.SchemaNode(
223 types.GroupNameType(),
230 types.GroupNameType(),
224 validator=deferred_repo_group_name_validator)
231 validator=deferred_repo_group_name_validator)
225
232
226 repo_group_owner = colander.SchemaNode(
233 repo_group_owner = colander.SchemaNode(
227 colander.String(),
234 colander.String(),
228 validator=deferred_repo_group_owner_validator)
235 validator=deferred_repo_group_owner_validator)
229
236
230 repo_group_description = colander.SchemaNode(
237 repo_group_description = colander.SchemaNode(
231 colander.String(), missing='', widget=deform.widget.TextAreaWidget())
238 colander.String(), missing='', widget=deform.widget.TextAreaWidget())
232
239
233 repo_group_copy_permissions = colander.SchemaNode(
240 repo_group_copy_permissions = colander.SchemaNode(
234 types.StringBooleanType(),
241 types.StringBooleanType(),
235 missing=False, widget=deform.widget.CheckboxWidget())
242 missing=False, widget=deform.widget.CheckboxWidget())
236
243
237 repo_group_enable_locking = colander.SchemaNode(
244 repo_group_enable_locking = colander.SchemaNode(
238 types.StringBooleanType(),
245 types.StringBooleanType(),
239 missing=False, widget=deform.widget.CheckboxWidget())
246 missing=False, widget=deform.widget.CheckboxWidget())
240
247
241 def deserialize(self, cstruct):
248 def deserialize(self, cstruct):
242 """
249 """
243 Custom deserialize that allows to chain validation, and verify
250 Custom deserialize that allows to chain validation, and verify
244 permissions, and as last step uniqueness
251 permissions, and as last step uniqueness
245 """
252 """
246
253
247 appstruct = super(RepoGroupSchema, self).deserialize(cstruct)
254 appstruct = super(RepoGroupSchema, self).deserialize(cstruct)
248 validated_name = appstruct['repo_group_name']
255 validated_name = appstruct['repo_group_name']
249
256
250 # second pass to validate permissions to repo_group
257 # second pass to validate permissions to repo_group
258 if 'old_values' in self.bindings:
259 # save current repo name for name change checks
260 self.bindings['old_values']['submitted_repo_group_name'] = validated_name
251 second = RepoGroupAccessSchema().bind(**self.bindings)
261 second = RepoGroupAccessSchema().bind(**self.bindings)
252 appstruct_second = second.deserialize({'repo_group': validated_name})
262 appstruct_second = second.deserialize({'repo_group': validated_name})
253 # save result
263 # save result
254 appstruct['repo_group'] = appstruct_second['repo_group']
264 appstruct['repo_group'] = appstruct_second['repo_group']
255
265
256 # thirds to validate uniqueness
266 # thirds to validate uniqueness
257 third = RepoGroupNameUniqueSchema().bind(**self.bindings)
267 third = RepoGroupNameUniqueSchema().bind(**self.bindings)
258 third.deserialize({'unique_repo_group_name': validated_name})
268 third.deserialize({'unique_repo_group_name': validated_name})
259
269
260 return appstruct
270 return appstruct
261
271
262
272
263 class RepoGroupSettingsSchema(RepoGroupSchema):
273 class RepoGroupSettingsSchema(RepoGroupSchema):
264 repo_group = colander.SchemaNode(
274 repo_group = colander.SchemaNode(
265 colander.Integer(),
275 colander.Integer(),
266 validator=deferred_repo_group_validator,
276 validator=deferred_repo_group_validator,
267 widget=deferred_repo_group_widget,
277 widget=deferred_repo_group_widget,
268 missing='')
278 missing='')
269
279
270 def deserialize(self, cstruct):
280 def deserialize(self, cstruct):
271 """
281 """
272 Custom deserialize that allows to chain validation, and verify
282 Custom deserialize that allows to chain validation, and verify
273 permissions, and as last step uniqueness
283 permissions, and as last step uniqueness
274 """
284 """
275
285
276 # first pass, to validate given data
286 # first pass, to validate given data
277 appstruct = super(RepoGroupSchema, self).deserialize(cstruct)
287 appstruct = super(RepoGroupSchema, self).deserialize(cstruct)
278 validated_name = appstruct['repo_group_name']
288 validated_name = appstruct['repo_group_name']
279
289
280 # because of repoSchema adds repo-group as an ID, we inject it as
290 # because of repoSchema adds repo-group as an ID, we inject it as
281 # full name here because validators require it, it's unwrapped later
291 # full name here because validators require it, it's unwrapped later
282 # so it's safe to use and final name is going to be without group anyway
292 # so it's safe to use and final name is going to be without group anyway
283
293
284 group, separator = get_repo_group(appstruct['repo_group'])
294 group, separator = get_repo_group(appstruct['repo_group'])
285 if group:
295 if group:
286 validated_name = separator.join([group.group_name, validated_name])
296 validated_name = separator.join([group.group_name, validated_name])
287
297
288 # second pass to validate permissions to repo_group
298 # second pass to validate permissions to repo_group
299 if 'old_values' in self.bindings:
300 # save current repo name for name change checks
301 self.bindings['old_values']['submitted_repo_group_name'] = validated_name
289 second = RepoGroupAccessSchema().bind(**self.bindings)
302 second = RepoGroupAccessSchema().bind(**self.bindings)
290 appstruct_second = second.deserialize({'repo_group': validated_name})
303 appstruct_second = second.deserialize({'repo_group': validated_name})
291 # save result
304 # save result
292 appstruct['repo_group'] = appstruct_second['repo_group']
305 appstruct['repo_group'] = appstruct_second['repo_group']
293
306
294 # thirds to validate uniqueness
307 # thirds to validate uniqueness
295 third = RepoGroupNameUniqueSchema().bind(**self.bindings)
308 third = RepoGroupNameUniqueSchema().bind(**self.bindings)
296 third.deserialize({'unique_repo_group_name': validated_name})
309 third.deserialize({'unique_repo_group_name': validated_name})
297
310
298 return appstruct
311 return appstruct
General Comments 0
You need to be logged in to leave comments. Login now