##// END OF EJS Templates
repo-group-schemas: refactor repository group schemas....
marcink -
r1154:e3bdce95 default
parent child Browse files
Show More
@@ -0,0 +1,116 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import colander
22 import pytest
23
24 from rhodecode.model.validation_schema import types
25 from rhodecode.model.validation_schema.schemas import repo_group_schema
26
27
28 class TestRepoGroupSchema(object):
29
30 @pytest.mark.parametrize('given, expected', [
31 ('my repo', 'my-repo'),
32 (' hello world mike ', 'hello-world-mike'),
33
34 ('//group1/group2//', 'group1/group2'),
35 ('//group1///group2//', 'group1/group2'),
36 ('///group1/group2///group3', 'group1/group2/group3'),
37 ('word g1/group2///group3', 'word-g1/group2/group3'),
38
39 ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'),
40
41 ('group,,,/,,,/1/2/3', 'group/1/2/3'),
42 ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'),
43 (u'grou[]p1/gro;up2///gro up3/Δ…Δ‡', u'group1/group2/gro-up3/Δ…Δ‡'),
44 ])
45 def test_deserialize_repo_name(self, app, user_admin, given, expected):
46 schema = repo_group_schema.RepoGroupSchema().bind()
47 assert schema.get('repo_group_name').deserialize(given) == expected
48
49 def test_deserialize(self, app, user_admin):
50 schema = repo_group_schema.RepoGroupSchema().bind(
51 user=user_admin
52 )
53
54 schema_data = schema.deserialize(dict(
55 repo_group_name='dupa',
56 repo_group_owner=user_admin.username
57 ))
58
59 assert schema_data['repo_group_name'] == 'dupa'
60 assert schema_data['repo_group'] == {
61 'repo_group_id': None,
62 'repo_group_name': types.RootLocation,
63 'repo_group_name_without_group': 'dupa'}
64
65 @pytest.mark.parametrize('given, err_key, expected_exc', [
66 ('xxx/dupa', 'repo_group', 'Parent repository group `xxx` does not exist'),
67 ('', 'repo_group_name', 'Name must start with a letter or number. Got ``'),
68 ])
69 def test_deserialize_with_bad_group_name(
70 self, app, user_admin, given, err_key, expected_exc):
71 schema = repo_group_schema.RepoGroupSchema().bind(
72 repo_type_options=['hg'],
73 user=user_admin
74 )
75
76 with pytest.raises(colander.Invalid) as excinfo:
77 schema.deserialize(dict(
78 repo_group_name=given,
79 repo_group_owner=user_admin.username
80 ))
81
82 assert excinfo.value.asdict()[err_key] == expected_exc
83
84 def test_deserialize_with_group_name(self, app, user_admin, test_repo_group):
85 schema = repo_group_schema.RepoGroupSchema().bind(
86 user=user_admin
87 )
88
89 full_name = test_repo_group.group_name + '/dupa'
90 schema_data = schema.deserialize(dict(
91 repo_group_name=full_name,
92 repo_group_owner=user_admin.username
93 ))
94
95 assert schema_data['repo_group_name'] == full_name
96 assert schema_data['repo_group'] == {
97 'repo_group_id': test_repo_group.group_id,
98 'repo_group_name': test_repo_group.group_name,
99 'repo_group_name_without_group': 'dupa'}
100
101 def test_deserialize_with_group_name_regular_user_no_perms(
102 self, app, user_regular, test_repo_group):
103 schema = repo_group_schema.RepoGroupSchema().bind(
104 user=user_regular
105 )
106
107 full_name = test_repo_group.group_name + '/dupa'
108 with pytest.raises(colander.Invalid) as excinfo:
109 schema.deserialize(dict(
110 repo_group_name=full_name,
111 repo_group_owner=user_regular.username
112 ))
113
114 expected = 'Parent repository group `{}` does not exist'.format(
115 test_repo_group.group_name)
116 assert excinfo.value.asdict()['repo_group'] == expected
@@ -54,55 +54,10 b' class TestCreateRepoGroup(object):'
54 'repo_group': repo_group.get_api_data()
54 'repo_group': repo_group.get_api_data()
55 }
55 }
56 expected = ret
56 expected = ret
57 assert_ok(id_, expected, given=response.body)
57 try:
58 fixture.destroy_repo_group(repo_group_name)
58 assert_ok(id_, expected, given=response.body)
59
59 finally:
60 def test_api_create_repo_group_regular_user(self):
60 fixture.destroy_repo_group(repo_group_name)
61 repo_group_name = 'api-repo-group'
62
63 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
64 usr.inherit_default_permissions = False
65 Session().add(usr)
66 UserModel().grant_perm(
67 self.TEST_USER_LOGIN, 'hg.repogroup.create.true')
68 Session().commit()
69
70 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
71 assert repo_group is None
72
73 id_, params = build_data(
74 self.apikey_regular, 'create_repo_group',
75 group_name=repo_group_name,
76 owner=TEST_USER_ADMIN_LOGIN,)
77 response = api_call(self.app, params)
78
79 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
80 assert repo_group is not None
81 ret = {
82 'msg': 'Created new repo group `%s`' % (repo_group_name,),
83 'repo_group': repo_group.get_api_data()
84 }
85 expected = ret
86 assert_ok(id_, expected, given=response.body)
87 fixture.destroy_repo_group(repo_group_name)
88 UserModel().revoke_perm(
89 self.TEST_USER_LOGIN, 'hg.repogroup.create.true')
90 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
91 usr.inherit_default_permissions = True
92 Session().add(usr)
93 Session().commit()
94
95 def test_api_create_repo_group_regular_user_no_permission(self):
96 repo_group_name = 'api-repo-group'
97
98 id_, params = build_data(
99 self.apikey_regular, 'create_repo_group',
100 group_name=repo_group_name,
101 owner=TEST_USER_ADMIN_LOGIN,)
102 response = api_call(self.app, params)
103
104 expected = "Access was denied to this resource."
105 assert_error(id_, expected, given=response.body)
106
61
107 def test_api_create_repo_group_in_another_group(self):
62 def test_api_create_repo_group_in_another_group(self):
108 repo_group_name = 'api-repo-group'
63 repo_group_name = 'api-repo-group'
@@ -127,9 +82,11 b' class TestCreateRepoGroup(object):'
127 'repo_group': repo_group.get_api_data()
82 'repo_group': repo_group.get_api_data()
128 }
83 }
129 expected = ret
84 expected = ret
130 assert_ok(id_, expected, given=response.body)
85 try:
131 fixture.destroy_repo_group(full_repo_group_name)
86 assert_ok(id_, expected, given=response.body)
132 fixture.destroy_repo_group(repo_group_name)
87 finally:
88 fixture.destroy_repo_group(full_repo_group_name)
89 fixture.destroy_repo_group(repo_group_name)
133
90
134 def test_api_create_repo_group_in_another_group_not_existing(self):
91 def test_api_create_repo_group_in_another_group_not_existing(self):
135 repo_group_name = 'api-repo-group-no'
92 repo_group_name = 'api-repo-group-no'
@@ -144,7 +101,10 b' class TestCreateRepoGroup(object):'
144 owner=TEST_USER_ADMIN_LOGIN,
101 owner=TEST_USER_ADMIN_LOGIN,
145 copy_permissions=True)
102 copy_permissions=True)
146 response = api_call(self.app, params)
103 response = api_call(self.app, params)
147 expected = 'repository group `%s` does not exist' % (repo_group_name,)
104 expected = {
105 'repo_group':
106 'Parent repository group `{}` does not exist'.format(
107 repo_group_name)}
148 assert_error(id_, expected, given=response.body)
108 assert_error(id_, expected, given=response.body)
149
109
150 def test_api_create_repo_group_that_exists(self):
110 def test_api_create_repo_group_that_exists(self):
@@ -159,9 +119,139 b' class TestCreateRepoGroup(object):'
159 group_name=repo_group_name,
119 group_name=repo_group_name,
160 owner=TEST_USER_ADMIN_LOGIN,)
120 owner=TEST_USER_ADMIN_LOGIN,)
161 response = api_call(self.app, params)
121 response = api_call(self.app, params)
162 expected = 'repo group `%s` already exist' % (repo_group_name,)
122 expected = {
123 'unique_repo_group_name':
124 'Repository group with name `{}` already exists'.format(
125 repo_group_name)}
126 try:
127 assert_error(id_, expected, given=response.body)
128 finally:
129 fixture.destroy_repo_group(repo_group_name)
130
131 def test_api_create_repo_group_regular_user_wit_root_location_perms(
132 self, user_util):
133 regular_user = user_util.create_user()
134 regular_user_api_key = regular_user.api_key
135
136 repo_group_name = 'api-repo-group-by-regular-user'
137
138 usr = UserModel().get_by_username(regular_user.username)
139 usr.inherit_default_permissions = False
140 Session().add(usr)
141
142 UserModel().grant_perm(
143 regular_user.username, 'hg.repogroup.create.true')
144 Session().commit()
145
146 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
147 assert repo_group is None
148
149 id_, params = build_data(
150 regular_user_api_key, 'create_repo_group',
151 group_name=repo_group_name)
152 response = api_call(self.app, params)
153
154 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
155 assert repo_group is not None
156 expected = {
157 'msg': 'Created new repo group `%s`' % (repo_group_name,),
158 'repo_group': repo_group.get_api_data()
159 }
160 try:
161 assert_ok(id_, expected, given=response.body)
162 finally:
163 fixture.destroy_repo_group(repo_group_name)
164
165 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
166 self, user_util):
167
168 repo_group_name = 'api-repo-group-parent'
169
170 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
171 assert repo_group is None
172 # create the parent
173 fixture.create_repo_group(repo_group_name)
174
175 # user perms
176 regular_user = user_util.create_user()
177 regular_user_api_key = regular_user.api_key
178
179 usr = UserModel().get_by_username(regular_user.username)
180 usr.inherit_default_permissions = False
181 Session().add(usr)
182
183 RepoGroupModel().grant_user_permission(
184 repo_group_name, regular_user.username, 'group.admin')
185 Session().commit()
186
187 full_repo_group_name = repo_group_name + '/' + repo_group_name
188 id_, params = build_data(
189 regular_user_api_key, 'create_repo_group',
190 group_name=full_repo_group_name)
191 response = api_call(self.app, params)
192
193 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
194 assert repo_group is not None
195 expected = {
196 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
197 'repo_group': repo_group.get_api_data()
198 }
199 try:
200 assert_ok(id_, expected, given=response.body)
201 finally:
202 fixture.destroy_repo_group(full_repo_group_name)
203 fixture.destroy_repo_group(repo_group_name)
204
205 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
206 repo_group_name = 'api-repo-group'
207
208 id_, params = build_data(
209 self.apikey_regular, 'create_repo_group',
210 group_name=repo_group_name)
211 response = api_call(self.app, params)
212
213 expected = {
214 'repo_group':
215 u'You do not have the permission to store '
216 u'repository groups in the root location.'}
163 assert_error(id_, expected, given=response.body)
217 assert_error(id_, expected, given=response.body)
164 fixture.destroy_repo_group(repo_group_name)
218
219 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
220 repo_group_name = 'api-repo-group-regular-user'
221
222 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
223 assert repo_group is None
224 # create the parent
225 fixture.create_repo_group(repo_group_name)
226
227 full_repo_group_name = repo_group_name+'/'+repo_group_name
228
229 id_, params = build_data(
230 self.apikey_regular, 'create_repo_group',
231 group_name=full_repo_group_name)
232 response = api_call(self.app, params)
233
234 expected = {
235 'repo_group':
236 'Parent repository group `{}` does not exist'.format(
237 repo_group_name)}
238 try:
239 assert_error(id_, expected, given=response.body)
240 finally:
241 fixture.destroy_repo_group(repo_group_name)
242
243 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
244 self):
245 repo_group_name = 'api-repo-group'
246
247 id_, params = build_data(
248 self.apikey_regular, 'create_repo_group',
249 group_name=repo_group_name,
250 owner=TEST_USER_ADMIN_LOGIN,)
251 response = api_call(self.app, params)
252
253 expected = "Only RhodeCode super-admin can specify `owner` param"
254 assert_error(id_, expected, given=response.body)
165
255
166 @mock.patch.object(RepoGroupModel, 'create', crash)
256 @mock.patch.object(RepoGroupModel, 'create', crash)
167 def test_api_create_repo_group_exception_occurred(self):
257 def test_api_create_repo_group_exception_occurred(self):
@@ -30,22 +30,30 b' from rhodecode.api.tests.utils import ('
30
30
31 @pytest.mark.usefixtures("testuser_api", "app")
31 @pytest.mark.usefixtures("testuser_api", "app")
32 class TestApiUpdateRepoGroup(object):
32 class TestApiUpdateRepoGroup(object):
33
33 def test_update_group_name(self, user_util):
34 def test_update_group_name(self, user_util):
34 new_group_name = 'new-group'
35 new_group_name = 'new-group'
35 initial_name = self._update(user_util, group_name=new_group_name)
36 initial_name = self._update(user_util, group_name=new_group_name)
36 assert RepoGroupModel()._get_repo_group(initial_name) is None
37 assert RepoGroupModel()._get_repo_group(initial_name) is None
37 assert RepoGroupModel()._get_repo_group(new_group_name) is not None
38 new_group = RepoGroupModel()._get_repo_group(new_group_name)
39 assert new_group is not None
40 assert new_group.full_path == new_group_name
38
41
39 def test_update_parent(self, user_util):
42 def test_update_group_name_change_parent(self, user_util):
43
40 parent_group = user_util.create_repo_group()
44 parent_group = user_util.create_repo_group()
41 initial_name = self._update(user_util, parent=parent_group.name)
45 parent_group_name = parent_group.name
42
46
43 expected_group_name = '{}/{}'.format(parent_group.name, initial_name)
47 expected_group_name = '{}/{}'.format(parent_group_name, 'new-group')
48 initial_name = self._update(user_util, group_name=expected_group_name)
49
44 repo_group = RepoGroupModel()._get_repo_group(expected_group_name)
50 repo_group = RepoGroupModel()._get_repo_group(expected_group_name)
51
45 assert repo_group is not None
52 assert repo_group is not None
46 assert repo_group.group_name == expected_group_name
53 assert repo_group.group_name == expected_group_name
47 assert repo_group.name == initial_name
54 assert repo_group.full_path == expected_group_name
48 assert RepoGroupModel()._get_repo_group(initial_name) is None
55 assert RepoGroupModel()._get_repo_group(initial_name) is None
56
49 new_path = os.path.join(
57 new_path = os.path.join(
50 RepoGroupModel().repos_path, *repo_group.full_path_splitted)
58 RepoGroupModel().repos_path, *repo_group.full_path_splitted)
51 assert os.path.exists(new_path)
59 assert os.path.exists(new_path)
@@ -67,15 +75,47 b' class TestApiUpdateRepoGroup(object):'
67 repo_group = RepoGroupModel()._get_repo_group(initial_name)
75 repo_group = RepoGroupModel()._get_repo_group(initial_name)
68 assert repo_group.user.username == owner
76 assert repo_group.user.username == owner
69
77
70 def test_api_update_repo_group_by_regular_user_no_permission(
78 def test_update_group_name_conflict_with_existing(self, user_util):
71 self, backend):
79 group_1 = user_util.create_repo_group()
72 repo = backend.create_repo()
80 group_2 = user_util.create_repo_group()
73 repo_name = repo.repo_name
81 repo_group_name_1 = group_1.group_name
82 repo_group_name_2 = group_2.group_name
74
83
75 id_, params = build_data(
84 id_, params = build_data(
76 self.apikey_regular, 'update_repo_group', repogroupid=repo_name)
85 self.apikey, 'update_repo_group', repogroupid=repo_group_name_1,
86 group_name=repo_group_name_2)
87 response = api_call(self.app, params)
88 expected = {
89 'unique_repo_group_name':
90 'Repository group with name `{}` already exists'.format(
91 repo_group_name_2)}
92 assert_error(id_, expected, given=response.body)
93
94 def test_api_update_repo_group_by_regular_user_no_permission(self, user_util):
95 temp_user = user_util.create_user()
96 temp_user_api_key = temp_user.api_key
97 parent_group = user_util.create_repo_group()
98 repo_group_name = parent_group.group_name
99 id_, params = build_data(
100 temp_user_api_key, 'update_repo_group', repogroupid=repo_group_name)
77 response = api_call(self.app, params)
101 response = api_call(self.app, params)
78 expected = 'repository group `%s` does not exist' % (repo_name,)
102 expected = 'repository group `%s` does not exist' % (repo_group_name,)
103 assert_error(id_, expected, given=response.body)
104
105 def test_api_update_repo_group_regular_user_no_root_write_permissions(
106 self, user_util):
107 temp_user = user_util.create_user()
108 temp_user_api_key = temp_user.api_key
109 parent_group = user_util.create_repo_group(owner=temp_user.username)
110 repo_group_name = parent_group.group_name
111
112 id_, params = build_data(
113 temp_user_api_key, 'update_repo_group', repogroupid=repo_group_name,
114 group_name='at-root-level')
115 response = api_call(self.app, params)
116 expected = {
117 'repo_group': 'You do not have the permission to store '
118 'repository groups in the root location.'}
79 assert_error(id_, expected, given=response.body)
119 assert_error(id_, expected, given=response.body)
80
120
81 def _update(self, user_util, **kwargs):
121 def _update(self, user_util, **kwargs):
@@ -89,7 +129,10 b' class TestApiUpdateRepoGroup(object):'
89 self.apikey, 'update_repo_group', repogroupid=initial_name,
129 self.apikey, 'update_repo_group', repogroupid=initial_name,
90 **kwargs)
130 **kwargs)
91 response = api_call(self.app, params)
131 response = api_call(self.app, params)
92 ret = {
132
133 repo_group = RepoGroupModel.cls.get(repo_group.group_id)
134
135 expected = {
93 'msg': 'updated repository group ID:{} {}'.format(
136 'msg': 'updated repository group ID:{} {}'.format(
94 repo_group.group_id, repo_group.group_name),
137 repo_group.group_id, repo_group.group_name),
95 'repo_group': {
138 'repo_group': {
@@ -103,5 +146,5 b' class TestApiUpdateRepoGroup(object):'
103 if repo_group.parent_group else None)
146 if repo_group.parent_group else None)
104 }
147 }
105 }
148 }
106 assert_ok(id_, ret, given=response.body)
149 assert_ok(id_, expected, given=response.body)
107 return initial_name
150 return initial_name
@@ -21,19 +21,18 b''
21
21
22 import logging
22 import logging
23
23
24 import colander
24 from rhodecode.api import JSONRPCValidationError
25
25 from rhodecode.api import jsonrpc_method, JSONRPCError
26 from rhodecode.api import jsonrpc_method, JSONRPCError, JSONRPCForbidden
27 from rhodecode.api.utils import (
26 from rhodecode.api.utils import (
28 has_superadmin_permission, Optional, OAttr, get_user_or_error,
27 has_superadmin_permission, Optional, OAttr, get_user_or_error,
29 store_update, get_repo_group_or_error,
28 get_repo_group_or_error, get_perm_or_error, get_user_group_or_error,
30 get_perm_or_error, get_user_group_or_error, get_origin)
29 get_origin, validate_repo_group_permissions, validate_set_owner_permissions)
31 from rhodecode.lib.auth import (
30 from rhodecode.lib.auth import (
32 HasPermissionAnyApi, HasRepoGroupPermissionAnyApi,
31 HasRepoGroupPermissionAnyApi, HasUserGroupPermissionAnyApi)
33 HasUserGroupPermissionAnyApi)
32 from rhodecode.model.db import Session
34 from rhodecode.model.db import Session, RepoGroup
35 from rhodecode.model.repo_group import RepoGroupModel
33 from rhodecode.model.repo_group import RepoGroupModel
36 from rhodecode.model.scm import RepoGroupList
34 from rhodecode.model.scm import RepoGroupList
35 from rhodecode.model import validation_schema
37 from rhodecode.model.validation_schema.schemas import repo_group_schema
36 from rhodecode.model.validation_schema.schemas import repo_group_schema
38
37
39
38
@@ -142,21 +141,24 b' def get_repo_groups(request, apiuser):'
142
141
143
142
144 @jsonrpc_method()
143 @jsonrpc_method()
145 def create_repo_group(request, apiuser, group_name, description=Optional(''),
144 def create_repo_group(
146 owner=Optional(OAttr('apiuser')),
145 request, apiuser, group_name,
147 copy_permissions=Optional(False)):
146 owner=Optional(OAttr('apiuser')),
147 description=Optional(''),
148 copy_permissions=Optional(False)):
148 """
149 """
149 Creates a repository group.
150 Creates a repository group.
150
151
151 * If the repository group name contains "/", all the required repository
152 * If the repository group name contains "/", repository group will be
152 groups will be created.
153 created inside a repository group or nested repository groups
153
154
154 For example "foo/bar/baz" will create |repo| groups "foo" and "bar"
155 For example "foo/bar/group1" will create repository group called "group1"
155 (with "foo" as parent). It will also create the "baz" repository
156 inside group "foo/bar". You have to have permissions to access and
156 with "bar" as |repo| group.
157 write to the last repository group ("bar" in this example)
157
158
158 This command can only be run using an |authtoken| with admin
159 This command can only be run using an |authtoken| with at least
159 permissions.
160 permissions to create repository groups, or admin permissions to
161 parent repository groups.
160
162
161 :param apiuser: This is filled automatically from the |authtoken|.
163 :param apiuser: This is filled automatically from the |authtoken|.
162 :type apiuser: AuthUser
164 :type apiuser: AuthUser
@@ -193,72 +195,64 b' def create_repo_group(request, apiuser, '
193
195
194 """
196 """
195
197
196 schema = repo_group_schema.RepoGroupSchema()
198 owner = validate_set_owner_permissions(apiuser, owner)
197 try:
198 data = schema.deserialize({
199 'group_name': group_name
200 })
201 except colander.Invalid as e:
202 raise JSONRPCError("Validation failed: %s" % (e.asdict(),))
203 group_name = data['group_name']
204
199
205 if isinstance(owner, Optional):
200 description = Optional.extract(description)
206 owner = apiuser.user_id
207
208 group_description = Optional.extract(description)
209 copy_permissions = Optional.extract(copy_permissions)
201 copy_permissions = Optional.extract(copy_permissions)
210
202
211 # get by full name with parents, check if it already exist
203 schema = repo_group_schema.RepoGroupSchema().bind(
212 if RepoGroup.get_by_group_name(group_name):
204 # user caller
213 raise JSONRPCError("repo group `%s` already exist" % (group_name,))
205 user=apiuser)
214
215 (group_name_cleaned,
216 parent_group_name) = RepoGroupModel()._get_group_name_and_parent(
217 group_name)
218
206
219 parent_group = None
207 try:
220 if parent_group_name:
208 schema_data = schema.deserialize(dict(
221 parent_group = get_repo_group_or_error(parent_group_name)
209 repo_group_name=group_name,
210 repo_group_owner=owner.username,
211 repo_group_description=description,
212 repo_group_copy_permissions=copy_permissions,
213 ))
214 except validation_schema.Invalid as err:
215 raise JSONRPCValidationError(colander_exc=err)
222
216
223 if not HasPermissionAnyApi(
217 validated_group_name = schema_data['repo_group_name']
224 'hg.admin', 'hg.repogroup.create.true')(user=apiuser):
225 # check if we have admin permission for this parent repo group !
226 # users without admin or hg.repogroup.create can only create other
227 # groups in groups they own so this is a required, but can be empty
228 parent_group = getattr(parent_group, 'group_name', '')
229 _perms = ('group.admin',)
230 if not HasRepoGroupPermissionAnyApi(*_perms)(
231 user=apiuser, group_name=parent_group):
232 raise JSONRPCForbidden()
233
218
234 try:
219 try:
235 repo_group = RepoGroupModel().create(
220 repo_group = RepoGroupModel().create(
236 group_name=group_name,
237 group_description=group_description,
238 owner=owner,
221 owner=owner,
239 copy_permissions=copy_permissions)
222 group_name=validated_group_name,
223 group_description=schema_data['repo_group_name'],
224 copy_permissions=schema_data['repo_group_copy_permissions'])
240 Session().commit()
225 Session().commit()
241 return {
226 return {
242 'msg': 'Created new repo group `%s`' % group_name,
227 'msg': 'Created new repo group `%s`' % validated_group_name,
243 'repo_group': repo_group.get_api_data()
228 'repo_group': repo_group.get_api_data()
244 }
229 }
245 except Exception:
230 except Exception:
246 log.exception("Exception occurred while trying create repo group")
231 log.exception("Exception occurred while trying create repo group")
247 raise JSONRPCError(
232 raise JSONRPCError(
248 'failed to create repo group `%s`' % (group_name,))
233 'failed to create repo group `%s`' % (validated_group_name,))
249
234
250
235
251 @jsonrpc_method()
236 @jsonrpc_method()
252 def update_repo_group(
237 def update_repo_group(
253 request, apiuser, repogroupid, group_name=Optional(''),
238 request, apiuser, repogroupid, group_name=Optional(''),
254 description=Optional(''), owner=Optional(OAttr('apiuser')),
239 description=Optional(''), owner=Optional(OAttr('apiuser')),
255 parent=Optional(None), enable_locking=Optional(False)):
240 enable_locking=Optional(False)):
256 """
241 """
257 Updates repository group with the details given.
242 Updates repository group with the details given.
258
243
259 This command can only be run using an |authtoken| with admin
244 This command can only be run using an |authtoken| with admin
260 permissions.
245 permissions.
261
246
247 * If the group_name name contains "/", repository group will be updated
248 accordingly with a repository group or nested repository groups
249
250 For example repogroupid=group-test group_name="foo/bar/group-test"
251 will update repository group called "group-test" and place it
252 inside group "foo/bar".
253 You have to have permissions to access and write to the last repository
254 group ("bar" in this example)
255
262 :param apiuser: This is filled automatically from the |authtoken|.
256 :param apiuser: This is filled automatically from the |authtoken|.
263 :type apiuser: AuthUser
257 :type apiuser: AuthUser
264 :param repogroupid: Set the ID of repository group.
258 :param repogroupid: Set the ID of repository group.
@@ -269,29 +263,55 b' def update_repo_group('
269 :type description: str
263 :type description: str
270 :param owner: Set the |repo| group owner.
264 :param owner: Set the |repo| group owner.
271 :type owner: str
265 :type owner: str
272 :param parent: Set the |repo| group parent.
273 :type parent: str or int
274 :param enable_locking: Enable |repo| locking. The default is false.
266 :param enable_locking: Enable |repo| locking. The default is false.
275 :type enable_locking: bool
267 :type enable_locking: bool
276 """
268 """
277
269
278 repo_group = get_repo_group_or_error(repogroupid)
270 repo_group = get_repo_group_or_error(repogroupid)
271
279 if not has_superadmin_permission(apiuser):
272 if not has_superadmin_permission(apiuser):
280 # check if we have admin permission for this repo group !
273 validate_repo_group_permissions(
281 _perms = ('group.admin',)
274 apiuser, repogroupid, repo_group, ('group.admin',))
282 if not HasRepoGroupPermissionAnyApi(*_perms)(
275
283 user=apiuser, group_name=repo_group.group_name):
276 updates = dict(
284 raise JSONRPCError(
277 group_name=group_name
285 'repository group `%s` does not exist' % (repogroupid,))
278 if not isinstance(group_name, Optional) else repo_group.group_name,
279
280 group_description=description
281 if not isinstance(description, Optional) else repo_group.group_description,
282
283 user=owner
284 if not isinstance(owner, Optional) else repo_group.user.username,
285
286 enable_locking=enable_locking
287 if not isinstance(enable_locking, Optional) else repo_group.enable_locking
288 )
286
289
287 updates = {}
290 schema = repo_group_schema.RepoGroupSchema().bind(
291 # user caller
292 user=apiuser,
293 old_values=repo_group.get_api_data())
294
288 try:
295 try:
289 store_update(updates, group_name, 'group_name')
296 schema_data = schema.deserialize(dict(
290 store_update(updates, description, 'group_description')
297 repo_group_name=updates['group_name'],
291 store_update(updates, owner, 'user')
298 repo_group_owner=updates['user'],
292 store_update(updates, parent, 'group_parent_id')
299 repo_group_description=updates['group_description'],
293 store_update(updates, enable_locking, 'enable_locking')
300 repo_group_enable_locking=updates['enable_locking'],
294 repo_group = RepoGroupModel().update(repo_group, updates)
301 ))
302 except validation_schema.Invalid as err:
303 raise JSONRPCValidationError(colander_exc=err)
304
305 validated_updates = dict(
306 group_name=schema_data['repo_group']['repo_group_name_without_group'],
307 group_parent_id=schema_data['repo_group']['repo_group_id'],
308 user=schema_data['repo_group_owner'],
309 group_description=schema_data['repo_group_description'],
310 enable_locking=schema_data['repo_group_enable_locking'],
311 )
312
313 try:
314 RepoGroupModel().update(repo_group, validated_updates)
295 Session().commit()
315 Session().commit()
296 return {
316 return {
297 'msg': 'updated repository group ID:%s %s' % (
317 'msg': 'updated repository group ID:%s %s' % (
@@ -299,7 +319,9 b' def update_repo_group('
299 'repo_group': repo_group.get_api_data()
319 'repo_group': repo_group.get_api_data()
300 }
320 }
301 except Exception:
321 except Exception:
302 log.exception("Exception occurred while trying update repo group")
322 log.exception(
323 u"Exception occurred while trying update repo group %s",
324 repogroupid)
303 raise JSONRPCError('failed to update repository group `%s`'
325 raise JSONRPCError('failed to update repository group `%s`'
304 % (repogroupid,))
326 % (repogroupid,))
305
327
@@ -340,12 +362,9 b' def delete_repo_group(request, apiuser, '
340
362
341 repo_group = get_repo_group_or_error(repogroupid)
363 repo_group = get_repo_group_or_error(repogroupid)
342 if not has_superadmin_permission(apiuser):
364 if not has_superadmin_permission(apiuser):
343 # check if we have admin permission for this repo group !
365 validate_repo_group_permissions(
344 _perms = ('group.admin',)
366 apiuser, repogroupid, repo_group, ('group.admin',))
345 if not HasRepoGroupPermissionAnyApi(*_perms)(
367
346 user=apiuser, group_name=repo_group.group_name):
347 raise JSONRPCError(
348 'repository group `%s` does not exist' % (repogroupid,))
349 try:
368 try:
350 RepoGroupModel().delete(repo_group)
369 RepoGroupModel().delete(repo_group)
351 Session().commit()
370 Session().commit()
@@ -408,12 +427,8 b' def grant_user_permission_to_repo_group('
408 repo_group = get_repo_group_or_error(repogroupid)
427 repo_group = get_repo_group_or_error(repogroupid)
409
428
410 if not has_superadmin_permission(apiuser):
429 if not has_superadmin_permission(apiuser):
411 # check if we have admin permission for this repo group !
430 validate_repo_group_permissions(
412 _perms = ('group.admin',)
431 apiuser, repogroupid, repo_group, ('group.admin',))
413 if not HasRepoGroupPermissionAnyApi(*_perms)(
414 user=apiuser, group_name=repo_group.group_name):
415 raise JSONRPCError(
416 'repository group `%s` does not exist' % (repogroupid,))
417
432
418 user = get_user_or_error(userid)
433 user = get_user_or_error(userid)
419 perm = get_perm_or_error(perm, prefix='group.')
434 perm = get_perm_or_error(perm, prefix='group.')
@@ -487,12 +502,8 b' def revoke_user_permission_from_repo_gro'
487 repo_group = get_repo_group_or_error(repogroupid)
502 repo_group = get_repo_group_or_error(repogroupid)
488
503
489 if not has_superadmin_permission(apiuser):
504 if not has_superadmin_permission(apiuser):
490 # check if we have admin permission for this repo group !
505 validate_repo_group_permissions(
491 _perms = ('group.admin',)
506 apiuser, repogroupid, repo_group, ('group.admin',))
492 if not HasRepoGroupPermissionAnyApi(*_perms)(
493 user=apiuser, group_name=repo_group.group_name):
494 raise JSONRPCError(
495 'repository group `%s` does not exist' % (repogroupid,))
496
507
497 user = get_user_or_error(userid)
508 user = get_user_or_error(userid)
498 apply_to_children = Optional.extract(apply_to_children)
509 apply_to_children = Optional.extract(apply_to_children)
@@ -569,12 +580,8 b' def grant_user_group_permission_to_repo_'
569 perm = get_perm_or_error(perm, prefix='group.')
580 perm = get_perm_or_error(perm, prefix='group.')
570 user_group = get_user_group_or_error(usergroupid)
581 user_group = get_user_group_or_error(usergroupid)
571 if not has_superadmin_permission(apiuser):
582 if not has_superadmin_permission(apiuser):
572 # check if we have admin permission for this repo group !
583 validate_repo_group_permissions(
573 _perms = ('group.admin',)
584 apiuser, repogroupid, repo_group, ('group.admin',))
574 if not HasRepoGroupPermissionAnyApi(*_perms)(
575 user=apiuser, group_name=repo_group.group_name):
576 raise JSONRPCError(
577 'repository group `%s` does not exist' % (repogroupid,))
578
585
579 # check if we have at least read permission for this user group !
586 # check if we have at least read permission for this user group !
580 _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
587 _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
@@ -656,12 +663,8 b' def revoke_user_group_permission_from_re'
656 repo_group = get_repo_group_or_error(repogroupid)
663 repo_group = get_repo_group_or_error(repogroupid)
657 user_group = get_user_group_or_error(usergroupid)
664 user_group = get_user_group_or_error(usergroupid)
658 if not has_superadmin_permission(apiuser):
665 if not has_superadmin_permission(apiuser):
659 # check if we have admin permission for this repo group !
666 validate_repo_group_permissions(
660 _perms = ('group.admin',)
667 apiuser, repogroupid, repo_group, ('group.admin',))
661 if not HasRepoGroupPermissionAnyApi(*_perms)(
662 user=apiuser, group_name=repo_group.group_name):
663 raise JSONRPCError(
664 'repository group `%s` does not exist' % (repogroupid,))
665
668
666 # check if we have at least read permission for this user group !
669 # check if we have at least read permission for this user group !
667 _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
670 _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
@@ -21,9 +21,220 b''
21
21
22 import colander
22 import colander
23
23
24
24 from rhodecode.translation import _
25 from rhodecode.model.validation_schema import validators, preparers, types
25 from rhodecode.model.validation_schema import validators, preparers, types
26
26
27
27
28 def get_group_and_repo(repo_name):
29 from rhodecode.model.repo_group import RepoGroupModel
30 return RepoGroupModel()._get_group_name_and_parent(
31 repo_name, get_object=True)
32
33
34 @colander.deferred
35 def deferred_can_write_to_group_validator(node, kw):
36 old_values = kw.get('old_values') or {}
37 request_user = kw.get('user')
38
39 def can_write_group_validator(node, value):
40 from rhodecode.lib.auth import (
41 HasPermissionAny, HasRepoGroupPermissionAny)
42 from rhodecode.model.repo_group import RepoGroupModel
43
44 messages = {
45 'invalid_parent_repo_group':
46 _(u"Parent repository group `{}` does not exist"),
47 # permissions denied we expose as not existing, to prevent
48 # resource discovery
49 'permission_denied_parent_group':
50 _(u"Parent repository group `{}` does not exist"),
51 'permission_denied_root':
52 _(u"You do not have the permission to store "
53 u"repository groups in the root location.")
54 }
55
56 value = value['repo_group_name']
57 parent_group_name = value
58
59 is_root_location = value is types.RootLocation
60
61 # NOT initialized validators, we must call them
62 can_create_repo_groups_at_root = HasPermissionAny(
63 'hg.admin', 'hg.repogroup.create.true')
64
65 if is_root_location:
66 if can_create_repo_groups_at_root(user=request_user):
67 # we can create repo group inside tool-level. No more checks
68 # are required
69 return
70 else:
71 raise colander.Invalid(node, messages['permission_denied_root'])
72
73 # check if the parent repo group actually exists
74 parent_group = None
75 if parent_group_name:
76 parent_group = RepoGroupModel().get_by_group_name(parent_group_name)
77 if value and not parent_group:
78 raise colander.Invalid(
79 node, messages['invalid_parent_repo_group'].format(
80 parent_group_name))
81
82 # check if we have permissions to create new groups under
83 # parent repo group
84 # create repositories with write permission on group is set to true
85 create_on_write = HasPermissionAny(
86 'hg.create.write_on_repogroup.true')(user=request_user)
87
88 group_admin = HasRepoGroupPermissionAny('group.admin')(
89 parent_group_name, 'can write into group validator', user=request_user)
90 group_write = HasRepoGroupPermissionAny('group.write')(
91 parent_group_name, 'can write into group validator', user=request_user)
92
93 # creation by write access is currently disabled. Needs thinking if
94 # we want to allow this...
95 forbidden = not (group_admin or (group_write and create_on_write and 0))
96
97 if parent_group and forbidden:
98 msg = messages['permission_denied_parent_group'].format(
99 parent_group_name)
100 raise colander.Invalid(node, msg)
101
102 return can_write_group_validator
103
104
105 @colander.deferred
106 def deferred_repo_group_owner_validator(node, kw):
107
108 def repo_owner_validator(node, value):
109 from rhodecode.model.db import User
110 existing = User.get_by_username(value)
111 if not existing:
112 msg = _(u'Repo group owner with id `{}` does not exists').format(
113 value)
114 raise colander.Invalid(node, msg)
115
116 return repo_owner_validator
117
118
119 @colander.deferred
120 def deferred_unique_name_validator(node, kw):
121 request_user = kw.get('user')
122 old_values = kw.get('old_values') or {}
123
124 def unique_name_validator(node, value):
125 from rhodecode.model.db import Repository, RepoGroup
126 name_changed = value != old_values.get('group_name')
127
128 existing = Repository.get_by_repo_name(value)
129 if name_changed and existing:
130 msg = _(u'Repository with name `{}` already exists').format(value)
131 raise colander.Invalid(node, msg)
132
133 existing_group = RepoGroup.get_by_group_name(value)
134 if name_changed and existing_group:
135 msg = _(u'Repository group with name `{}` already exists').format(
136 value)
137 raise colander.Invalid(node, msg)
138 return unique_name_validator
139
140
141 @colander.deferred
142 def deferred_repo_group_name_validator(node, kw):
143 return validators.valid_name_validator
144
145
146 class GroupType(colander.Mapping):
147 def _validate(self, node, value):
148 try:
149 return dict(repo_group_name=value)
150 except Exception as e:
151 raise colander.Invalid(
152 node, '"${val}" is not a mapping type: ${err}'.format(
153 val=value, err=e))
154
155 def deserialize(self, node, cstruct):
156 if cstruct is colander.null:
157 return cstruct
158
159 appstruct = super(GroupType, self).deserialize(node, cstruct)
160 validated_name = appstruct['repo_group_name']
161
162 # inject group based on once deserialized data
163 (repo_group_name_without_group,
164 parent_group_name,
165 parent_group) = get_group_and_repo(validated_name)
166
167 appstruct['repo_group_name_without_group'] = repo_group_name_without_group
168 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
169 if parent_group:
170 appstruct['repo_group_id'] = parent_group.group_id
171
172 return appstruct
173
174
175 class GroupSchema(colander.SchemaNode):
176 schema_type = GroupType
177 validator = deferred_can_write_to_group_validator
178 missing = colander.null
179
180
181 class RepoGroup(GroupSchema):
182 repo_group_name = colander.SchemaNode(
183 types.GroupNameType())
184 repo_group_id = colander.SchemaNode(
185 colander.String(), missing=None)
186 repo_group_name_without_group = colander.SchemaNode(
187 colander.String(), missing=None)
188
189
190 class RepoGroupAccessSchema(colander.MappingSchema):
191 repo_group = RepoGroup()
192
193
194 class RepoGroupNameUniqueSchema(colander.MappingSchema):
195 unique_repo_group_name = colander.SchemaNode(
196 colander.String(),
197 validator=deferred_unique_name_validator)
198
199
28 class RepoGroupSchema(colander.Schema):
200 class RepoGroupSchema(colander.Schema):
29 group_name = colander.SchemaNode(types.GroupNameType())
201
202 repo_group_name = colander.SchemaNode(
203 types.GroupNameType(),
204 validator=deferred_repo_group_name_validator)
205
206 repo_group_owner = colander.SchemaNode(
207 colander.String(),
208 validator=deferred_repo_group_owner_validator)
209
210 repo_group_description = colander.SchemaNode(
211 colander.String(), missing='')
212
213 repo_group_copy_permissions = colander.SchemaNode(
214 types.StringBooleanType(),
215 missing=False)
216
217 repo_group_enable_locking = colander.SchemaNode(
218 types.StringBooleanType(),
219 missing=False)
220
221 def deserialize(self, cstruct):
222 """
223 Custom deserialize that allows to chain validation, and verify
224 permissions, and as last step uniqueness
225 """
226
227 appstruct = super(RepoGroupSchema, self).deserialize(cstruct)
228 validated_name = appstruct['repo_group_name']
229
230 # second pass to validate permissions to repo_group
231 second = RepoGroupAccessSchema().bind(**self.bindings)
232 appstruct_second = second.deserialize({'repo_group': validated_name})
233 # save result
234 appstruct['repo_group'] = appstruct_second['repo_group']
235
236 # thirds to validate uniqueness
237 third = RepoGroupNameUniqueSchema().bind(**self.bindings)
238 third.deserialize({'unique_repo_group_name': validated_name})
239
240 return appstruct
General Comments 0
You need to be logged in to leave comments. Login now