##// END OF EJS Templates
repo-schemas: refactor repository schemas and use it in API update/create functions....
marcink -
r1153:0f1be4aa default
parent child Browse files
Show More
@@ -0,0 +1,128 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import colander
22 import pytest
23
24 from rhodecode.model.validation_schema import types
25 from rhodecode.model.validation_schema.schemas import repo_schema
26
27
28 class TestRepoSchema(object):
29
30 #TODO:
31 # test nested groups
32
33 @pytest.mark.parametrize('given, expected', [
34 ('my repo', 'my-repo'),
35 (' hello world mike ', 'hello-world-mike'),
36
37 ('//group1/group2//', 'group1/group2'),
38 ('//group1///group2//', 'group1/group2'),
39 ('///group1/group2///group3', 'group1/group2/group3'),
40 ('word g1/group2///group3', 'word-g1/group2/group3'),
41
42 ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'),
43
44 ('group,,,/,,,/1/2/3', 'group/1/2/3'),
45 ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'),
46 (u'grou[]p1/gro;up2///gro up3/Δ…Δ‡', u'group1/group2/gro-up3/Δ…Δ‡'),
47 ])
48 def test_deserialize_repo_name(self, app, user_admin, given, expected):
49
50 schema = repo_schema.RepoSchema().bind()
51 assert expected == schema.get('repo_name').deserialize(given)
52
53 def test_deserialize(self, app, user_admin):
54 schema = repo_schema.RepoSchema().bind(
55 repo_type_options=['hg'],
56 user=user_admin
57 )
58
59 schema_data = schema.deserialize(dict(
60 repo_name='dupa',
61 repo_type='hg',
62 repo_owner=user_admin.username
63 ))
64
65 assert schema_data['repo_name'] == 'dupa'
66 assert schema_data['repo_group'] == {
67 'repo_group_id': None,
68 'repo_group_name': types.RootLocation,
69 'repo_name_without_group': 'dupa'}
70
71 @pytest.mark.parametrize('given, err_key, expected_exc', [
72 ('xxx/dupa','repo_group', 'Repository group `xxx` does not exist'),
73 ('', 'repo_name', 'Name must start with a letter or number. Got ``'),
74 ])
75 def test_deserialize_with_bad_group_name(
76 self, app, user_admin, given, err_key, expected_exc):
77
78 schema = repo_schema.RepoSchema().bind(
79 repo_type_options=['hg'],
80 user=user_admin
81 )
82
83 with pytest.raises(colander.Invalid) as excinfo:
84 schema.deserialize(dict(
85 repo_name=given,
86 repo_type='hg',
87 repo_owner=user_admin.username
88 ))
89
90 assert excinfo.value.asdict()[err_key] == expected_exc
91
92 def test_deserialize_with_group_name(self, app, user_admin, test_repo_group):
93 schema = repo_schema.RepoSchema().bind(
94 repo_type_options=['hg'],
95 user=user_admin
96 )
97
98 full_name = test_repo_group.group_name + '/dupa'
99 schema_data = schema.deserialize(dict(
100 repo_name=full_name,
101 repo_type='hg',
102 repo_owner=user_admin.username
103 ))
104
105 assert schema_data['repo_name'] == full_name
106 assert schema_data['repo_group'] == {
107 'repo_group_id': test_repo_group.group_id,
108 'repo_group_name': test_repo_group.group_name,
109 'repo_name_without_group': 'dupa'}
110
111 def test_deserialize_with_group_name_regular_user_no_perms(
112 self, app, user_regular, test_repo_group):
113 schema = repo_schema.RepoSchema().bind(
114 repo_type_options=['hg'],
115 user=user_regular
116 )
117
118 full_name = test_repo_group.group_name + '/dupa'
119 with pytest.raises(colander.Invalid) as excinfo:
120 schema.deserialize(dict(
121 repo_name=full_name,
122 repo_type='hg',
123 repo_owner=user_regular.username
124 ))
125
126 expected = 'Repository group `{}` does not exist'.format(
127 test_repo_group.group_name)
128 assert excinfo.value.asdict()['repo_group'] == expected
@@ -23,8 +23,11 b' import json'
23 23 import mock
24 24 import pytest
25 25
26 from rhodecode.lib.utils2 import safe_unicode
26 27 from rhodecode.lib.vcs import settings
28 from rhodecode.model.meta import Session
27 29 from rhodecode.model.repo import RepoModel
30 from rhodecode.model.user import UserModel
28 31 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
29 32 from rhodecode.api.tests.utils import (
30 33 build_data, api_call, assert_ok, assert_error, crash)
@@ -36,29 +39,37 b' fixture = Fixture()'
36 39
37 40 @pytest.mark.usefixtures("testuser_api", "app")
38 41 class TestCreateRepo(object):
39 def test_api_create_repo(self, backend):
40 repo_name = 'api-repo-1'
42
43 @pytest.mark.parametrize('given, expected_name, expected_exc', [
44 ('api repo-1', 'api-repo-1', False),
45 ('api-repo 1-Δ…Δ‡', 'api-repo-1-Δ…Δ‡', False),
46 (u'unicode-Δ…Δ‡', u'unicode-Δ…Δ‡', False),
47 ('some repo v1.2', 'some-repo-v1.2', False),
48 ('v2.0', 'v2.0', False),
49 ])
50 def test_api_create_repo(self, backend, given, expected_name, expected_exc):
51
41 52 id_, params = build_data(
42 53 self.apikey,
43 54 'create_repo',
44 repo_name=repo_name,
55 repo_name=given,
45 56 owner=TEST_USER_ADMIN_LOGIN,
46 57 repo_type=backend.alias,
47 58 )
48 59 response = api_call(self.app, params)
49 60
50 repo = RepoModel().get_by_repo_name(repo_name)
51
52 assert repo is not None
53 61 ret = {
54 'msg': 'Created new repository `%s`' % (repo_name,),
62 'msg': 'Created new repository `%s`' % (expected_name,),
55 63 'success': True,
56 64 'task': None,
57 65 }
58 66 expected = ret
59 67 assert_ok(id_, expected, given=response.body)
60 68
61 id_, params = build_data(self.apikey, 'get_repo', repoid=repo_name)
69 repo = RepoModel().get_by_repo_name(safe_unicode(expected_name))
70 assert repo is not None
71
72 id_, params = build_data(self.apikey, 'get_repo', repoid=expected_name)
62 73 response = api_call(self.app, params)
63 74 body = json.loads(response.body)
64 75
@@ -66,7 +77,7 b' class TestCreateRepo(object):'
66 77 assert body['result']['enable_locking'] is False
67 78 assert body['result']['enable_statistics'] is False
68 79
69 fixture.destroy_repo(repo_name)
80 fixture.destroy_repo(safe_unicode(expected_name))
70 81
71 82 def test_api_create_restricted_repo_type(self, backend):
72 83 repo_name = 'api-repo-type-{0}'.format(backend.alias)
@@ -158,6 +169,21 b' class TestCreateRepo(object):'
158 169 fixture.destroy_repo(repo_name)
159 170 fixture.destroy_repo_group(repo_group_name)
160 171
172 def test_create_repo_in_group_that_doesnt_exist(self, backend, user_util):
173 repo_group_name = 'fake_group'
174
175 repo_name = '%s/api-repo-gr' % (repo_group_name,)
176 id_, params = build_data(
177 self.apikey, 'create_repo',
178 repo_name=repo_name,
179 owner=TEST_USER_ADMIN_LOGIN,
180 repo_type=backend.alias,)
181 response = api_call(self.app, params)
182
183 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
184 repo_group_name)}
185 assert_error(id_, expected, given=response.body)
186
161 187 def test_api_create_repo_unknown_owner(self, backend):
162 188 repo_name = 'api-repo-2'
163 189 owner = 'i-dont-exist'
@@ -218,10 +244,48 b' class TestCreateRepo(object):'
218 244 owner=owner)
219 245 response = api_call(self.app, params)
220 246
221 expected = 'Only RhodeCode admin can specify `owner` param'
247 expected = 'Only RhodeCode super-admin can specify `owner` param'
222 248 assert_error(id_, expected, given=response.body)
223 249 fixture.destroy_repo(repo_name)
224 250
251 def test_api_create_repo_by_non_admin_no_parent_group_perms(self, backend):
252 repo_group_name = 'no-access'
253 fixture.create_repo_group(repo_group_name)
254 repo_name = 'no-access/api-repo'
255
256 id_, params = build_data(
257 self.apikey_regular, 'create_repo',
258 repo_name=repo_name,
259 repo_type=backend.alias)
260 response = api_call(self.app, params)
261
262 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
263 repo_group_name)}
264 assert_error(id_, expected, given=response.body)
265 fixture.destroy_repo_group(repo_group_name)
266 fixture.destroy_repo(repo_name)
267
268 def test_api_create_repo_non_admin_no_permission_to_create_to_root_level(
269 self, backend, user_util):
270
271 regular_user = user_util.create_user()
272 regular_user_api_key = regular_user.api_key
273
274 usr = UserModel().get_by_username(regular_user.username)
275 usr.inherit_default_permissions = False
276 Session().add(usr)
277
278 repo_name = backend.new_repo_name()
279 id_, params = build_data(
280 regular_user_api_key, 'create_repo',
281 repo_name=repo_name,
282 repo_type=backend.alias)
283 response = api_call(self.app, params)
284 expected = {
285 "repo_name": "You do not have the permission to "
286 "store repositories in the root location."}
287 assert_error(id_, expected, given=response.body)
288
225 289 def test_api_create_repo_exists(self, backend):
226 290 repo_name = backend.repo_name
227 291 id_, params = build_data(
@@ -230,7 +294,9 b' class TestCreateRepo(object):'
230 294 owner=TEST_USER_ADMIN_LOGIN,
231 295 repo_type=backend.alias,)
232 296 response = api_call(self.app, params)
233 expected = "repo `%s` already exist" % (repo_name,)
297 expected = {
298 'unique_repo_name': 'Repository with name `{}` already exists'.format(
299 repo_name)}
234 300 assert_error(id_, expected, given=response.body)
235 301
236 302 @mock.patch.object(RepoModel, 'create', crash)
@@ -245,26 +311,40 b' class TestCreateRepo(object):'
245 311 expected = 'failed to create repository `%s`' % (repo_name,)
246 312 assert_error(id_, expected, given=response.body)
247 313
248 def test_create_repo_with_extra_slashes_in_name(self, backend, user_util):
249 existing_repo_group = user_util.create_repo_group()
250 dirty_repo_name = '//{}/repo_name//'.format(
251 existing_repo_group.group_name)
252 cleaned_repo_name = '{}/repo_name'.format(
253 existing_repo_group.group_name)
314 @pytest.mark.parametrize('parent_group, dirty_name, expected_name', [
315 (None, 'foo bar x', 'foo-bar-x'),
316 ('foo', '/foo//bar x', 'foo/bar-x'),
317 ('foo-bar', 'foo-bar //bar x', 'foo-bar/bar-x'),
318 ])
319 def test_create_repo_with_extra_slashes_in_name(
320 self, backend, parent_group, dirty_name, expected_name):
321
322 if parent_group:
323 gr = fixture.create_repo_group(parent_group)
324 assert gr.group_name == parent_group
254 325
255 326 id_, params = build_data(
256 327 self.apikey, 'create_repo',
257 repo_name=dirty_repo_name,
328 repo_name=dirty_name,
258 329 repo_type=backend.alias,
259 330 owner=TEST_USER_ADMIN_LOGIN,)
260 331 response = api_call(self.app, params)
261 repo = RepoModel().get_by_repo_name(cleaned_repo_name)
332 expected ={
333 "msg": "Created new repository `{}`".format(expected_name),
334 "task": None,
335 "success": True
336 }
337 assert_ok(id_, expected, response.body)
338
339 repo = RepoModel().get_by_repo_name(expected_name)
262 340 assert repo is not None
263 341
264 342 expected = {
265 'msg': 'Created new repository `%s`' % (cleaned_repo_name,),
343 'msg': 'Created new repository `%s`' % (expected_name,),
266 344 'success': True,
267 345 'task': None,
268 346 }
269 347 assert_ok(id_, expected, given=response.body)
270 fixture.destroy_repo(cleaned_repo_name)
348 fixture.destroy_repo(expected_name)
349 if parent_group:
350 fixture.destroy_repo_group(parent_group)
@@ -24,6 +24,7 b' import pytest'
24 24
25 25 from rhodecode.model.meta import Session
26 26 from rhodecode.model.repo import RepoModel
27 from rhodecode.model.repo_group import RepoGroupModel
27 28 from rhodecode.model.user import UserModel
28 29 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
29 30 from rhodecode.api.tests.utils import (
@@ -99,11 +100,35 b' class TestApiForkRepo(object):'
99 100 finally:
100 101 fixture.destroy_repo(fork_name)
101 102
103 def test_api_fork_repo_non_admin_into_group_no_permission(self, backend, user_util):
104 source_name = backend['minimal'].repo_name
105 repo_group = user_util.create_repo_group()
106 repo_group_name = repo_group.group_name
107 fork_name = '%s/api-repo-fork' % repo_group_name
108
109 id_, params = build_data(
110 self.apikey_regular, 'fork_repo',
111 repoid=source_name,
112 fork_name=fork_name)
113 response = api_call(self.app, params)
114
115 expected = {
116 'repo_group': 'Repository group `{}` does not exist'.format(
117 repo_group_name)}
118 try:
119 assert_error(id_, expected, given=response.body)
120 finally:
121 fixture.destroy_repo(fork_name)
122
102 123 def test_api_fork_repo_non_admin_into_group(self, backend, user_util):
103 124 source_name = backend['minimal'].repo_name
104 125 repo_group = user_util.create_repo_group()
105 126 fork_name = '%s/api-repo-fork' % repo_group.group_name
106 127
128 RepoGroupModel().grant_user_permission(
129 repo_group, self.TEST_USER_LOGIN, 'group.admin')
130 Session().commit()
131
107 132 id_, params = build_data(
108 133 self.apikey_regular, 'fork_repo',
109 134 repoid=source_name,
@@ -129,10 +154,11 b' class TestApiForkRepo(object):'
129 154 fork_name=fork_name,
130 155 owner=TEST_USER_ADMIN_LOGIN)
131 156 response = api_call(self.app, params)
132 expected = 'Only RhodeCode admin can specify `owner` param'
157 expected = 'Only RhodeCode super-admin can specify `owner` param'
133 158 assert_error(id_, expected, given=response.body)
134 159
135 def test_api_fork_repo_non_admin_no_permission_to_fork(self, backend):
160 def test_api_fork_repo_non_admin_no_permission_of_source_repo(
161 self, backend):
136 162 source_name = backend['minimal'].repo_name
137 163 RepoModel().grant_user_permission(repo=source_name,
138 164 user=self.TEST_USER_LOGIN,
@@ -147,19 +173,44 b' class TestApiForkRepo(object):'
147 173 assert_error(id_, expected, given=response.body)
148 174
149 175 def test_api_fork_repo_non_admin_no_permission_to_fork_to_root_level(
150 self, backend):
176 self, backend, user_util):
177
178 regular_user = user_util.create_user()
179 regular_user_api_key = regular_user.api_key
180 usr = UserModel().get_by_username(regular_user.username)
181 usr.inherit_default_permissions = False
182 Session().add(usr)
183 UserModel().grant_perm(regular_user.username, 'hg.fork.repository')
184
151 185 source_name = backend['minimal'].repo_name
186 fork_name = backend.new_repo_name()
187 id_, params = build_data(
188 regular_user_api_key, 'fork_repo',
189 repoid=source_name,
190 fork_name=fork_name)
191 response = api_call(self.app, params)
192 expected = {
193 "repo_name": "You do not have the permission to "
194 "store repositories in the root location."}
195 assert_error(id_, expected, given=response.body)
152 196
153 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
197 def test_api_fork_repo_non_admin_no_permission_to_fork(
198 self, backend, user_util):
199
200 regular_user = user_util.create_user()
201 regular_user_api_key = regular_user.api_key
202 usr = UserModel().get_by_username(regular_user.username)
154 203 usr.inherit_default_permissions = False
155 204 Session().add(usr)
156 205
206 source_name = backend['minimal'].repo_name
157 207 fork_name = backend.new_repo_name()
158 208 id_, params = build_data(
159 self.apikey_regular, 'fork_repo',
209 regular_user_api_key, 'fork_repo',
160 210 repoid=source_name,
161 211 fork_name=fork_name)
162 212 response = api_call(self.app, params)
213
163 214 expected = "Access was denied to this resource."
164 215 assert_error(id_, expected, given=response.body)
165 216
@@ -189,7 +240,9 b' class TestApiForkRepo(object):'
189 240 response = api_call(self.app, params)
190 241
191 242 try:
192 expected = "fork `%s` already exist" % (fork_name,)
243 expected = {
244 'unique_repo_name': 'Repository with name `{}` already exists'.format(
245 fork_name)}
193 246 assert_error(id_, expected, given=response.body)
194 247 finally:
195 248 fixture.destroy_repo(fork_repo.repo_name)
@@ -205,7 +258,9 b' class TestApiForkRepo(object):'
205 258 owner=TEST_USER_ADMIN_LOGIN)
206 259 response = api_call(self.app, params)
207 260
208 expected = "repo `%s` already exist" % (fork_name,)
261 expected = {
262 'unique_repo_name': 'Repository with name `{}` already exists'.format(
263 fork_name)}
209 264 assert_error(id_, expected, given=response.body)
210 265
211 266 @mock.patch.object(RepoModel, 'create_fork', crash)
@@ -32,35 +32,60 b' fixture = Fixture()'
32 32
33 33 UPDATE_REPO_NAME = 'api_update_me'
34 34
35 class SAME_AS_UPDATES(object): """ Constant used for tests below """
35
36 class SAME_AS_UPDATES(object):
37 """ Constant used for tests below """
38
36 39
37 40 @pytest.mark.usefixtures("testuser_api", "app")
38 41 class TestApiUpdateRepo(object):
39 42
40 43 @pytest.mark.parametrize("updates, expected", [
41 ({'owner': TEST_USER_REGULAR_LOGIN}, SAME_AS_UPDATES),
42 ({'description': 'new description'}, SAME_AS_UPDATES),
43 ({'clone_uri': 'http://foo.com/repo'}, SAME_AS_UPDATES),
44 ({'clone_uri': None}, {'clone_uri': ''}),
45 ({'clone_uri': ''}, {'clone_uri': ''}),
46 ({'landing_rev': 'branch:master'}, {'landing_rev': ['branch','master']}),
47 ({'enable_statistics': True}, SAME_AS_UPDATES),
48 ({'enable_locking': True}, SAME_AS_UPDATES),
49 ({'enable_downloads': True}, SAME_AS_UPDATES),
50 ({'name': 'new_repo_name'}, {
44 ({'owner': TEST_USER_REGULAR_LOGIN},
45 SAME_AS_UPDATES),
46
47 ({'description': 'new description'},
48 SAME_AS_UPDATES),
49
50 ({'clone_uri': 'http://foo.com/repo'},
51 SAME_AS_UPDATES),
52
53 ({'clone_uri': None},
54 {'clone_uri': ''}),
55
56 ({'clone_uri': ''},
57 {'clone_uri': ''}),
58
59 ({'landing_rev': 'rev:tip'},
60 {'landing_rev': ['rev', 'tip']}),
61
62 ({'enable_statistics': True},
63 SAME_AS_UPDATES),
64
65 ({'enable_locking': True},
66 SAME_AS_UPDATES),
67
68 ({'enable_downloads': True},
69 SAME_AS_UPDATES),
70
71 ({'repo_name': 'new_repo_name'},
72 {
51 73 'repo_name': 'new_repo_name',
52 'url': 'http://test.example.com:80/new_repo_name',
53 }),
54 ({'group': 'test_group_for_update'}, {
55 'repo_name': 'test_group_for_update/%s' % UPDATE_REPO_NAME,
56 'url': 'http://test.example.com:80/test_group_for_update/%s' % UPDATE_REPO_NAME
57 }),
74 'url': 'http://test.example.com:80/new_repo_name'
75 }),
76
77 ({'repo_name': 'test_group_for_update/{}'.format(UPDATE_REPO_NAME),
78 '_group': 'test_group_for_update'},
79 {
80 'repo_name': 'test_group_for_update/{}'.format(UPDATE_REPO_NAME),
81 'url': 'http://test.example.com:80/test_group_for_update/{}'.format(UPDATE_REPO_NAME)
82 }),
58 83 ])
59 84 def test_api_update_repo(self, updates, expected, backend):
60 85 repo_name = UPDATE_REPO_NAME
61 86 repo = fixture.create_repo(repo_name, repo_type=backend.alias)
62 if updates.get('group'):
63 fixture.create_repo_group(updates['group'])
87 if updates.get('_group'):
88 fixture.create_repo_group(updates['_group'])
64 89
65 90 expected_api_data = repo.get_api_data(include_secrets=True)
66 91 if expected is SAME_AS_UPDATES:
@@ -68,15 +93,12 b' class TestApiUpdateRepo(object):'
68 93 else:
69 94 expected_api_data.update(expected)
70 95
71
72 96 id_, params = build_data(
73 97 self.apikey, 'update_repo', repoid=repo_name, **updates)
74 98 response = api_call(self.app, params)
75 99
76 if updates.get('name'):
77 repo_name = updates['name']
78 if updates.get('group'):
79 repo_name = '/'.join([updates['group'], repo_name])
100 if updates.get('repo_name'):
101 repo_name = updates['repo_name']
80 102
81 103 try:
82 104 expected = {
@@ -86,8 +108,8 b' class TestApiUpdateRepo(object):'
86 108 assert_ok(id_, expected, given=response.body)
87 109 finally:
88 110 fixture.destroy_repo(repo_name)
89 if updates.get('group'):
90 fixture.destroy_repo_group(updates['group'])
111 if updates.get('_group'):
112 fixture.destroy_repo_group(updates['_group'])
91 113
92 114 def test_api_update_repo_fork_of_field(self, backend):
93 115 master_repo = backend.create_repo()
@@ -118,19 +140,23 b' class TestApiUpdateRepo(object):'
118 140 id_, params = build_data(
119 141 self.apikey, 'update_repo', repoid=repo.repo_name, **updates)
120 142 response = api_call(self.app, params)
121 expected = 'repository `{}` does not exist'.format(master_repo_name)
143 expected = {
144 'repo_fork_of': 'Fork with id `{}` does not exists'.format(
145 master_repo_name)}
122 146 assert_error(id_, expected, given=response.body)
123 147
124 148 def test_api_update_repo_with_repo_group_not_existing(self):
125 149 repo_name = 'admin_owned'
150 fake_repo_group = 'test_group_for_update'
126 151 fixture.create_repo(repo_name)
127 updates = {'group': 'test_group_for_update'}
152 updates = {'repo_name': '{}/{}'.format(fake_repo_group, repo_name)}
128 153 id_, params = build_data(
129 154 self.apikey, 'update_repo', repoid=repo_name, **updates)
130 155 response = api_call(self.app, params)
131 156 try:
132 expected = 'repository group `%s` does not exist' % (
133 updates['group'],)
157 expected = {
158 'repo_group': 'Repository group `{}` does not exist'.format(fake_repo_group)
159 }
134 160 assert_error(id_, expected, given=response.body)
135 161 finally:
136 162 fixture.destroy_repo(repo_name)
@@ -21,29 +21,26 b''
21 21 import logging
22 22 import time
23 23
24 import colander
25
26 from rhodecode import BACKENDS
27 from rhodecode.api import jsonrpc_method, JSONRPCError, JSONRPCForbidden, json
24 import rhodecode
25 from rhodecode.api import (
26 jsonrpc_method, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
28 27 from rhodecode.api.utils import (
29 28 has_superadmin_permission, Optional, OAttr, get_repo_or_error,
30 get_user_group_or_error, get_user_or_error, has_repo_permissions,
31 get_perm_or_error, store_update, get_repo_group_or_error, parse_args,
32 get_origin, build_commit_data)
33 from rhodecode.lib.auth import (
34 HasPermissionAnyApi, HasRepoGroupPermissionAnyApi,
35 HasUserGroupPermissionAnyApi)
29 get_user_group_or_error, get_user_or_error, validate_repo_permissions,
30 get_perm_or_error, parse_args, get_origin, build_commit_data,
31 validate_set_owner_permissions)
32 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
36 33 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
37 from rhodecode.lib.utils import map_groups
38 34 from rhodecode.lib.utils2 import str2bool, time_to_datetime
35 from rhodecode.lib.ext_json import json
39 36 from rhodecode.model.changeset_status import ChangesetStatusModel
40 37 from rhodecode.model.comment import ChangesetCommentsModel
41 38 from rhodecode.model.db import (
42 39 Session, ChangesetStatus, RepositoryField, Repository)
43 40 from rhodecode.model.repo import RepoModel
44 from rhodecode.model.repo_group import RepoGroupModel
45 41 from rhodecode.model.scm import ScmModel, RepoList
46 42 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
43 from rhodecode.model import validation_schema
47 44 from rhodecode.model.validation_schema.schemas import repo_schema
48 45
49 46 log = logging.getLogger(__name__)
@@ -177,6 +174,7 b' def get_repo(request, apiuser, repoid, c'
177 174
178 175 repo = get_repo_or_error(repoid)
179 176 cache = Optional.extract(cache)
177
180 178 include_secrets = False
181 179 if has_superadmin_permission(apiuser):
182 180 include_secrets = True
@@ -184,7 +182,7 b' def get_repo(request, apiuser, repoid, c'
184 182 # check if we have at least read permission for this repo !
185 183 _perms = (
186 184 'repository.admin', 'repository.write', 'repository.read',)
187 has_repo_permissions(apiuser, repoid, repo, _perms)
185 validate_repo_permissions(apiuser, repoid, repo, _perms)
188 186
189 187 permissions = []
190 188 for _user in repo.permissions():
@@ -292,7 +290,7 b' def get_repo_changeset(request, apiuser,'
292 290 if not has_superadmin_permission(apiuser):
293 291 _perms = (
294 292 'repository.admin', 'repository.write', 'repository.read',)
295 has_repo_permissions(apiuser, repoid, repo, _perms)
293 validate_repo_permissions(apiuser, repoid, repo, _perms)
296 294
297 295 changes_details = Optional.extract(details)
298 296 _changes_details_types = ['basic', 'extended', 'full']
@@ -355,7 +353,7 b' def get_repo_changesets(request, apiuser'
355 353 if not has_superadmin_permission(apiuser):
356 354 _perms = (
357 355 'repository.admin', 'repository.write', 'repository.read',)
358 has_repo_permissions(apiuser, repoid, repo, _perms)
356 validate_repo_permissions(apiuser, repoid, repo, _perms)
359 357
360 358 changes_details = Optional.extract(details)
361 359 _changes_details_types = ['basic', 'extended', 'full']
@@ -450,7 +448,7 b' def get_repo_nodes(request, apiuser, rep'
450 448 if not has_superadmin_permission(apiuser):
451 449 _perms = (
452 450 'repository.admin', 'repository.write', 'repository.read',)
453 has_repo_permissions(apiuser, repoid, repo, _perms)
451 validate_repo_permissions(apiuser, repoid, repo, _perms)
454 452
455 453 ret_type = Optional.extract(ret_type)
456 454 details = Optional.extract(details)
@@ -523,7 +521,7 b' def get_repo_refs(request, apiuser, repo'
523 521 repo = get_repo_or_error(repoid)
524 522 if not has_superadmin_permission(apiuser):
525 523 _perms = ('repository.admin', 'repository.write', 'repository.read',)
526 has_repo_permissions(apiuser, repoid, repo, _perms)
524 validate_repo_permissions(apiuser, repoid, repo, _perms)
527 525
528 526 try:
529 527 # check if repo is not empty by any chance, skip quicker if it is.
@@ -538,26 +536,30 b' def get_repo_refs(request, apiuser, repo'
538 536
539 537
540 538 @jsonrpc_method()
541 def create_repo(request, apiuser, repo_name, repo_type,
542 owner=Optional(OAttr('apiuser')), description=Optional(''),
543 private=Optional(False), clone_uri=Optional(None),
544 landing_rev=Optional('rev:tip'),
545 enable_statistics=Optional(False),
546 enable_locking=Optional(False),
547 enable_downloads=Optional(False),
548 copy_permissions=Optional(False)):
539 def create_repo(
540 request, apiuser, repo_name, repo_type,
541 owner=Optional(OAttr('apiuser')),
542 description=Optional(''),
543 private=Optional(False),
544 clone_uri=Optional(None),
545 landing_rev=Optional('rev:tip'),
546 enable_statistics=Optional(False),
547 enable_locking=Optional(False),
548 enable_downloads=Optional(False),
549 copy_permissions=Optional(False)):
549 550 """
550 551 Creates a repository.
551 552
552 * If the repository name contains "/", all the required repository
553 groups will be created.
553 * If the repository name contains "/", repository will be created inside
554 a repository group or nested repository groups
554 555
555 For example "foo/bar/baz" will create |repo| groups "foo" and "bar"
556 (with "foo" as parent). It will also create the "baz" repository
557 with "bar" as |repo| group.
556 For example "foo/bar/repo1" will create |repo| called "repo1" inside
557 group "foo/bar". You have to have permissions to access and write to
558 the last repository group ("bar" in this example)
558 559
559 560 This command can only be run using an |authtoken| with at least
560 write permissions to the |repo|.
561 permissions to create repositories, or write permissions to
562 parent repository groups.
561 563
562 564 :param apiuser: This is filled automatically from the |authtoken|.
563 565 :type apiuser: AuthUser
@@ -569,9 +571,9 b' def create_repo(request, apiuser, repo_n'
569 571 :type owner: Optional(str)
570 572 :param description: Set the repository description.
571 573 :type description: Optional(str)
572 :param private:
574 :param private: set repository as private
573 575 :type private: bool
574 :param clone_uri:
576 :param clone_uri: set clone_uri
575 577 :type clone_uri: str
576 578 :param landing_rev: <rev_type>:<rev>
577 579 :type landing_rev: str
@@ -610,49 +612,13 b' def create_repo(request, apiuser, repo_n'
610 612 }
611 613
612 614 """
613 schema = repo_schema.RepoSchema()
614 try:
615 data = schema.deserialize({
616 'repo_name': repo_name
617 })
618 except colander.Invalid as e:
619 raise JSONRPCError("Validation failed: %s" % (e.asdict(),))
620 repo_name = data['repo_name']
621 615
622 (repo_name_cleaned,
623 parent_group_name) = RepoGroupModel()._get_group_name_and_parent(
624 repo_name)
625
626 if not HasPermissionAnyApi(
627 'hg.admin', 'hg.create.repository')(user=apiuser):
628 # check if we have admin permission for this repo group if given !
629
630 if parent_group_name:
631 repogroupid = parent_group_name
632 repo_group = get_repo_group_or_error(parent_group_name)
616 owner = validate_set_owner_permissions(apiuser, owner)
633 617
634 _perms = ('group.admin',)
635 if not HasRepoGroupPermissionAnyApi(*_perms)(
636 user=apiuser, group_name=repo_group.group_name):
637 raise JSONRPCError(
638 'repository group `%s` does not exist' % (
639 repogroupid,))
640 else:
641 raise JSONRPCForbidden()
642
643 if not has_superadmin_permission(apiuser):
644 if not isinstance(owner, Optional):
645 # forbid setting owner for non-admins
646 raise JSONRPCError(
647 'Only RhodeCode admin can specify `owner` param')
648
649 if isinstance(owner, Optional):
650 owner = apiuser.user_id
651
652 owner = get_user_or_error(owner)
653
654 if RepoModel().get_by_repo_name(repo_name):
655 raise JSONRPCError("repo `%s` already exist" % repo_name)
618 description = Optional.extract(description)
619 copy_permissions = Optional.extract(copy_permissions)
620 clone_uri = Optional.extract(clone_uri)
621 landing_commit_ref = Optional.extract(landing_rev)
656 622
657 623 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
658 624 if isinstance(private, Optional):
@@ -666,32 +632,44 b' def create_repo(request, apiuser, repo_n'
666 632 if isinstance(enable_downloads, Optional):
667 633 enable_downloads = defs.get('repo_enable_downloads')
668 634
669 clone_uri = Optional.extract(clone_uri)
670 description = Optional.extract(description)
671 landing_rev = Optional.extract(landing_rev)
672 copy_permissions = Optional.extract(copy_permissions)
635 schema = repo_schema.RepoSchema().bind(
636 repo_type_options=rhodecode.BACKENDS.keys(),
637 # user caller
638 user=apiuser)
673 639
674 640 try:
675 # create structure of groups and return the last group
676 repo_group = map_groups(repo_name)
641 schema_data = schema.deserialize(dict(
642 repo_name=repo_name,
643 repo_type=repo_type,
644 repo_owner=owner.username,
645 repo_description=description,
646 repo_landing_commit_ref=landing_commit_ref,
647 repo_clone_uri=clone_uri,
648 repo_private=private,
649 repo_copy_permissions=copy_permissions,
650 repo_enable_statistics=enable_statistics,
651 repo_enable_downloads=enable_downloads,
652 repo_enable_locking=enable_locking))
653 except validation_schema.Invalid as err:
654 raise JSONRPCValidationError(colander_exc=err)
655
656 try:
677 657 data = {
678 'repo_name': repo_name_cleaned,
679 'repo_name_full': repo_name,
680 'repo_type': repo_type,
681 'repo_description': description,
682 658 'owner': owner,
683 'repo_private': private,
684 'clone_uri': clone_uri,
685 'repo_group': repo_group.group_id if repo_group else None,
686 'repo_landing_rev': landing_rev,
687 'enable_statistics': enable_statistics,
688 'enable_locking': enable_locking,
689 'enable_downloads': enable_downloads,
690 'repo_copy_permissions': copy_permissions,
659 'repo_name': schema_data['repo_group']['repo_name_without_group'],
660 'repo_name_full': schema_data['repo_name'],
661 'repo_group': schema_data['repo_group']['repo_group_id'],
662 'repo_type': schema_data['repo_type'],
663 'repo_description': schema_data['repo_description'],
664 'repo_private': schema_data['repo_private'],
665 'clone_uri': schema_data['repo_clone_uri'],
666 'repo_landing_rev': schema_data['repo_landing_commit_ref'],
667 'enable_statistics': schema_data['repo_enable_statistics'],
668 'enable_locking': schema_data['repo_enable_locking'],
669 'enable_downloads': schema_data['repo_enable_downloads'],
670 'repo_copy_permissions': schema_data['repo_copy_permissions'],
691 671 }
692 672
693 if repo_type not in BACKENDS.keys():
694 raise Exception("Invalid backend type %s" % repo_type)
695 673 task = RepoModel().create(form_data=data, cur_user=owner)
696 674 from celery.result import BaseAsyncResult
697 675 task_id = None
@@ -699,17 +677,17 b' def create_repo(request, apiuser, repo_n'
699 677 task_id = task.task_id
700 678 # no commit, it's done in RepoModel, or async via celery
701 679 return {
702 'msg': "Created new repository `%s`" % (repo_name,),
680 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
703 681 'success': True, # cannot return the repo data here since fork
704 # cann be done async
682 # can be done async
705 683 'task': task_id
706 684 }
707 685 except Exception:
708 686 log.exception(
709 687 u"Exception while trying to create the repository %s",
710 repo_name)
688 schema_data['repo_name'])
711 689 raise JSONRPCError(
712 'failed to create repository `%s`' % (repo_name,))
690 'failed to create repository `%s`' % (schema_data['repo_name'],))
713 691
714 692
715 693 @jsonrpc_method()
@@ -735,7 +713,7 b' def add_field_to_repo(request, apiuser, '
735 713 repo = get_repo_or_error(repoid)
736 714 if not has_superadmin_permission(apiuser):
737 715 _perms = ('repository.admin',)
738 has_repo_permissions(apiuser, repoid, repo, _perms)
716 validate_repo_permissions(apiuser, repoid, repo, _perms)
739 717
740 718 label = Optional.extract(label) or key
741 719 description = Optional.extract(description)
@@ -778,7 +756,7 b' def remove_field_from_repo(request, apiu'
778 756 repo = get_repo_or_error(repoid)
779 757 if not has_superadmin_permission(apiuser):
780 758 _perms = ('repository.admin',)
781 has_repo_permissions(apiuser, repoid, repo, _perms)
759 validate_repo_permissions(apiuser, repoid, repo, _perms)
782 760
783 761 field = RepositoryField.get_by_key_name(key, repo)
784 762 if not field:
@@ -800,33 +778,38 b' def remove_field_from_repo(request, apiu'
800 778
801 779
802 780 @jsonrpc_method()
803 def update_repo(request, apiuser, repoid, name=Optional(None),
804 owner=Optional(OAttr('apiuser')),
805 group=Optional(None),
806 fork_of=Optional(None),
807 description=Optional(''), private=Optional(False),
808 clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
809 enable_statistics=Optional(False),
810 enable_locking=Optional(False),
811 enable_downloads=Optional(False),
812 fields=Optional('')):
781 def update_repo(
782 request, apiuser, repoid, repo_name=Optional(None),
783 owner=Optional(OAttr('apiuser')), description=Optional(''),
784 private=Optional(False), clone_uri=Optional(None),
785 landing_rev=Optional('rev:tip'), fork_of=Optional(None),
786 enable_statistics=Optional(False),
787 enable_locking=Optional(False),
788 enable_downloads=Optional(False), fields=Optional('')):
813 789 """
814 790 Updates a repository with the given information.
815 791
816 792 This command can only be run using an |authtoken| with at least
817 write permissions to the |repo|.
793 admin permissions to the |repo|.
794
795 * If the repository name contains "/", repository will be updated
796 accordingly with a repository group or nested repository groups
797
798 For example repoid=repo-test name="foo/bar/repo-test" will update |repo|
799 called "repo-test" and place it inside group "foo/bar".
800 You have to have permissions to access and write to the last repository
801 group ("bar" in this example)
818 802
819 803 :param apiuser: This is filled automatically from the |authtoken|.
820 804 :type apiuser: AuthUser
821 805 :param repoid: repository name or repository ID.
822 806 :type repoid: str or int
823 :param name: Update the |repo| name.
824 :type name: str
807 :param repo_name: Update the |repo| name, including the
808 repository group it's in.
809 :type repo_name: str
825 810 :param owner: Set the |repo| owner.
826 811 :type owner: str
827 :param group: Set the |repo| group the |repo| belongs to.
828 :type group: str
829 :param fork_of: Set the master |repo| name.
812 :param fork_of: Set the |repo| as fork of another |repo|.
830 813 :type fork_of: str
831 814 :param description: Update the |repo| description.
832 815 :type description: str
@@ -834,69 +817,115 b' def update_repo(request, apiuser, repoid'
834 817 :type private: bool
835 818 :param clone_uri: Update the |repo| clone URI.
836 819 :type clone_uri: str
837 :param landing_rev: Set the |repo| landing revision. Default is
838 ``tip``.
820 :param landing_rev: Set the |repo| landing revision. Default is ``rev:tip``.
839 821 :type landing_rev: str
840 :param enable_statistics: Enable statistics on the |repo|,
841 (True | False).
822 :param enable_statistics: Enable statistics on the |repo|, (True | False).
842 823 :type enable_statistics: bool
843 824 :param enable_locking: Enable |repo| locking.
844 825 :type enable_locking: bool
845 :param enable_downloads: Enable downloads from the |repo|,
846 (True | False).
826 :param enable_downloads: Enable downloads from the |repo|, (True | False).
847 827 :type enable_downloads: bool
848 828 :param fields: Add extra fields to the |repo|. Use the following
849 829 example format: ``field_key=field_val,field_key2=fieldval2``.
850 830 Escape ', ' with \,
851 831 :type fields: str
852 832 """
833
853 834 repo = get_repo_or_error(repoid)
835
854 836 include_secrets = False
855 if has_superadmin_permission(apiuser):
837 if not has_superadmin_permission(apiuser):
838 validate_repo_permissions(apiuser, repoid, repo, ('repository.admin',))
839 else:
856 840 include_secrets = True
857 else:
858 _perms = ('repository.admin',)
859 has_repo_permissions(apiuser, repoid, repo, _perms)
841
842 updates = dict(
843 repo_name=repo_name
844 if not isinstance(repo_name, Optional) else repo.repo_name,
845
846 fork_id=fork_of
847 if not isinstance(fork_of, Optional) else repo.fork.repo_name if repo.fork else None,
848
849 user=owner
850 if not isinstance(owner, Optional) else repo.user.username,
851
852 repo_description=description
853 if not isinstance(description, Optional) else repo.description,
854
855 repo_private=private
856 if not isinstance(private, Optional) else repo.private,
857
858 clone_uri=clone_uri
859 if not isinstance(clone_uri, Optional) else repo.clone_uri,
860
861 repo_landing_rev=landing_rev
862 if not isinstance(landing_rev, Optional) else repo._landing_revision,
863
864 repo_enable_statistics=enable_statistics
865 if not isinstance(enable_statistics, Optional) else repo.enable_statistics,
866
867 repo_enable_locking=enable_locking
868 if not isinstance(enable_locking, Optional) else repo.enable_locking,
869
870 repo_enable_downloads=enable_downloads
871 if not isinstance(enable_downloads, Optional) else repo.enable_downloads)
872
873 ref_choices, _labels = ScmModel().get_repo_landing_revs(repo=repo)
860 874
861 updates = {
862 # update function requires this.
863 'repo_name': repo.just_name
864 }
865 repo_group = group
866 if not isinstance(repo_group, Optional):
867 repo_group = get_repo_group_or_error(repo_group)
868 repo_group = repo_group.group_id
875 schema = repo_schema.RepoSchema().bind(
876 repo_type_options=rhodecode.BACKENDS.keys(),
877 repo_ref_options=ref_choices,
878 # user caller
879 user=apiuser,
880 old_values=repo.get_api_data())
881 try:
882 schema_data = schema.deserialize(dict(
883 # we save old value, users cannot change type
884 repo_type=repo.repo_type,
885
886 repo_name=updates['repo_name'],
887 repo_owner=updates['user'],
888 repo_description=updates['repo_description'],
889 repo_clone_uri=updates['clone_uri'],
890 repo_fork_of=updates['fork_id'],
891 repo_private=updates['repo_private'],
892 repo_landing_commit_ref=updates['repo_landing_rev'],
893 repo_enable_statistics=updates['repo_enable_statistics'],
894 repo_enable_downloads=updates['repo_enable_downloads'],
895 repo_enable_locking=updates['repo_enable_locking']))
896 except validation_schema.Invalid as err:
897 raise JSONRPCValidationError(colander_exc=err)
869 898
870 repo_fork_of = fork_of
871 if not isinstance(repo_fork_of, Optional):
872 repo_fork_of = get_repo_or_error(repo_fork_of)
873 repo_fork_of = repo_fork_of.repo_id
899 # save validated data back into the updates dict
900 validated_updates = dict(
901 repo_name=schema_data['repo_group']['repo_name_without_group'],
902 repo_group=schema_data['repo_group']['repo_group_id'],
903
904 user=schema_data['repo_owner'],
905 repo_description=schema_data['repo_description'],
906 repo_private=schema_data['repo_private'],
907 clone_uri=schema_data['repo_clone_uri'],
908 repo_landing_rev=schema_data['repo_landing_commit_ref'],
909 repo_enable_statistics=schema_data['repo_enable_statistics'],
910 repo_enable_locking=schema_data['repo_enable_locking'],
911 repo_enable_downloads=schema_data['repo_enable_downloads'],
912 )
913
914 if schema_data['repo_fork_of']:
915 fork_repo = get_repo_or_error(schema_data['repo_fork_of'])
916 validated_updates['fork_id'] = fork_repo.repo_id
917
918 # extra fields
919 fields = parse_args(Optional.extract(fields), key_prefix='ex_')
920 if fields:
921 validated_updates.update(fields)
874 922
875 923 try:
876 store_update(updates, name, 'repo_name')
877 store_update(updates, repo_group, 'repo_group')
878 store_update(updates, repo_fork_of, 'fork_id')
879 store_update(updates, owner, 'user')
880 store_update(updates, description, 'repo_description')
881 store_update(updates, private, 'repo_private')
882 store_update(updates, clone_uri, 'clone_uri')
883 store_update(updates, landing_rev, 'repo_landing_rev')
884 store_update(updates, enable_statistics, 'repo_enable_statistics')
885 store_update(updates, enable_locking, 'repo_enable_locking')
886 store_update(updates, enable_downloads, 'repo_enable_downloads')
887
888 # extra fields
889 fields = parse_args(Optional.extract(fields), key_prefix='ex_')
890 if fields:
891 updates.update(fields)
892
893 RepoModel().update(repo, **updates)
924 RepoModel().update(repo, **validated_updates)
894 925 Session().commit()
895 926 return {
896 'msg': 'updated repo ID:%s %s' % (
897 repo.repo_id, repo.repo_name),
898 'repository': repo.get_api_data(
899 include_secrets=include_secrets)
927 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
928 'repository': repo.get_api_data(include_secrets=include_secrets)
900 929 }
901 930 except Exception:
902 931 log.exception(
@@ -908,26 +937,33 b' def update_repo(request, apiuser, repoid'
908 937 @jsonrpc_method()
909 938 def fork_repo(request, apiuser, repoid, fork_name,
910 939 owner=Optional(OAttr('apiuser')),
911 description=Optional(''), copy_permissions=Optional(False),
912 private=Optional(False), landing_rev=Optional('rev:tip')):
940 description=Optional(''),
941 private=Optional(False),
942 clone_uri=Optional(None),
943 landing_rev=Optional('rev:tip'),
944 copy_permissions=Optional(False)):
913 945 """
914 946 Creates a fork of the specified |repo|.
915 947
916 * If using |RCE| with Celery this will immediately return a success
917 message, even though the fork will be created asynchronously.
948 * If the fork_name contains "/", fork will be created inside
949 a repository group or nested repository groups
918 950
919 This command can only be run using an |authtoken| with fork
920 permissions on the |repo|.
951 For example "foo/bar/fork-repo" will create fork called "fork-repo"
952 inside group "foo/bar". You have to have permissions to access and
953 write to the last repository group ("bar" in this example)
954
955 This command can only be run using an |authtoken| with minimum
956 read permissions of the forked repo, create fork permissions for an user.
921 957
922 958 :param apiuser: This is filled automatically from the |authtoken|.
923 959 :type apiuser: AuthUser
924 960 :param repoid: Set repository name or repository ID.
925 961 :type repoid: str or int
926 :param fork_name: Set the fork name.
962 :param fork_name: Set the fork name, including it's repository group membership.
927 963 :type fork_name: str
928 964 :param owner: Set the fork owner.
929 965 :type owner: str
930 :param description: Set the fork descripton.
966 :param description: Set the fork description.
931 967 :type description: str
932 968 :param copy_permissions: Copy permissions from parent |repo|. The
933 969 default is False.
@@ -965,71 +1001,63 b' def fork_repo(request, apiuser, repoid, '
965 1001 error: null
966 1002
967 1003 """
968 if not has_superadmin_permission(apiuser):
969 if not HasPermissionAnyApi('hg.fork.repository')(user=apiuser):
970 raise JSONRPCForbidden()
971 1004
972 1005 repo = get_repo_or_error(repoid)
973 1006 repo_name = repo.repo_name
974 1007
975 (fork_name_cleaned,
976 parent_group_name) = RepoGroupModel()._get_group_name_and_parent(
977 fork_name)
978
979 1008 if not has_superadmin_permission(apiuser):
980 1009 # check if we have at least read permission for
981 1010 # this repo that we fork !
982 1011 _perms = (
983 1012 'repository.admin', 'repository.write', 'repository.read')
984 has_repo_permissions(apiuser, repoid, repo, _perms)
1013 validate_repo_permissions(apiuser, repoid, repo, _perms)
985 1014
986 if not isinstance(owner, Optional):
987 # forbid setting owner for non super admins
988 raise JSONRPCError(
989 'Only RhodeCode admin can specify `owner` param'
990 )
991 # check if we have a create.repo permission if not maybe the parent
992 # group permission
993 if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
994 if parent_group_name:
995 repogroupid = parent_group_name
996 repo_group = get_repo_group_or_error(parent_group_name)
1015 # check if the regular user has at least fork permissions as well
1016 if not HasPermissionAnyApi('hg.fork.repository')(user=apiuser):
1017 raise JSONRPCForbidden()
1018
1019 # check if user can set owner parameter
1020 owner = validate_set_owner_permissions(apiuser, owner)
997 1021
998 _perms = ('group.admin',)
999 if not HasRepoGroupPermissionAnyApi(*_perms)(
1000 user=apiuser, group_name=repo_group.group_name):
1001 raise JSONRPCError(
1002 'repository group `%s` does not exist' % (
1003 repogroupid,))
1004 else:
1005 raise JSONRPCForbidden()
1022 description = Optional.extract(description)
1023 copy_permissions = Optional.extract(copy_permissions)
1024 clone_uri = Optional.extract(clone_uri)
1025 landing_commit_ref = Optional.extract(landing_rev)
1026 private = Optional.extract(private)
1006 1027
1007 _repo = RepoModel().get_by_repo_name(fork_name)
1008 if _repo:
1009 type_ = 'fork' if _repo.fork else 'repo'
1010 raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
1011
1012 if isinstance(owner, Optional):
1013 owner = apiuser.user_id
1014
1015 owner = get_user_or_error(owner)
1028 schema = repo_schema.RepoSchema().bind(
1029 repo_type_options=rhodecode.BACKENDS.keys(),
1030 # user caller
1031 user=apiuser)
1016 1032
1017 1033 try:
1018 # create structure of groups and return the last group
1019 repo_group = map_groups(fork_name)
1020 form_data = {
1021 'repo_name': fork_name_cleaned,
1022 'repo_name_full': fork_name,
1023 'repo_group': repo_group.group_id if repo_group else None,
1024 'repo_type': repo.repo_type,
1025 'description': Optional.extract(description),
1026 'private': Optional.extract(private),
1027 'copy_permissions': Optional.extract(copy_permissions),
1028 'landing_rev': Optional.extract(landing_rev),
1034 schema_data = schema.deserialize(dict(
1035 repo_name=fork_name,
1036 repo_type=repo.repo_type,
1037 repo_owner=owner.username,
1038 repo_description=description,
1039 repo_landing_commit_ref=landing_commit_ref,
1040 repo_clone_uri=clone_uri,
1041 repo_private=private,
1042 repo_copy_permissions=copy_permissions))
1043 except validation_schema.Invalid as err:
1044 raise JSONRPCValidationError(colander_exc=err)
1045
1046 try:
1047 data = {
1029 1048 'fork_parent_id': repo.repo_id,
1049
1050 'repo_name': schema_data['repo_group']['repo_name_without_group'],
1051 'repo_name_full': schema_data['repo_name'],
1052 'repo_group': schema_data['repo_group']['repo_group_id'],
1053 'repo_type': schema_data['repo_type'],
1054 'description': schema_data['repo_description'],
1055 'private': schema_data['repo_private'],
1056 'copy_permissions': schema_data['repo_copy_permissions'],
1057 'landing_rev': schema_data['repo_landing_commit_ref'],
1030 1058 }
1031 1059
1032 task = RepoModel().create_fork(form_data, cur_user=owner)
1060 task = RepoModel().create_fork(data, cur_user=owner)
1033 1061 # no commit, it's done in RepoModel, or async via celery
1034 1062 from celery.result import BaseAsyncResult
1035 1063 task_id = None
@@ -1037,16 +1065,18 b' def fork_repo(request, apiuser, repoid, '
1037 1065 task_id = task.task_id
1038 1066 return {
1039 1067 'msg': 'Created fork of `%s` as `%s`' % (
1040 repo.repo_name, fork_name),
1068 repo.repo_name, schema_data['repo_name']),
1041 1069 'success': True, # cannot return the repo data here since fork
1042 1070 # can be done async
1043 1071 'task': task_id
1044 1072 }
1045 1073 except Exception:
1046 log.exception("Exception occurred while trying to fork a repo")
1074 log.exception(
1075 u"Exception while trying to create fork %s",
1076 schema_data['repo_name'])
1047 1077 raise JSONRPCError(
1048 1078 'failed to fork repository `%s` as `%s`' % (
1049 repo_name, fork_name))
1079 repo_name, schema_data['repo_name']))
1050 1080
1051 1081
1052 1082 @jsonrpc_method()
@@ -1082,7 +1112,7 b' def delete_repo(request, apiuser, repoid'
1082 1112 repo = get_repo_or_error(repoid)
1083 1113 if not has_superadmin_permission(apiuser):
1084 1114 _perms = ('repository.admin',)
1085 has_repo_permissions(apiuser, repoid, repo, _perms)
1115 validate_repo_permissions(apiuser, repoid, repo, _perms)
1086 1116
1087 1117 try:
1088 1118 handle_forks = Optional.extract(forks)
@@ -1157,7 +1187,7 b' def invalidate_cache(request, apiuser, r'
1157 1187 repo = get_repo_or_error(repoid)
1158 1188 if not has_superadmin_permission(apiuser):
1159 1189 _perms = ('repository.admin', 'repository.write',)
1160 has_repo_permissions(apiuser, repoid, repo, _perms)
1190 validate_repo_permissions(apiuser, repoid, repo, _perms)
1161 1191
1162 1192 delete = Optional.extract(delete_keys)
1163 1193 try:
@@ -1236,7 +1266,7 b' def lock(request, apiuser, repoid, locke'
1236 1266 if not has_superadmin_permission(apiuser):
1237 1267 # check if we have at least write permission for this repo !
1238 1268 _perms = ('repository.admin', 'repository.write',)
1239 has_repo_permissions(apiuser, repoid, repo, _perms)
1269 validate_repo_permissions(apiuser, repoid, repo, _perms)
1240 1270
1241 1271 # make sure normal user does not pass someone else userid,
1242 1272 # he is not allowed to do that
@@ -1347,7 +1377,7 b' def comment_commit('
1347 1377 repo = get_repo_or_error(repoid)
1348 1378 if not has_superadmin_permission(apiuser):
1349 1379 _perms = ('repository.read', 'repository.write', 'repository.admin')
1350 has_repo_permissions(apiuser, repoid, repo, _perms)
1380 validate_repo_permissions(apiuser, repoid, repo, _perms)
1351 1381
1352 1382 if isinstance(userid, Optional):
1353 1383 userid = apiuser.user_id
@@ -1438,7 +1468,7 b' def grant_user_permission(request, apius'
1438 1468 perm = get_perm_or_error(perm)
1439 1469 if not has_superadmin_permission(apiuser):
1440 1470 _perms = ('repository.admin',)
1441 has_repo_permissions(apiuser, repoid, repo, _perms)
1471 validate_repo_permissions(apiuser, repoid, repo, _perms)
1442 1472
1443 1473 try:
1444 1474
@@ -1492,7 +1522,7 b' def revoke_user_permission(request, apiu'
1492 1522 user = get_user_or_error(userid)
1493 1523 if not has_superadmin_permission(apiuser):
1494 1524 _perms = ('repository.admin',)
1495 has_repo_permissions(apiuser, repoid, repo, _perms)
1525 validate_repo_permissions(apiuser, repoid, repo, _perms)
1496 1526
1497 1527 try:
1498 1528 RepoModel().revoke_user_permission(repo=repo, user=user)
@@ -1560,7 +1590,7 b' def grant_user_group_permission(request,'
1560 1590 perm = get_perm_or_error(perm)
1561 1591 if not has_superadmin_permission(apiuser):
1562 1592 _perms = ('repository.admin',)
1563 has_repo_permissions(apiuser, repoid, repo, _perms)
1593 validate_repo_permissions(apiuser, repoid, repo, _perms)
1564 1594
1565 1595 user_group = get_user_group_or_error(usergroupid)
1566 1596 if not has_superadmin_permission(apiuser):
@@ -1625,7 +1655,7 b' def revoke_user_group_permission(request'
1625 1655 repo = get_repo_or_error(repoid)
1626 1656 if not has_superadmin_permission(apiuser):
1627 1657 _perms = ('repository.admin',)
1628 has_repo_permissions(apiuser, repoid, repo, _perms)
1658 validate_repo_permissions(apiuser, repoid, repo, _perms)
1629 1659
1630 1660 user_group = get_user_group_or_error(usergroupid)
1631 1661 if not has_superadmin_permission(apiuser):
@@ -1701,7 +1731,7 b' def pull(request, apiuser, repoid):'
1701 1731 repo = get_repo_or_error(repoid)
1702 1732 if not has_superadmin_permission(apiuser):
1703 1733 _perms = ('repository.admin',)
1704 has_repo_permissions(apiuser, repoid, repo, _perms)
1734 validate_repo_permissions(apiuser, repoid, repo, _perms)
1705 1735
1706 1736 try:
1707 1737 ScmModel().pull_changes(repo.repo_name, apiuser.username)
@@ -1764,7 +1794,7 b' def strip(request, apiuser, repoid, revi'
1764 1794 repo = get_repo_or_error(repoid)
1765 1795 if not has_superadmin_permission(apiuser):
1766 1796 _perms = ('repository.admin',)
1767 has_repo_permissions(apiuser, repoid, repo, _perms)
1797 validate_repo_permissions(apiuser, repoid, repo, _perms)
1768 1798
1769 1799 try:
1770 1800 ScmModel().strip(repo, revision, branch)
@@ -377,11 +377,11 b' class RepoModel(BaseModel):'
377 377 log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
378 378
379 379 update_keys = [
380 (1, 'repo_enable_downloads'),
381 380 (1, 'repo_description'),
382 (1, 'repo_enable_locking'),
383 381 (1, 'repo_landing_rev'),
384 382 (1, 'repo_private'),
383 (1, 'repo_enable_downloads'),
384 (1, 'repo_enable_locking'),
385 385 (1, 'repo_enable_statistics'),
386 386 (0, 'clone_uri'),
387 387 (0, 'fork_id')
@@ -762,11 +762,15 b' class ScmModel(BaseModel):'
762 762 :param repo:
763 763 """
764 764
765 hist_l = []
766 choices = []
767 765 repo = self._get_repo(repo)
768 hist_l.append(['rev:tip', _('latest tip')])
769 choices.append('rev:tip')
766
767 hist_l = [
768 ['rev:tip', _('latest tip')]
769 ]
770 choices = [
771 'rev:tip'
772 ]
773
770 774 if not repo:
771 775 return choices, hist_l
772 776
@@ -21,7 +21,6 b''
21 21 import unicodedata
22 22
23 23
24
25 24 def strip_preparer(value):
26 25 """
27 26 strips given values using .strip() function
@@ -20,8 +20,302 b''
20 20
21 21 import colander
22 22
23 from rhodecode.translation import _
23 24 from rhodecode.model.validation_schema import validators, preparers, types
24 25
26 DEFAULT_LANDING_REF = 'rev:tip'
27
28
29 def get_group_and_repo(repo_name):
30 from rhodecode.model.repo_group import RepoGroupModel
31 return RepoGroupModel()._get_group_name_and_parent(
32 repo_name, get_object=True)
33
34
35 @colander.deferred
36 def deferred_repo_type_validator(node, kw):
37 options = kw.get('repo_type_options', [])
38 return colander.OneOf([x for x in options])
39
40
41 @colander.deferred
42 def deferred_repo_owner_validator(node, kw):
43
44 def repo_owner_validator(node, value):
45 from rhodecode.model.db import User
46 existing = User.get_by_username(value)
47 if not existing:
48 msg = _(u'Repo owner with id `{}` does not exists').format(value)
49 raise colander.Invalid(node, msg)
50
51 return repo_owner_validator
52
53
54 @colander.deferred
55 def deferred_landing_ref_validator(node, kw):
56 options = kw.get('repo_ref_options', [DEFAULT_LANDING_REF])
57 return colander.OneOf([x for x in options])
58
59
60 @colander.deferred
61 def deferred_fork_of_validator(node, kw):
62 old_values = kw.get('old_values') or {}
63
64 def fork_of_validator(node, value):
65 from rhodecode.model.db import Repository, RepoGroup
66 existing = Repository.get_by_repo_name(value)
67 if not existing:
68 msg = _(u'Fork with id `{}` does not exists').format(value)
69 raise colander.Invalid(node, msg)
70 elif old_values['repo_name'] == existing.repo_name:
71 msg = _(u'Cannot set fork of '
72 u'parameter of this repository to itself').format(value)
73 raise colander.Invalid(node, msg)
74
75 return fork_of_validator
76
77
78 @colander.deferred
79 def deferred_can_write_to_group_validator(node, kw):
80 request_user = kw.get('user')
81 old_values = kw.get('old_values') or {}
82
83 def can_write_to_group_validator(node, value):
84 """
85 Checks if given repo path is writable by user. This includes checks if
86 user is allowed to create repositories under root path or under
87 repo group paths
88 """
89
90 from rhodecode.lib.auth import (
91 HasPermissionAny, HasRepoGroupPermissionAny)
92 from rhodecode.model.repo_group import RepoGroupModel
93
94 messages = {
95 'invalid_repo_group':
96 _(u"Repository group `{}` does not exist"),
97 # permissions denied we expose as not existing, to prevent
98 # resource discovery
99 'permission_denied':
100 _(u"Repository group `{}` does not exist"),
101 'permission_denied_root':
102 _(u"You do not have the permission to store "
103 u"repositories in the root location.")
104 }
105
106 value = value['repo_group_name']
107
108 is_root_location = value is types.RootLocation
109 # NOT initialized validators, we must call them
110 can_create_repos_at_root = HasPermissionAny(
111 'hg.admin', 'hg.create.repository')
112
113 # if values is root location, we simply need to check if we can write
114 # to root location !
115 if is_root_location:
116 if can_create_repos_at_root(user=request_user):
117 # we can create repo group inside tool-level. No more checks
118 # are required
119 return
120 else:
121 # "fake" node name as repo_name, otherwise we oddly report
122 # the error as if it was coming form repo_group
123 # however repo_group is empty when using root location.
124 node.name = 'repo_name'
125 raise colander.Invalid(node, messages['permission_denied_root'])
126
127 # parent group not exists ? throw an error
128 repo_group = RepoGroupModel().get_by_group_name(value)
129 if value and not repo_group:
130 raise colander.Invalid(
131 node, messages['invalid_repo_group'].format(value))
132
133 gr_name = repo_group.group_name
134
135 # create repositories with write permission on group is set to true
136 create_on_write = HasPermissionAny(
137 'hg.create.write_on_repogroup.true')(user=request_user)
138
139 group_admin = HasRepoGroupPermissionAny('group.admin')(
140 gr_name, 'can write into group validator', user=request_user)
141 group_write = HasRepoGroupPermissionAny('group.write')(
142 gr_name, 'can write into group validator', user=request_user)
143
144 forbidden = not (group_admin or (group_write and create_on_write))
145
146 # TODO: handling of old values, and detecting no-change in path
147 # to skip permission checks in such cases. This only needs to be
148 # implemented if we use this schema in forms as well
149
150 # gid = (old_data['repo_group'].get('group_id')
151 # if (old_data and 'repo_group' in old_data) else None)
152 # value_changed = gid != safe_int(value)
153 # new = not old_data
154
155 # do check if we changed the value, there's a case that someone got
156 # revoked write permissions to a repository, he still created, we
157 # don't need to check permission if he didn't change the value of
158 # groups in form box
159 # if value_changed or new:
160 # # parent group need to be existing
161 # TODO: ENDS HERE
162
163 if repo_group and forbidden:
164 msg = messages['permission_denied'].format(value)
165 raise colander.Invalid(node, msg)
166
167 return can_write_to_group_validator
168
25 169
26 class RepoSchema(colander.Schema):
27 repo_name = colander.SchemaNode(types.GroupNameType())
170 @colander.deferred
171 def deferred_unique_name_validator(node, kw):
172 request_user = kw.get('user')
173 old_values = kw.get('old_values') or {}
174
175 def unique_name_validator(node, value):
176 from rhodecode.model.db import Repository, RepoGroup
177 name_changed = value != old_values.get('repo_name')
178
179 existing = Repository.get_by_repo_name(value)
180 if name_changed and existing:
181 msg = _(u'Repository with name `{}` already exists').format(value)
182 raise colander.Invalid(node, msg)
183
184 existing_group = RepoGroup.get_by_group_name(value)
185 if name_changed and existing_group:
186 msg = _(u'Repository group with name `{}` already exists').format(
187 value)
188 raise colander.Invalid(node, msg)
189 return unique_name_validator
190
191
192 @colander.deferred
193 def deferred_repo_name_validator(node, kw):
194 return validators.valid_name_validator
195
196
197 class GroupType(colander.Mapping):
198 def _validate(self, node, value):
199 try:
200 return dict(repo_group_name=value)
201 except Exception as e:
202 raise colander.Invalid(
203 node, '"${val}" is not a mapping type: ${err}'.format(
204 val=value, err=e))
205
206 def deserialize(self, node, cstruct):
207 if cstruct is colander.null:
208 return cstruct
209
210 appstruct = super(GroupType, self).deserialize(node, cstruct)
211 validated_name = appstruct['repo_group_name']
212
213 # inject group based on once deserialized data
214 (repo_name_without_group,
215 parent_group_name,
216 parent_group) = get_group_and_repo(validated_name)
217
218 appstruct['repo_name_without_group'] = repo_name_without_group
219 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
220 if parent_group:
221 appstruct['repo_group_id'] = parent_group.group_id
222
223 return appstruct
224
225
226 class GroupSchema(colander.SchemaNode):
227 schema_type = GroupType
228 validator = deferred_can_write_to_group_validator
229 missing = colander.null
230
231
232 class RepoGroup(GroupSchema):
233 repo_group_name = colander.SchemaNode(
234 types.GroupNameType())
235 repo_group_id = colander.SchemaNode(
236 colander.String(), missing=None)
237 repo_name_without_group = colander.SchemaNode(
238 colander.String(), missing=None)
239
240
241 class RepoGroupAccessSchema(colander.MappingSchema):
242 repo_group = RepoGroup()
243
244
245 class RepoNameUniqueSchema(colander.MappingSchema):
246 unique_repo_name = colander.SchemaNode(
247 colander.String(),
248 validator=deferred_unique_name_validator)
249
250
251 class RepoSchema(colander.MappingSchema):
252
253 repo_name = colander.SchemaNode(
254 types.RepoNameType(),
255 validator=deferred_repo_name_validator)
256
257 repo_type = colander.SchemaNode(
258 colander.String(),
259 validator=deferred_repo_type_validator)
260
261 repo_owner = colander.SchemaNode(
262 colander.String(),
263 validator=deferred_repo_owner_validator)
264
265 repo_description = colander.SchemaNode(
266 colander.String(), missing='')
267
268 repo_landing_commit_ref = colander.SchemaNode(
269 colander.String(),
270 validator=deferred_landing_ref_validator,
271 preparers=[preparers.strip_preparer],
272 missing=DEFAULT_LANDING_REF)
273
274 repo_clone_uri = colander.SchemaNode(
275 colander.String(),
276 validator=colander.All(colander.Length(min=1)),
277 preparers=[preparers.strip_preparer],
278 missing='')
279
280 repo_fork_of = colander.SchemaNode(
281 colander.String(),
282 validator=deferred_fork_of_validator,
283 missing=None)
284
285 repo_private = colander.SchemaNode(
286 types.StringBooleanType(),
287 missing=False)
288 repo_copy_permissions = colander.SchemaNode(
289 types.StringBooleanType(),
290 missing=False)
291 repo_enable_statistics = colander.SchemaNode(
292 types.StringBooleanType(),
293 missing=False)
294 repo_enable_downloads = colander.SchemaNode(
295 types.StringBooleanType(),
296 missing=False)
297 repo_enable_locking = colander.SchemaNode(
298 types.StringBooleanType(),
299 missing=False)
300
301 def deserialize(self, cstruct):
302 """
303 Custom deserialize that allows to chain validation, and verify
304 permissions, and as last step uniqueness
305 """
306
307 # first pass, to validate given data
308 appstruct = super(RepoSchema, self).deserialize(cstruct)
309 validated_name = appstruct['repo_name']
310
311 # second pass to validate permissions to repo_group
312 second = RepoGroupAccessSchema().bind(**self.bindings)
313 appstruct_second = second.deserialize({'repo_group': validated_name})
314 # save result
315 appstruct['repo_group'] = appstruct_second['repo_group']
316
317 # thirds to validate uniqueness
318 third = RepoNameUniqueSchema().bind(**self.bindings)
319 third.deserialize({'unique_repo_name': validated_name})
320
321 return appstruct
@@ -33,8 +33,7 b' class SearchParamsSchema(colander.Mappin'
33 33 search_sort = colander.SchemaNode(
34 34 colander.String(),
35 35 missing='newfirst',
36 validator=colander.OneOf(
37 ['oldfirst', 'newfirst']))
36 validator=colander.OneOf(['oldfirst', 'newfirst']))
38 37 page_limit = colander.SchemaNode(
39 38 colander.Integer(),
40 39 missing=10,
@@ -18,22 +18,73 b''
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import re
22
21 23 import colander
24 from rhodecode.model.validation_schema import preparers
25 from rhodecode.model.db import User, UserGroup
26
27
28 class _RootLocation(object):
29 pass
30
31 RootLocation = _RootLocation()
32
33
34 def _normalize(seperator, path):
35
36 if not path:
37 return ''
38 elif path is colander.null:
39 return colander.null
40
41 parts = path.split(seperator)
22 42
23 from rhodecode.model.db import User, UserGroup
43 def bad_parts(value):
44 if not value:
45 return False
46 if re.match(r'^[.]+$', value):
47 return False
48
49 return True
50
51 def slugify(value):
52 value = preparers.slugify_preparer(value)
53 value = re.sub(r'[.]{2,}', '.', value)
54 return value
55
56 clean_parts = [slugify(item) for item in parts if item]
57 path = filter(bad_parts, clean_parts)
58 return seperator.join(path)
59
60
61 class RepoNameType(colander.String):
62 SEPARATOR = '/'
63
64 def deserialize(self, node, cstruct):
65 result = super(RepoNameType, self).deserialize(node, cstruct)
66 if cstruct is colander.null:
67 return colander.null
68 return self._normalize(result)
69
70 def _normalize(self, path):
71 return _normalize(self.SEPARATOR, path)
24 72
25 73
26 74 class GroupNameType(colander.String):
27 75 SEPARATOR = '/'
28 76
29 77 def deserialize(self, node, cstruct):
30 result = super(GroupNameType, self).deserialize(node, cstruct)
31 return self._replace_extra_slashes(result)
78 if cstruct is RootLocation:
79 return cstruct
32 80
33 def _replace_extra_slashes(self, path):
34 path = path.split(self.SEPARATOR)
35 path = [item for item in path if item]
36 return self.SEPARATOR.join(path)
81 result = super(GroupNameType, self).deserialize(node, cstruct)
82 if cstruct is colander.null:
83 return colander.null
84 return self._normalize(result)
85
86 def _normalize(self, path):
87 return _normalize(self.SEPARATOR, path)
37 88
38 89
39 90 class StringBooleanType(colander.String):
@@ -36,3 +36,13 b' def glob_validator(node, value):'
36 36 except Exception:
37 37 msg = _(u'Invalid glob pattern')
38 38 raise colander.Invalid(node, msg)
39
40
41 def valid_name_validator(node, value):
42 from rhodecode.model.validation_schema import types
43 if value is types.RootLocation:
44 return
45
46 msg = _('Name must start with a letter or number. Got `{}`').format(value)
47 if not re.match(r'^[a-zA-z0-9]{1,}', value):
48 raise colander.Invalid(node, msg)
@@ -21,8 +21,6 b''
21 21 import colander
22 22 import pytest
23 23
24 from rhodecode.model import validation_schema
25
26 24 from rhodecode.integrations import integration_type_registry
27 25 from rhodecode.integrations.types.base import IntegrationTypeBase
28 26 from rhodecode.model.validation_schema.schemas.integration_schema import (
@@ -33,14 +31,12 b' from rhodecode.model.validation_schema.s'
33 31 @pytest.mark.usefixtures('app', 'autologin_user')
34 32 class TestIntegrationSchema(object):
35 33
36 def test_deserialize_integration_schema_perms(self, backend_random,
37 test_repo_group,
38 StubIntegrationType):
34 def test_deserialize_integration_schema_perms(
35 self, backend_random, test_repo_group, StubIntegrationType):
39 36
40 37 repo = backend_random.repo
41 38 repo_group = test_repo_group
42 39
43
44 40 empty_perms_dict = {
45 41 'global': [],
46 42 'repositories': {},
@@ -21,26 +21,82 b''
21 21 import colander
22 22 import pytest
23 23
24 from rhodecode.model.validation_schema.types import GroupNameType
24 from rhodecode.model.validation_schema.types import (
25 GroupNameType, RepoNameType, StringBooleanType)
25 26
26 27
27 28 class TestGroupNameType(object):
28 29 @pytest.mark.parametrize('given, expected', [
29 30 ('//group1/group2//', 'group1/group2'),
30 31 ('//group1///group2//', 'group1/group2'),
31 ('group1/group2///group3', 'group1/group2/group3')
32 ('group1/group2///group3', 'group1/group2/group3'),
32 33 ])
33 def test_replace_extra_slashes_cleans_up_extra_slashes(
34 self, given, expected):
35 type_ = GroupNameType()
36 result = type_._replace_extra_slashes(given)
34 def test_normalize_path(self, given, expected):
35 result = GroupNameType()._normalize(given)
37 36 assert result == expected
38 37
39 def test_deserialize_cleans_up_extra_slashes(self):
38 @pytest.mark.parametrize('given, expected', [
39 ('//group1/group2//', 'group1/group2'),
40 ('//group1///group2//', 'group1/group2'),
41 ('group1/group2///group3', 'group1/group2/group3'),
42 ('v1.2', 'v1.2'),
43 ('/v1.2', 'v1.2'),
44 ('.dirs', '.dirs'),
45 ('..dirs', '.dirs'),
46 ('./..dirs', '.dirs'),
47 ('dir/;name;/;[];/sub', 'dir/name/sub'),
48 (',/,/,d,,,', 'd'),
49 ('/;/#/,d,,,', 'd'),
50 ('long../../..name', 'long./.name'),
51 ('long../..name', 'long./.name'),
52 ('../', ''),
53 ('\'../"../', ''),
54 ('c,/,/..//./,c,,,/.d/../.........c', 'c/c/.d/.c'),
55 ('c,/,/..//./,c,,,', 'c/c'),
56 ('d../..d', 'd./.d'),
57 ('d../../d', 'd./d'),
58
59 ('d\;\./\,\./d', 'd./d'),
60 ('d\.\./\.\./d', 'd./d'),
61 ('d\.\./\..\../d', 'd./d'),
62 ])
63 def test_deserialize_clean_up_name(self, given, expected):
40 64 class TestSchema(colander.Schema):
41 field = colander.SchemaNode(GroupNameType())
65 field_group = colander.SchemaNode(GroupNameType())
66 field_repo = colander.SchemaNode(RepoNameType())
42 67
43 68 schema = TestSchema()
44 cleaned_data = schema.deserialize(
45 {'field': '//group1/group2///group3//'})
46 assert cleaned_data['field'] == 'group1/group2/group3'
69 cleaned_data = schema.deserialize({
70 'field_group': given,
71 'field_repo': given
72 })
73 assert cleaned_data['field_group'] == expected
74 assert cleaned_data['field_repo'] == expected
75
76
77 class TestStringBooleanType(object):
78
79 def _get_schema(self):
80 class Schema(colander.MappingSchema):
81 bools = colander.SchemaNode(StringBooleanType())
82 return Schema()
83
84 @pytest.mark.parametrize('given, expected', [
85 ('1', True),
86 ('yEs', True),
87 ('true', True),
88
89 ('0', False),
90 ('NO', False),
91 ('FALSE', False),
92
93 ])
94 def test_convert_type(self, given, expected):
95 schema = self._get_schema()
96 result = schema.deserialize({'bools':given})
97 assert result['bools'] == expected
98
99 def test_try_convert_bad_type(self):
100 schema = self._get_schema()
101 with pytest.raises(colander.Invalid):
102 result = schema.deserialize({'bools': 'boom'})
General Comments 0
You need to be logged in to leave comments. Login now