##// END OF EJS Templates
repo-schema: made changes to allow deform usage.
marcink -
r1718:47073062 default
parent child Browse files
Show More
@@ -1,326 +1,360 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 import deform.widget
22 23
23 24 from rhodecode.translation import _
25 from rhodecode.model.validation_schema.utils import convert_to_optgroup
24 26 from rhodecode.model.validation_schema import validators, preparers, types
25 27
26 28 DEFAULT_LANDING_REF = 'rev:tip'
27 29
28 30
29 31 def get_group_and_repo(repo_name):
30 32 from rhodecode.model.repo_group import RepoGroupModel
31 33 return RepoGroupModel()._get_group_name_and_parent(
32 34 repo_name, get_object=True)
33 35
34 36
37 def get_repo_group(repo_group_id):
38 from rhodecode.model.repo_group import RepoGroup
39 return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
40
41
35 42 @colander.deferred
36 43 def deferred_repo_type_validator(node, kw):
37 44 options = kw.get('repo_type_options', [])
38 45 return colander.OneOf([x for x in options])
39 46
40 47
41 48 @colander.deferred
42 49 def deferred_repo_owner_validator(node, kw):
43 50
44 51 def repo_owner_validator(node, value):
45 52 from rhodecode.model.db import User
46 53 existing = User.get_by_username(value)
47 54 if not existing:
48 55 msg = _(u'Repo owner with id `{}` does not exists').format(value)
49 56 raise colander.Invalid(node, msg)
50 57
51 58 return repo_owner_validator
52 59
53 60
54 61 @colander.deferred
55 62 def deferred_landing_ref_validator(node, kw):
56 options = kw.get('repo_ref_options', [DEFAULT_LANDING_REF])
63 options = kw.get(
64 'repo_ref_options', [DEFAULT_LANDING_REF])
57 65 return colander.OneOf([x for x in options])
58 66
59 67
60 68 @colander.deferred
69 def deferred_landing_ref_widget(node, kw):
70 items = kw.get(
71 'repo_ref_items', [(DEFAULT_LANDING_REF, DEFAULT_LANDING_REF)])
72 items = convert_to_optgroup(items)
73 return deform.widget.Select2Widget(values=items)
74
75
76 @colander.deferred
61 77 def deferred_fork_of_validator(node, kw):
62 78 old_values = kw.get('old_values') or {}
63 79
64 80 def fork_of_validator(node, value):
65 81 from rhodecode.model.db import Repository, RepoGroup
66 82 existing = Repository.get_by_repo_name(value)
67 83 if not existing:
68 84 msg = _(u'Fork with id `{}` does not exists').format(value)
69 85 raise colander.Invalid(node, msg)
70 86 elif old_values['repo_name'] == existing.repo_name:
71 87 msg = _(u'Cannot set fork of '
72 88 u'parameter of this repository to itself').format(value)
73 89 raise colander.Invalid(node, msg)
74 90
75 91 return fork_of_validator
76 92
77 93
78 94 @colander.deferred
79 95 def deferred_can_write_to_group_validator(node, kw):
80 96 request_user = kw.get('user')
81 97 old_values = kw.get('old_values') or {}
82 98
83 99 def can_write_to_group_validator(node, value):
84 100 """
85 101 Checks if given repo path is writable by user. This includes checks if
86 102 user is allowed to create repositories under root path or under
87 103 repo group paths
88 104 """
89 105
90 106 from rhodecode.lib.auth import (
91 107 HasPermissionAny, HasRepoGroupPermissionAny)
92 108 from rhodecode.model.repo_group import RepoGroupModel
93 109
94 110 messages = {
95 111 'invalid_repo_group':
96 112 _(u"Repository group `{}` does not exist"),
97 113 # permissions denied we expose as not existing, to prevent
98 114 # resource discovery
99 115 'permission_denied':
100 116 _(u"Repository group `{}` does not exist"),
101 117 'permission_denied_root':
102 118 _(u"You do not have the permission to store "
103 119 u"repositories in the root location.")
104 120 }
105 121
106 122 value = value['repo_group_name']
107 123
108 124 is_root_location = value is types.RootLocation
109 125 # NOT initialized validators, we must call them
110 126 can_create_repos_at_root = HasPermissionAny(
111 127 'hg.admin', 'hg.create.repository')
112 128
113 129 # if values is root location, we simply need to check if we can write
114 130 # to root location !
115 131 if is_root_location:
116 132 if can_create_repos_at_root(user=request_user):
117 133 # we can create repo group inside tool-level. No more checks
118 134 # are required
119 135 return
120 136 else:
121 137 # "fake" node name as repo_name, otherwise we oddly report
122 138 # the error as if it was coming form repo_group
123 139 # however repo_group is empty when using root location.
124 140 node.name = 'repo_name'
125 141 raise colander.Invalid(node, messages['permission_denied_root'])
126 142
127 143 # parent group not exists ? throw an error
128 144 repo_group = RepoGroupModel().get_by_group_name(value)
129 145 if value and not repo_group:
130 146 raise colander.Invalid(
131 147 node, messages['invalid_repo_group'].format(value))
132 148
133 149 gr_name = repo_group.group_name
134 150
135 151 # create repositories with write permission on group is set to true
136 152 create_on_write = HasPermissionAny(
137 153 'hg.create.write_on_repogroup.true')(user=request_user)
138 154
139 155 group_admin = HasRepoGroupPermissionAny('group.admin')(
140 156 gr_name, 'can write into group validator', user=request_user)
141 157 group_write = HasRepoGroupPermissionAny('group.write')(
142 158 gr_name, 'can write into group validator', user=request_user)
143 159
144 160 forbidden = not (group_admin or (group_write and create_on_write))
145 161
146 162 # TODO: handling of old values, and detecting no-change in path
147 163 # to skip permission checks in such cases. This only needs to be
148 164 # implemented if we use this schema in forms as well
149 165
150 166 # gid = (old_data['repo_group'].get('group_id')
151 167 # if (old_data and 'repo_group' in old_data) else None)
152 168 # value_changed = gid != safe_int(value)
153 169 # new = not old_data
154 170
155 171 # do check if we changed the value, there's a case that someone got
156 172 # revoked write permissions to a repository, he still created, we
157 173 # don't need to check permission if he didn't change the value of
158 174 # groups in form box
159 175 # if value_changed or new:
160 176 # # parent group need to be existing
161 177 # TODO: ENDS HERE
162 178
163 179 if repo_group and forbidden:
164 180 msg = messages['permission_denied'].format(value)
165 181 raise colander.Invalid(node, msg)
166 182
167 183 return can_write_to_group_validator
168 184
169 185
170 186 @colander.deferred
171 187 def deferred_unique_name_validator(node, kw):
172 188 request_user = kw.get('user')
173 189 old_values = kw.get('old_values') or {}
174 190
175 191 def unique_name_validator(node, value):
176 192 from rhodecode.model.db import Repository, RepoGroup
177 193 name_changed = value != old_values.get('repo_name')
178 194
179 195 existing = Repository.get_by_repo_name(value)
180 196 if name_changed and existing:
181 197 msg = _(u'Repository with name `{}` already exists').format(value)
182 198 raise colander.Invalid(node, msg)
183 199
184 200 existing_group = RepoGroup.get_by_group_name(value)
185 201 if name_changed and existing_group:
186 202 msg = _(u'Repository group with name `{}` already exists').format(
187 203 value)
188 204 raise colander.Invalid(node, msg)
189 205 return unique_name_validator
190 206
191 207
192 208 @colander.deferred
193 209 def deferred_repo_name_validator(node, kw):
194 210 def no_git_suffix_validator(node, value):
195 211 if value.endswith('.git'):
196 212 msg = _('Repository name cannot end with .git')
197 213 raise colander.Invalid(node, msg)
198 214 return colander.All(
199 215 no_git_suffix_validator, validators.valid_name_validator)
200 216
201 217
218 @colander.deferred
219 def deferred_repo_group_validator(node, kw):
220 options = kw.get(
221 'repo_repo_group_options')
222 return colander.OneOf([x for x in options])
223
224
225 @colander.deferred
226 def deferred_repo_group_widget(node, kw):
227 items = kw.get('repo_repo_group_items')
228 return deform.widget.Select2Widget(values=items)
229
230
202 231 class GroupType(colander.Mapping):
203 232 def _validate(self, node, value):
204 233 try:
205 234 return dict(repo_group_name=value)
206 235 except Exception as e:
207 236 raise colander.Invalid(
208 237 node, '"${val}" is not a mapping type: ${err}'.format(
209 238 val=value, err=e))
210 239
211 240 def deserialize(self, node, cstruct):
212 241 if cstruct is colander.null:
213 242 return cstruct
214 243
215 244 appstruct = super(GroupType, self).deserialize(node, cstruct)
216 245 validated_name = appstruct['repo_group_name']
217 246
218 247 # inject group based on once deserialized data
219 248 (repo_name_without_group,
220 249 parent_group_name,
221 250 parent_group) = get_group_and_repo(validated_name)
222 251
252 appstruct['repo_name_with_group'] = validated_name
223 253 appstruct['repo_name_without_group'] = repo_name_without_group
224 254 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
255
225 256 if parent_group:
226 257 appstruct['repo_group_id'] = parent_group.group_id
227 258
228 259 return appstruct
229 260
230 261
231 262 class GroupSchema(colander.SchemaNode):
232 263 schema_type = GroupType
233 264 validator = deferred_can_write_to_group_validator
234 265 missing = colander.null
235 266
236 267
237 268 class RepoGroup(GroupSchema):
238 269 repo_group_name = colander.SchemaNode(
239 270 types.GroupNameType())
240 271 repo_group_id = colander.SchemaNode(
241 272 colander.String(), missing=None)
242 273 repo_name_without_group = colander.SchemaNode(
243 274 colander.String(), missing=None)
244 275
245 276
246 277 class RepoGroupAccessSchema(colander.MappingSchema):
247 278 repo_group = RepoGroup()
248 279
249 280
250 281 class RepoNameUniqueSchema(colander.MappingSchema):
251 282 unique_repo_name = colander.SchemaNode(
252 283 colander.String(),
253 284 validator=deferred_unique_name_validator)
254 285
255 286
256 287 class RepoSchema(colander.MappingSchema):
257 288
258 289 repo_name = colander.SchemaNode(
259 290 types.RepoNameType(),
260 291 validator=deferred_repo_name_validator)
261 292
262 293 repo_type = colander.SchemaNode(
263 294 colander.String(),
264 295 validator=deferred_repo_type_validator)
265 296
266 297 repo_owner = colander.SchemaNode(
267 298 colander.String(),
268 validator=deferred_repo_owner_validator)
299 validator=deferred_repo_owner_validator,
300 widget=deform.widget.TextInputWidget())
269 301
270 302 repo_description = colander.SchemaNode(
271 colander.String(), missing='')
303 colander.String(), missing='',
304 widget=deform.widget.TextAreaWidget())
272 305
273 306 repo_landing_commit_ref = colander.SchemaNode(
274 307 colander.String(),
275 308 validator=deferred_landing_ref_validator,
276 309 preparers=[preparers.strip_preparer],
277 missing=DEFAULT_LANDING_REF)
310 missing=DEFAULT_LANDING_REF,
311 widget=deferred_landing_ref_widget)
278 312
279 313 repo_clone_uri = colander.SchemaNode(
280 314 colander.String(),
281 315 validator=colander.All(colander.Length(min=1)),
282 316 preparers=[preparers.strip_preparer],
283 317 missing='')
284 318
285 319 repo_fork_of = colander.SchemaNode(
286 320 colander.String(),
287 321 validator=deferred_fork_of_validator,
288 322 missing=None)
289 323
290 324 repo_private = colander.SchemaNode(
291 325 types.StringBooleanType(),
292 missing=False)
326 missing=False, widget=deform.widget.CheckboxWidget())
293 327 repo_copy_permissions = colander.SchemaNode(
294 328 types.StringBooleanType(),
295 missing=False)
329 missing=False, widget=deform.widget.CheckboxWidget())
296 330 repo_enable_statistics = colander.SchemaNode(
297 331 types.StringBooleanType(),
298 missing=False)
332 missing=False, widget=deform.widget.CheckboxWidget())
299 333 repo_enable_downloads = colander.SchemaNode(
300 334 types.StringBooleanType(),
301 missing=False)
335 missing=False, widget=deform.widget.CheckboxWidget())
302 336 repo_enable_locking = colander.SchemaNode(
303 337 types.StringBooleanType(),
304 missing=False)
338 missing=False, widget=deform.widget.CheckboxWidget())
305 339
306 340 def deserialize(self, cstruct):
307 341 """
308 342 Custom deserialize that allows to chain validation, and verify
309 343 permissions, and as last step uniqueness
310 344 """
311 345
312 346 # first pass, to validate given data
313 347 appstruct = super(RepoSchema, self).deserialize(cstruct)
314 348 validated_name = appstruct['repo_name']
315 349
316 350 # second pass to validate permissions to repo_group
317 351 second = RepoGroupAccessSchema().bind(**self.bindings)
318 352 appstruct_second = second.deserialize({'repo_group': validated_name})
319 353 # save result
320 354 appstruct['repo_group'] = appstruct_second['repo_group']
321 355
322 356 # thirds to validate uniqueness
323 357 third = RepoNameUniqueSchema().bind(**self.bindings)
324 358 third.deserialize({'unique_repo_name': validated_name})
325 359
326 360 return appstruct
General Comments 0
You need to be logged in to leave comments. Login now