##// END OF EJS Templates
repositories: allow updating repository settings for users without store-in-root permissions...
marcink -
r4415:fc1f6c1b default
parent child Browse files
Show More
@@ -1,441 +1,453 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22 import deform.widget
23 23
24 24 from rhodecode.translation import _
25 25 from rhodecode.model.validation_schema.utils import convert_to_optgroup
26 26 from rhodecode.model.validation_schema import validators, preparers, types
27 27
28 28 DEFAULT_LANDING_REF = 'rev:tip'
29 29 DEFAULT_BACKEND_LANDING_REF = {
30 30 'hg': 'branch:default',
31 31 'git': 'branch:master',
32 32 'svn': 'rev:tip',
33 33 }
34 34
35 35
36 36 def get_group_and_repo(repo_name):
37 37 from rhodecode.model.repo_group import RepoGroupModel
38 38 return RepoGroupModel()._get_group_name_and_parent(
39 39 repo_name, get_object=True)
40 40
41 41
42 42 def get_repo_group(repo_group_id):
43 43 from rhodecode.model.repo_group import RepoGroup
44 44 return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
45 45
46 46
47 47 @colander.deferred
48 48 def deferred_repo_type_validator(node, kw):
49 49 options = kw.get('repo_type_options', [])
50 50 return colander.OneOf([x for x in options])
51 51
52 52
53 53 @colander.deferred
54 54 def deferred_repo_owner_validator(node, kw):
55 55
56 56 def repo_owner_validator(node, value):
57 57 from rhodecode.model.db import User
58 58 existing = User.get_by_username(value)
59 59 if not existing:
60 60 msg = _(u'Repo owner with id `{}` does not exists').format(value)
61 61 raise colander.Invalid(node, msg)
62 62
63 63 return repo_owner_validator
64 64
65 65
66 66 @colander.deferred
67 67 def deferred_landing_ref_validator(node, kw):
68 68 options = kw.get(
69 69 'repo_ref_options', [DEFAULT_LANDING_REF])
70 70 return colander.OneOf([x for x in options])
71 71
72 72
73 73 @colander.deferred
74 74 def deferred_sync_uri_validator(node, kw):
75 75 repo_type = kw.get('repo_type')
76 76 validator = validators.CloneUriValidator(repo_type)
77 77 return validator
78 78
79 79
80 80 @colander.deferred
81 81 def deferred_landing_ref_widget(node, kw):
82 82 repo_type = kw.get('repo_type')
83 83 default_opts = []
84 84 if repo_type:
85 85 default_opts.append(
86 86 (DEFAULT_BACKEND_LANDING_REF[repo_type],
87 87 DEFAULT_BACKEND_LANDING_REF[repo_type]))
88 88
89 89 items = kw.get('repo_ref_items', default_opts)
90 90 items = convert_to_optgroup(items)
91 91 return deform.widget.Select2Widget(values=items)
92 92
93 93
94 94 @colander.deferred
95 95 def deferred_fork_of_validator(node, kw):
96 96 old_values = kw.get('old_values') or {}
97 97
98 98 def fork_of_validator(node, value):
99 99 from rhodecode.model.db import Repository, RepoGroup
100 100 existing = Repository.get_by_repo_name(value)
101 101 if not existing:
102 102 msg = _(u'Fork with id `{}` does not exists').format(value)
103 103 raise colander.Invalid(node, msg)
104 104 elif old_values['repo_name'] == existing.repo_name:
105 105 msg = _(u'Cannot set fork of '
106 106 u'parameter of this repository to itself').format(value)
107 107 raise colander.Invalid(node, msg)
108 108
109 109 return fork_of_validator
110 110
111 111
112 112 @colander.deferred
113 113 def deferred_can_write_to_group_validator(node, kw):
114 114 request_user = kw.get('user')
115 115 old_values = kw.get('old_values') or {}
116 116
117 117 def can_write_to_group_validator(node, value):
118 118 """
119 119 Checks if given repo path is writable by user. This includes checks if
120 120 user is allowed to create repositories under root path or under
121 121 repo group paths
122 122 """
123 123
124 124 from rhodecode.lib.auth import (
125 125 HasPermissionAny, HasRepoGroupPermissionAny)
126 126 from rhodecode.model.repo_group import RepoGroupModel
127 127
128 128 messages = {
129 129 'invalid_repo_group':
130 130 _(u"Repository group `{}` does not exist"),
131 131 # permissions denied we expose as not existing, to prevent
132 132 # resource discovery
133 133 'permission_denied':
134 134 _(u"Repository group `{}` does not exist"),
135 135 'permission_denied_root':
136 136 _(u"You do not have the permission to store "
137 137 u"repositories in the root location.")
138 138 }
139 139
140 140 value = value['repo_group_name']
141 141
142 142 is_root_location = value is types.RootLocation
143 143 # NOT initialized validators, we must call them
144 can_create_repos_at_root = HasPermissionAny(
145 'hg.admin', 'hg.create.repository')
144 can_create_repos_at_root = HasPermissionAny('hg.admin', 'hg.create.repository')
146 145
147 146 # if values is root location, we simply need to check if we can write
148 147 # to root location !
149 148 if is_root_location:
149
150 150 if can_create_repos_at_root(user=request_user):
151 151 # we can create repo group inside tool-level. No more checks
152 152 # are required
153 153 return
154 154 else:
155 old_name = old_values.get('repo_name')
156 if old_name and old_name == old_values.get('submitted_repo_name'):
157 # since we didn't change the name, we can skip validation and
158 # allow current users without store-in-root permissions to update
159 return
160
155 161 # "fake" node name as repo_name, otherwise we oddly report
156 162 # the error as if it was coming form repo_group
157 163 # however repo_group is empty when using root location.
158 164 node.name = 'repo_name'
159 165 raise colander.Invalid(node, messages['permission_denied_root'])
160 166
161 167 # parent group not exists ? throw an error
162 168 repo_group = RepoGroupModel().get_by_group_name(value)
163 169 if value and not repo_group:
164 170 raise colander.Invalid(
165 171 node, messages['invalid_repo_group'].format(value))
166 172
167 173 gr_name = repo_group.group_name
168 174
169 175 # create repositories with write permission on group is set to true
170 176 create_on_write = HasPermissionAny(
171 177 'hg.create.write_on_repogroup.true')(user=request_user)
172 178
173 179 group_admin = HasRepoGroupPermissionAny('group.admin')(
174 180 gr_name, 'can write into group validator', user=request_user)
175 181 group_write = HasRepoGroupPermissionAny('group.write')(
176 182 gr_name, 'can write into group validator', user=request_user)
177 183
178 184 forbidden = not (group_admin or (group_write and create_on_write))
179 185
180 186 # TODO: handling of old values, and detecting no-change in path
181 187 # to skip permission checks in such cases. This only needs to be
182 188 # implemented if we use this schema in forms as well
183 189
184 190 # gid = (old_data['repo_group'].get('group_id')
185 191 # if (old_data and 'repo_group' in old_data) else None)
186 192 # value_changed = gid != safe_int(value)
187 193 # new = not old_data
188 194
189 195 # do check if we changed the value, there's a case that someone got
190 196 # revoked write permissions to a repository, he still created, we
191 197 # don't need to check permission if he didn't change the value of
192 198 # groups in form box
193 199 # if value_changed or new:
194 200 # # parent group need to be existing
195 201 # TODO: ENDS HERE
196 202
197 203 if repo_group and forbidden:
198 204 msg = messages['permission_denied'].format(value)
199 205 raise colander.Invalid(node, msg)
200 206
201 207 return can_write_to_group_validator
202 208
203 209
204 210 @colander.deferred
205 211 def deferred_unique_name_validator(node, kw):
206 212 request_user = kw.get('user')
207 213 old_values = kw.get('old_values') or {}
208 214
209 215 def unique_name_validator(node, value):
210 216 from rhodecode.model.db import Repository, RepoGroup
211 217 name_changed = value != old_values.get('repo_name')
212 218
213 219 existing = Repository.get_by_repo_name(value)
214 220 if name_changed and existing:
215 221 msg = _(u'Repository with name `{}` already exists').format(value)
216 222 raise colander.Invalid(node, msg)
217 223
218 224 existing_group = RepoGroup.get_by_group_name(value)
219 225 if name_changed and existing_group:
220 226 msg = _(u'Repository group with name `{}` already exists').format(
221 227 value)
222 228 raise colander.Invalid(node, msg)
223 229 return unique_name_validator
224 230
225 231
226 232 @colander.deferred
227 233 def deferred_repo_name_validator(node, kw):
228 234 def no_git_suffix_validator(node, value):
229 235 if value.endswith('.git'):
230 236 msg = _('Repository name cannot end with .git')
231 237 raise colander.Invalid(node, msg)
232 238 return colander.All(
233 239 no_git_suffix_validator, validators.valid_name_validator)
234 240
235 241
236 242 @colander.deferred
237 243 def deferred_repo_group_validator(node, kw):
238 244 options = kw.get(
239 245 'repo_repo_group_options')
240 246 return colander.OneOf([x for x in options])
241 247
242 248
243 249 @colander.deferred
244 250 def deferred_repo_group_widget(node, kw):
245 251 items = kw.get('repo_repo_group_items')
246 252 return deform.widget.Select2Widget(values=items)
247 253
248 254
249 255 class GroupType(colander.Mapping):
250 256 def _validate(self, node, value):
251 257 try:
252 258 return dict(repo_group_name=value)
253 259 except Exception as e:
254 260 raise colander.Invalid(
255 261 node, '"${val}" is not a mapping type: ${err}'.format(
256 262 val=value, err=e))
257 263
258 264 def deserialize(self, node, cstruct):
259 265 if cstruct is colander.null:
260 266 return cstruct
261 267
262 268 appstruct = super(GroupType, self).deserialize(node, cstruct)
263 269 validated_name = appstruct['repo_group_name']
264 270
265 271 # inject group based on once deserialized data
266 272 (repo_name_without_group,
267 273 parent_group_name,
268 274 parent_group) = get_group_and_repo(validated_name)
269 275
270 276 appstruct['repo_name_with_group'] = validated_name
271 277 appstruct['repo_name_without_group'] = repo_name_without_group
272 278 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
273 279
274 280 if parent_group:
275 281 appstruct['repo_group_id'] = parent_group.group_id
276 282
277 283 return appstruct
278 284
279 285
280 286 class GroupSchema(colander.SchemaNode):
281 287 schema_type = GroupType
282 288 validator = deferred_can_write_to_group_validator
283 289 missing = colander.null
284 290
285 291
286 292 class RepoGroup(GroupSchema):
287 293 repo_group_name = colander.SchemaNode(
288 294 types.GroupNameType())
289 295 repo_group_id = colander.SchemaNode(
290 296 colander.String(), missing=None)
291 297 repo_name_without_group = colander.SchemaNode(
292 298 colander.String(), missing=None)
293 299
294 300
295 301 class RepoGroupAccessSchema(colander.MappingSchema):
296 302 repo_group = RepoGroup()
297 303
298 304
299 305 class RepoNameUniqueSchema(colander.MappingSchema):
300 306 unique_repo_name = colander.SchemaNode(
301 307 colander.String(),
302 308 validator=deferred_unique_name_validator)
303 309
304 310
305 311 class RepoSchema(colander.MappingSchema):
306 312
307 313 repo_name = colander.SchemaNode(
308 314 types.RepoNameType(),
309 315 validator=deferred_repo_name_validator)
310 316
311 317 repo_type = colander.SchemaNode(
312 318 colander.String(),
313 319 validator=deferred_repo_type_validator)
314 320
315 321 repo_owner = colander.SchemaNode(
316 322 colander.String(),
317 323 validator=deferred_repo_owner_validator,
318 324 widget=deform.widget.TextInputWidget())
319 325
320 326 repo_description = colander.SchemaNode(
321 327 colander.String(), missing='',
322 328 widget=deform.widget.TextAreaWidget())
323 329
324 330 repo_landing_commit_ref = colander.SchemaNode(
325 331 colander.String(),
326 332 validator=deferred_landing_ref_validator,
327 333 preparers=[preparers.strip_preparer],
328 334 missing=DEFAULT_LANDING_REF,
329 335 widget=deferred_landing_ref_widget)
330 336
331 337 repo_clone_uri = colander.SchemaNode(
332 338 colander.String(),
333 339 validator=deferred_sync_uri_validator,
334 340 preparers=[preparers.strip_preparer],
335 341 missing='')
336 342
337 343 repo_push_uri = colander.SchemaNode(
338 344 colander.String(),
339 345 validator=deferred_sync_uri_validator,
340 346 preparers=[preparers.strip_preparer],
341 347 missing='')
342 348
343 349 repo_fork_of = colander.SchemaNode(
344 350 colander.String(),
345 351 validator=deferred_fork_of_validator,
346 352 missing=None)
347 353
348 354 repo_private = colander.SchemaNode(
349 355 types.StringBooleanType(),
350 356 missing=False, widget=deform.widget.CheckboxWidget())
351 357 repo_copy_permissions = colander.SchemaNode(
352 358 types.StringBooleanType(),
353 359 missing=False, widget=deform.widget.CheckboxWidget())
354 360 repo_enable_statistics = colander.SchemaNode(
355 361 types.StringBooleanType(),
356 362 missing=False, widget=deform.widget.CheckboxWidget())
357 363 repo_enable_downloads = colander.SchemaNode(
358 364 types.StringBooleanType(),
359 365 missing=False, widget=deform.widget.CheckboxWidget())
360 366 repo_enable_locking = colander.SchemaNode(
361 367 types.StringBooleanType(),
362 368 missing=False, widget=deform.widget.CheckboxWidget())
363 369
364 370 def deserialize(self, cstruct):
365 371 """
366 372 Custom deserialize that allows to chain validation, and verify
367 373 permissions, and as last step uniqueness
368 374 """
369 375
370 376 # first pass, to validate given data
371 377 appstruct = super(RepoSchema, self).deserialize(cstruct)
372 378 validated_name = appstruct['repo_name']
373 379
374 380 # second pass to validate permissions to repo_group
381 if 'old_values' in self.bindings:
382 # save current repo name for name change checks
383 self.bindings['old_values']['submitted_repo_name'] = validated_name
375 384 second = RepoGroupAccessSchema().bind(**self.bindings)
376 385 appstruct_second = second.deserialize({'repo_group': validated_name})
377 386 # save result
378 387 appstruct['repo_group'] = appstruct_second['repo_group']
379 388
380 389 # thirds to validate uniqueness
381 390 third = RepoNameUniqueSchema().bind(**self.bindings)
382 391 third.deserialize({'unique_repo_name': validated_name})
383 392
384 393 return appstruct
385 394
386 395
387 396 class RepoSettingsSchema(RepoSchema):
388 397 repo_group = colander.SchemaNode(
389 398 colander.Integer(),
390 399 validator=deferred_repo_group_validator,
391 400 widget=deferred_repo_group_widget,
392 401 missing='')
393 402
394 403 repo_clone_uri_change = colander.SchemaNode(
395 404 colander.String(),
396 405 missing='NEW')
397 406
398 407 repo_clone_uri = colander.SchemaNode(
399 408 colander.String(),
400 409 preparers=[preparers.strip_preparer],
401 410 validator=deferred_sync_uri_validator,
402 411 missing='')
403 412
404 413 repo_push_uri_change = colander.SchemaNode(
405 414 colander.String(),
406 415 missing='NEW')
407 416
408 417 repo_push_uri = colander.SchemaNode(
409 418 colander.String(),
410 419 preparers=[preparers.strip_preparer],
411 420 validator=deferred_sync_uri_validator,
412 421 missing='')
413 422
414 423 def deserialize(self, cstruct):
415 424 """
416 425 Custom deserialize that allows to chain validation, and verify
417 426 permissions, and as last step uniqueness
418 427 """
419 428
420 429 # first pass, to validate given data
421 430 appstruct = super(RepoSchema, self).deserialize(cstruct)
422 431 validated_name = appstruct['repo_name']
423 432 # because of repoSchema adds repo-group as an ID, we inject it as
424 433 # full name here because validators require it, it's unwrapped later
425 434 # so it's safe to use and final name is going to be without group anyway
426 435
427 436 group, separator = get_repo_group(appstruct['repo_group'])
428 437 if group:
429 438 validated_name = separator.join([group.group_name, validated_name])
430 439
431 440 # second pass to validate permissions to repo_group
441 if 'old_values' in self.bindings:
442 # save current repo name for name change checks
443 self.bindings['old_values']['submitted_repo_name'] = validated_name
432 444 second = RepoGroupAccessSchema().bind(**self.bindings)
433 445 appstruct_second = second.deserialize({'repo_group': validated_name})
434 446 # save result
435 447 appstruct['repo_group'] = appstruct_second['repo_group']
436 448
437 449 # thirds to validate uniqueness
438 450 third = RepoNameUniqueSchema().bind(**self.bindings)
439 451 third.deserialize({'unique_repo_name': validated_name})
440 452
441 453 return appstruct
General Comments 0
You need to be logged in to leave comments. Login now