##// END OF EJS Templates
validators/schemas: python3 fixes str vs unicode and few test breaking fixes
super-admin -
r5066:ccd88b7c default
parent child Browse files
Show More
@@ -1,88 +1,99 b''
1 1
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import unicodedata
21 from rhodecode.lib.str_utils import convert_special_chars, safe_int, safe_bytes
22
23
24 def to_bytes_preparer(value):
25
26 if value:
27 value = safe_bytes(value)
28 return value
22 29
23 30
24 31 def strip_preparer(value):
25 32 """
26 33 strips given values using .strip() function
27 34 """
28 35
29 36 if value:
30 37 value = value.strip()
31 38 return value
32 39
33 40
34 41 def slugify_preparer(value, keep_case=True):
35 42 """
36 43 Slugify given value to a safe representation for url/id
37 44 """
38 45 from rhodecode.lib.utils import repo_name_slug
39 46 if value:
40 47 value = repo_name_slug(value if keep_case else value.lower())
41 48 return value
42 49
43 50
44 51 def non_ascii_strip_preparer(value):
45 52 """
46 53 trie to replace non-ascii letters to their ascii representation
47 54 eg::
48 55
49 56 `ΕΌoΕ‚w` converts into `zolw`
50 57 """
51 58 if value:
52 value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
59 value = convert_special_chars(value)
53 60 return value
54 61
55 62
56 63 def unique_list_preparer(value):
57 64 """
58 65 Converts an list to a list with only unique values
59 66 """
60 67
61 68 def make_unique(value):
62 69 seen = []
63 70 return [c for c in value if
64 71 not (c in seen or seen.append(c))]
65 72
66 73 if isinstance(value, list):
67 74 ret_val = make_unique(value)
68 75 elif isinstance(value, set):
69 76 ret_val = list(value)
70 77 elif isinstance(value, tuple):
71 78 ret_val = make_unique(value)
72 79 elif value is None:
73 80 ret_val = []
74 81 else:
75 82 ret_val = [value]
76 83
77 84 return ret_val
78 85
79 86
80 87 def unique_list_from_str_preparer(value):
81 88 """
82 89 Converts an list to a list with only unique values
83 90 """
84 91 from rhodecode.lib.utils2 import aslist
85 92
86 93 if isinstance(value, str):
87 94 value = aslist(value, ',')
88 return unique_list_preparer(value) No newline at end of file
95 return unique_list_preparer(value)
96
97
98 def ensure_value_is_int(value):
99 return safe_int(value)
@@ -1,74 +1,74 b''
1 1
2 2
3 3 # Copyright (C) 2017-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import colander
24 24
25 25 from rhodecode.translation import _
26 26 from rhodecode.model.validation_schema import preparers
27 27 from rhodecode.model.validation_schema import types
28 28
29 29
30 30 @colander.deferred
31 31 def deferred_lifetime_validator(node, kw):
32 32 options = kw.get('lifetime_options', [])
33 33 return colander.All(
34 34 colander.Range(min=-1, max=60 * 24 * 30 * 12),
35 35 colander.OneOf([x for x in options]))
36 36
37 37
38 38 def unique_gist_validator(node, value):
39 39 from rhodecode.model.db import Gist
40 40 existing = Gist.get_by_access_id(value)
41 41 if existing:
42 42 msg = _(u'Gist with name {} already exists').format(value)
43 43 raise colander.Invalid(node, msg)
44 44
45 45
46 46 def filename_validator(node, value):
47 47 if value != os.path.basename(value):
48 48 msg = _(u'Filename {} cannot be inside a directory').format(value)
49 49 raise colander.Invalid(node, msg)
50 50
51 51
52 52 comment_types = ['note', 'todo']
53 53
54 54
55 55 class CommentSchema(colander.MappingSchema):
56 56 from rhodecode.model.db import ChangesetComment, ChangesetStatus
57 57
58 58 comment_body = colander.SchemaNode(colander.String())
59 59 comment_type = colander.SchemaNode(
60 60 colander.String(),
61 61 validator=colander.OneOf(ChangesetComment.COMMENT_TYPES),
62 62 missing=ChangesetComment.COMMENT_TYPE_NOTE)
63 is_draft = colander.SchemaNode(colander.Boolean(),missing=False)
63 is_draft = colander.SchemaNode(colander.Boolean(), missing=False)
64 64 comment_file = colander.SchemaNode(colander.String(), missing=None)
65 65 comment_line = colander.SchemaNode(colander.String(), missing=None)
66 66 status_change = colander.SchemaNode(
67 67 colander.String(), missing=None,
68 68 validator=colander.OneOf([x[0] for x in ChangesetStatus.STATUSES]))
69 69 renderer_type = colander.SchemaNode(colander.String())
70 70
71 71 resolves_comment_id = colander.SchemaNode(colander.Integer(), missing=None)
72 72
73 73 user = colander.SchemaNode(types.StrOrIntType())
74 74 repo = colander.SchemaNode(types.StrOrIntType())
@@ -1,183 +1,194 b''
1 1
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import colander
24 24
25 from rhodecode.lib.str_utils import safe_str
25 26 from rhodecode.translation import _
26 27 from rhodecode.model.validation_schema import preparers
27 28
28 29
29 30 def nodes_to_sequence(nodes, colander_node=None):
30 31 """
31 32 Converts old style dict nodes to new list of dicts
32 33
33 34 :param nodes: dict with key beeing name of the file
34 35
35 36 """
36 37 if not isinstance(nodes, dict):
37 msg = 'Nodes needs to be a dict, got {}'.format(type(nodes))
38 msg = f'Nodes needs to be a dict, got {type(nodes)}'
38 39 raise colander.Invalid(colander_node, msg)
39 40 out = []
40 41
41 42 for key, val in nodes.items():
42 43 val = (isinstance(val, dict) and val) or {}
43 44 out.append(dict(
44 45 filename=key,
45 46 content=val.get('content'),
46 47 mimetype=val.get('mimetype')
47 48 ))
48 49
49 50 out = Nodes().deserialize(out)
50 51 return out
51 52
52 53
53 54 def sequence_to_nodes(nodes, colander_node=None):
55
54 56 if not isinstance(nodes, list):
55 57 msg = 'Nodes needs to be a list, got {}'.format(type(nodes))
56 58 raise colander.Invalid(colander_node, msg)
57 59 nodes = Nodes().deserialize(nodes)
58 60
59 61 out = {}
60 62 try:
61 63 for file_data in nodes:
62 64 file_data_skip = file_data.copy()
63 65 # if we got filename_org we use it as a key so we keep old
64 66 # name as input and rename is-reflected inside the values as
65 67 # filename and filename_org differences.
66 68 filename_org = file_data.get('filename_org')
67 69 filename = filename_org or file_data['filename']
68 70 out[filename] = {}
69 71 out[filename].update(file_data_skip)
70 72
71 73 except Exception as e:
72 74 msg = 'Invalid data format org_exc:`{}`'.format(repr(e))
73 75 raise colander.Invalid(colander_node, msg)
74 76 return out
75 77
76 78
77 79 @colander.deferred
78 80 def deferred_lifetime_validator(node, kw):
79 81 options = kw.get('lifetime_options', [])
80 82 return colander.All(
81 83 colander.Range(min=-1, max=60 * 24 * 30 * 12),
82 84 colander.OneOf([x for x in options]))
83 85
84 86
85 87 def unique_gist_validator(node, value):
86 88 from rhodecode.model.db import Gist
87 89 existing = Gist.get_by_access_id(value)
88 90 if existing:
89 msg = _(u'Gist with name {} already exists').format(value)
91 msg = _('Gist with name {} already exists').format(value)
90 92 raise colander.Invalid(node, msg)
91 93
92 94
93 95 def filename_validator(node, value):
94 96 if value != os.path.basename(value):
95 msg = _(u'Filename {} cannot be inside a directory').format(value)
97 msg = _('Filename {} cannot be inside a directory').format(safe_str(value))
96 98 raise colander.Invalid(node, msg)
97 99
98 100
99 101 class NodeSchema(colander.MappingSchema):
100 102 # if we perform rename this will be org filename
101 103 filename_org = colander.SchemaNode(
102 colander.String(),
103 preparer=[preparers.strip_preparer,
104 preparers.non_ascii_strip_preparer],
104 colander.String(encoding='utf-8'),
105 preparer=[
106 preparers.strip_preparer,
107 preparers.non_ascii_strip_preparer,
108 preparers.to_bytes_preparer,
109 ],
105 110 validator=filename_validator,
106 111 missing=None)
107 112
108 113 filename = colander.SchemaNode(
109 colander.String(),
110 preparer=[preparers.strip_preparer,
111 preparers.non_ascii_strip_preparer],
114 colander.String(encoding='utf-8'),
115 preparer=[
116 preparers.strip_preparer,
117 preparers.non_ascii_strip_preparer,
118 preparers.to_bytes_preparer,
119 ],
112 120 validator=filename_validator)
113 121
114 122 content = colander.SchemaNode(
115 colander.String())
123 colander.String(encoding='utf-8'),
124 preparer=[preparers.to_bytes_preparer])
125
116 126 mimetype = colander.SchemaNode(
117 127 colander.String(),
118 128 missing=None)
119 129
120 130
121 131 class Nodes(colander.SequenceSchema):
122 132 filenames = NodeSchema()
123 133
124 134 def validator(self, node, cstruct):
125 135 if not isinstance(cstruct, list):
126 136 return
127 137
128 138 found_filenames = []
129 139 for data in cstruct:
130 140 filename = data['filename']
131 141 if filename in found_filenames:
132 142 msg = _('Duplicated value for filename found: `{}`').format(
133 143 filename)
134 144 raise colander.Invalid(node, msg)
135 145 found_filenames.append(filename)
136 146
137 147
138 148 class GistSchema(colander.MappingSchema):
139 149 """
140 150 schema = GistSchema()
141 151 schema.bind(
142 152 lifetime_options = [1,2,3]
143 153 )
144 154 out = schema.deserialize(dict(
145 155 nodes=[
146 156 {'filename': 'x', 'content': 'xxx', },
147 157 {'filename': 'docs/Z', 'content': 'xxx', 'mimetype': 'x'},
148 158 ]
149 159 ))
150 160 """
151 161
152 162 from rhodecode.model.db import Gist
153 163
154 164 gistid = colander.SchemaNode(
155 165 colander.String(),
156 166 missing=None,
157 167 preparer=[preparers.strip_preparer,
158 168 preparers.non_ascii_strip_preparer,
159 169 preparers.slugify_preparer],
160 170 validator=colander.All(
161 171 colander.Length(min=3),
162 172 unique_gist_validator
163 173 ))
164 174
165 175 description = colander.SchemaNode(
166 176 colander.String(),
167 missing=u'')
177 missing='')
168 178
169 179 lifetime = colander.SchemaNode(
170 180 colander.Integer(),
171 181 validator=deferred_lifetime_validator)
172 182
173 183 gist_acl_level = colander.SchemaNode(
174 184 colander.String(),
185 missing=Gist.ACL_LEVEL_PRIVATE,
175 186 validator=colander.OneOf([Gist.ACL_LEVEL_PUBLIC,
176 187 Gist.ACL_LEVEL_PRIVATE]))
177 188
178 189 gist_type = colander.SchemaNode(
179 190 colander.String(),
180 missing=Gist.GIST_PUBLIC,
191 missing=Gist.GIST_PRIVATE,
181 192 validator=colander.OneOf([Gist.GIST_PRIVATE, Gist.GIST_PUBLIC]))
182 193
183 194 nodes = Nodes()
@@ -1,450 +1,460 b''
1 1
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22 import deform.widget
23 23
24 24 from rhodecode.translation import _
25 25 from rhodecode.model.validation_schema.utils import convert_to_optgroup, username_converter
26 26 from rhodecode.model.validation_schema import validators, preparers, types
27 27
28 28 DEFAULT_LANDING_REF = 'rev:tip'
29 29
30 30
31 31 def get_group_and_repo(repo_name):
32 32 from rhodecode.model.repo_group import RepoGroupModel
33 33 return RepoGroupModel()._get_group_name_and_parent(
34 34 repo_name, get_object=True)
35 35
36 36
37 37 def get_repo_group(repo_group_id):
38 38 from rhodecode.model.repo_group import RepoGroup
39 39 return RepoGroup.get(repo_group_id), RepoGroup.CHOICES_SEPARATOR
40 40
41 41
42 42 @colander.deferred
43 43 def deferred_repo_type_validator(node, kw):
44 44 options = kw.get('repo_type_options', [])
45 45 return colander.OneOf([x for x in options])
46 46
47 47
48 48 @colander.deferred
49 49 def deferred_repo_owner_validator(node, kw):
50 50
51 51 def repo_owner_validator(node, value):
52 52 from rhodecode.model.db import User
53 53 value = username_converter(value)
54 54 existing = User.get_by_username(value)
55 55 if not existing:
56 56 msg = _(u'Repo owner with id `{}` does not exists').format(value)
57 57 raise colander.Invalid(node, msg)
58 58
59 59 return repo_owner_validator
60 60
61 61
62 62 @colander.deferred
63 63 def deferred_landing_ref_validator(node, kw):
64 64 options = kw.get(
65 65 'repo_ref_options', [DEFAULT_LANDING_REF])
66 66 return colander.OneOf([x for x in options])
67 67
68 68
69 69 @colander.deferred
70 70 def deferred_sync_uri_validator(node, kw):
71 71 repo_type = kw.get('repo_type')
72 72 validator = validators.CloneUriValidator(repo_type)
73 73 return validator
74 74
75 75
76 76 @colander.deferred
77 77 def deferred_landing_ref_widget(node, kw):
78 78 from rhodecode.model.scm import ScmModel
79 79
80 80 repo_type = kw.get('repo_type')
81 81 default_opts = []
82 82 if repo_type:
83 83 default_landing_ref, _lbl = ScmModel.backend_landing_ref(repo_type)
84 84 default_opts.append((default_landing_ref, default_landing_ref))
85 85
86 86 items = kw.get('repo_ref_items', default_opts)
87 87 items = convert_to_optgroup(items)
88 88 return deform.widget.Select2Widget(values=items)
89 89
90 90
91 91 @colander.deferred
92 92 def deferred_fork_of_validator(node, kw):
93 93 old_values = kw.get('old_values') or {}
94 94
95 95 def fork_of_validator(node, value):
96 96 from rhodecode.model.db import Repository, RepoGroup
97 97 existing = Repository.get_by_repo_name(value)
98 98 if not existing:
99 99 msg = _(u'Fork with id `{}` does not exists').format(value)
100 100 raise colander.Invalid(node, msg)
101 101 elif old_values['repo_name'] == existing.repo_name:
102 102 msg = _(u'Cannot set fork of '
103 103 u'parameter of this repository to itself').format(value)
104 104 raise colander.Invalid(node, msg)
105 105
106 106 return fork_of_validator
107 107
108 108
109 109 @colander.deferred
110 110 def deferred_can_write_to_group_validator(node, kw):
111 111 request_user = kw.get('user')
112 112 old_values = kw.get('old_values') or {}
113 113
114 114 def can_write_to_group_validator(node, value):
115 115 """
116 116 Checks if given repo path is writable by user. This includes checks if
117 117 user is allowed to create repositories under root path or under
118 118 repo group paths
119 119 """
120 120
121 121 from rhodecode.lib.auth import (
122 122 HasPermissionAny, HasRepoGroupPermissionAny)
123 123 from rhodecode.model.repo_group import RepoGroupModel
124 124
125 125 messages = {
126 126 'invalid_repo_group':
127 127 _(u"Repository group `{}` does not exist"),
128 128 # permissions denied we expose as not existing, to prevent
129 129 # resource discovery
130 130 'permission_denied':
131 131 _(u"Repository group `{}` does not exist"),
132 132 'permission_denied_root':
133 133 _(u"You do not have the permission to store "
134 134 u"repositories in the root location.")
135 135 }
136 136
137 137 value = value['repo_group_name']
138 138
139 139 is_root_location = value is types.RootLocation
140 140 # NOT initialized validators, we must call them
141 141 can_create_repos_at_root = HasPermissionAny('hg.admin', 'hg.create.repository')
142 142
143 143 # if values is root location, we simply need to check if we can write
144 144 # to root location !
145 145 if is_root_location:
146 146
147 147 if can_create_repos_at_root(user=request_user):
148 148 # we can create repo group inside tool-level. No more checks
149 149 # are required
150 150 return
151 151 else:
152 152 old_name = old_values.get('repo_name')
153 153 if old_name and old_name == old_values.get('submitted_repo_name'):
154 154 # since we didn't change the name, we can skip validation and
155 155 # allow current users without store-in-root permissions to update
156 156 return
157 157
158 158 # "fake" node name as repo_name, otherwise we oddly report
159 159 # the error as if it was coming form repo_group
160 160 # however repo_group is empty when using root location.
161 161 node.name = 'repo_name'
162 162 raise colander.Invalid(node, messages['permission_denied_root'])
163 163
164 164 # parent group not exists ? throw an error
165 165 repo_group = RepoGroupModel().get_by_group_name(value)
166 166 if value and not repo_group:
167 167 raise colander.Invalid(
168 168 node, messages['invalid_repo_group'].format(value))
169 169
170 170 gr_name = repo_group.group_name
171 171
172 172 # create repositories with write permission on group is set to true
173 173 create_on_write = HasPermissionAny(
174 174 'hg.create.write_on_repogroup.true')(user=request_user)
175 175
176 176 group_admin = HasRepoGroupPermissionAny('group.admin')(
177 177 gr_name, 'can write into group validator', user=request_user)
178 178 group_write = HasRepoGroupPermissionAny('group.write')(
179 179 gr_name, 'can write into group validator', user=request_user)
180 180
181 181 forbidden = not (group_admin or (group_write and create_on_write))
182 182
183 183 # TODO: handling of old values, and detecting no-change in path
184 184 # to skip permission checks in such cases. This only needs to be
185 185 # implemented if we use this schema in forms as well
186 186
187 187 # gid = (old_data['repo_group'].get('group_id')
188 188 # if (old_data and 'repo_group' in old_data) else None)
189 189 # value_changed = gid != safe_int(value)
190 190 # new = not old_data
191 191
192 192 # do check if we changed the value, there's a case that someone got
193 193 # revoked write permissions to a repository, he still created, we
194 194 # don't need to check permission if he didn't change the value of
195 195 # groups in form box
196 196 # if value_changed or new:
197 197 # # parent group need to be existing
198 198 # TODO: ENDS HERE
199 199
200 200 if repo_group and forbidden:
201 201 msg = messages['permission_denied'].format(value)
202 202 raise colander.Invalid(node, msg)
203 203
204 204 return can_write_to_group_validator
205 205
206 206
207 207 @colander.deferred
208 208 def deferred_unique_name_validator(node, kw):
209 209 request_user = kw.get('user')
210 210 old_values = kw.get('old_values') or {}
211 211
212 212 def unique_name_validator(node, value):
213 213 from rhodecode.model.db import Repository, RepoGroup
214 214 name_changed = value != old_values.get('repo_name')
215 215
216 216 existing = Repository.get_by_repo_name(value)
217 217 if name_changed and existing:
218 218 msg = _(u'Repository with name `{}` already exists').format(value)
219 219 raise colander.Invalid(node, msg)
220 220
221 221 existing_group = RepoGroup.get_by_group_name(value)
222 222 if name_changed and existing_group:
223 223 msg = _(u'Repository group with name `{}` already exists').format(
224 224 value)
225 225 raise colander.Invalid(node, msg)
226 226 return unique_name_validator
227 227
228 228
229 229 @colander.deferred
230 230 def deferred_repo_name_validator(node, kw):
231 231 def no_git_suffix_validator(node, value):
232 232 if value.endswith('.git'):
233 233 msg = _('Repository name cannot end with .git')
234 234 raise colander.Invalid(node, msg)
235 235 return colander.All(
236 236 no_git_suffix_validator, validators.valid_name_validator)
237 237
238 238
239 239 @colander.deferred
240 240 def deferred_repo_group_validator(node, kw):
241 options = kw.get(
242 'repo_repo_group_options')
243 return colander.OneOf([x for x in options])
241 options = kw.get('repo_repo_group_options')
242
243 def repo_group_validator(node, value):
244 choices = [x for x in options]
245 err = _('Group ID: `${val}` is not one of allowed ${choices}')
246
247 if preparers.ensure_value_is_int(value) not in choices:
248 choices = ', '.join(['%s' % x for x in sorted(choices)])
249 err = _(err, mapping={'val': value, 'choices': choices})
250 raise colander.Invalid(node, err)
251
252 return repo_group_validator
244 253
245 254
246 255 @colander.deferred
247 256 def deferred_repo_group_widget(node, kw):
248 257 items = kw.get('repo_repo_group_items')
249 258 return deform.widget.Select2Widget(values=items)
250 259
251 260
252 261 class GroupType(colander.Mapping):
253 262 def _validate(self, node, value):
254 263 try:
255 264 return dict(repo_group_name=value)
256 265 except Exception as e:
257 266 raise colander.Invalid(
258 267 node, '"${val}" is not a mapping type: ${err}'.format(
259 268 val=value, err=e))
260 269
261 270 def deserialize(self, node, cstruct):
262 271 if cstruct is colander.null:
263 272 return cstruct
264 273
265 274 appstruct = super(GroupType, self).deserialize(node, cstruct)
266 275 validated_name = appstruct['repo_group_name']
267 276
268 277 # inject group based on once deserialized data
269 278 (repo_name_without_group,
270 279 parent_group_name,
271 280 parent_group) = get_group_and_repo(validated_name)
272 281
273 282 appstruct['repo_name_with_group'] = validated_name
274 283 appstruct['repo_name_without_group'] = repo_name_without_group
275 284 appstruct['repo_group_name'] = parent_group_name or types.RootLocation
276 285
277 286 if parent_group:
278 287 appstruct['repo_group_id'] = parent_group.group_id
279 288
280 289 return appstruct
281 290
282 291
283 292 class GroupSchema(colander.SchemaNode):
284 293 schema_type = GroupType
285 294 validator = deferred_can_write_to_group_validator
286 295 missing = colander.null
287 296
288 297
289 298 class RepoGroup(GroupSchema):
290 299 repo_group_name = colander.SchemaNode(
291 300 types.GroupNameType())
292 301 repo_group_id = colander.SchemaNode(
293 302 colander.String(), missing=None)
294 303 repo_name_without_group = colander.SchemaNode(
295 304 colander.String(), missing=None)
296 305
297 306
298 307 class RepoGroupAccessSchema(colander.MappingSchema):
299 308 repo_group = RepoGroup()
300 309
301 310
302 311 class RepoNameUniqueSchema(colander.MappingSchema):
303 312 unique_repo_name = colander.SchemaNode(
304 313 colander.String(),
305 314 validator=deferred_unique_name_validator)
306 315
307 316
308 317 class RepoSchema(colander.MappingSchema):
309 318
310 319 repo_name = colander.SchemaNode(
311 320 types.RepoNameType(),
312 321 validator=deferred_repo_name_validator)
313 322
314 323 repo_type = colander.SchemaNode(
315 324 colander.String(),
316 325 validator=deferred_repo_type_validator)
317 326
318 327 repo_owner = colander.SchemaNode(
319 328 colander.String(),
320 329 validator=deferred_repo_owner_validator,
321 330 widget=deform.widget.TextInputWidget())
322 331
323 332 repo_description = colander.SchemaNode(
324 333 colander.String(), missing='',
325 334 widget=deform.widget.TextAreaWidget())
326 335
327 336 repo_landing_commit_ref = colander.SchemaNode(
328 337 colander.String(),
329 338 validator=deferred_landing_ref_validator,
330 339 preparers=[preparers.strip_preparer],
331 340 missing=DEFAULT_LANDING_REF,
332 341 widget=deferred_landing_ref_widget)
333 342
334 343 repo_clone_uri = colander.SchemaNode(
335 344 colander.String(),
336 345 validator=deferred_sync_uri_validator,
337 346 preparers=[preparers.strip_preparer],
338 347 missing='')
339 348
340 349 repo_push_uri = colander.SchemaNode(
341 350 colander.String(),
342 351 validator=deferred_sync_uri_validator,
343 352 preparers=[preparers.strip_preparer],
344 353 missing='')
345 354
346 355 repo_fork_of = colander.SchemaNode(
347 356 colander.String(),
348 357 validator=deferred_fork_of_validator,
349 358 missing=None)
350 359
351 360 repo_private = colander.SchemaNode(
352 361 types.StringBooleanType(),
353 362 missing=False, widget=deform.widget.CheckboxWidget())
354 363 repo_copy_permissions = colander.SchemaNode(
355 364 types.StringBooleanType(),
356 365 missing=False, widget=deform.widget.CheckboxWidget())
357 366 repo_enable_statistics = colander.SchemaNode(
358 367 types.StringBooleanType(),
359 368 missing=False, widget=deform.widget.CheckboxWidget())
360 369 repo_enable_downloads = colander.SchemaNode(
361 370 types.StringBooleanType(),
362 371 missing=False, widget=deform.widget.CheckboxWidget())
363 372 repo_enable_locking = colander.SchemaNode(
364 373 types.StringBooleanType(),
365 374 missing=False, widget=deform.widget.CheckboxWidget())
366 375
367 376 def deserialize(self, cstruct):
368 377 """
369 378 Custom deserialize that allows to chain validation, and verify
370 379 permissions, and as last step uniqueness
371 380 """
372 381
373 382 # first pass, to validate given data
374 383 appstruct = super(RepoSchema, self).deserialize(cstruct)
375 384 validated_name = appstruct['repo_name']
376 385
377 386 # second pass to validate permissions to repo_group
378 387 if 'old_values' in self.bindings:
379 388 # save current repo name for name change checks
380 389 self.bindings['old_values']['submitted_repo_name'] = validated_name
381 390 second = RepoGroupAccessSchema().bind(**self.bindings)
382 391 appstruct_second = second.deserialize({'repo_group': validated_name})
383 392 # save result
384 393 appstruct['repo_group'] = appstruct_second['repo_group']
385 394
386 395 # thirds to validate uniqueness
387 396 third = RepoNameUniqueSchema().bind(**self.bindings)
388 397 third.deserialize({'unique_repo_name': validated_name})
389 398
390 399 return appstruct
391 400
392 401
393 402 class RepoSettingsSchema(RepoSchema):
394 403 repo_group = colander.SchemaNode(
395 404 colander.Integer(),
396 405 validator=deferred_repo_group_validator,
397 406 widget=deferred_repo_group_widget,
407 preparers=[preparers.ensure_value_is_int],
398 408 missing='')
399 409
400 410 repo_clone_uri_change = colander.SchemaNode(
401 411 colander.String(),
402 412 missing='NEW')
403 413
404 414 repo_clone_uri = colander.SchemaNode(
405 415 colander.String(),
406 416 preparers=[preparers.strip_preparer],
407 417 validator=deferred_sync_uri_validator,
408 418 missing='')
409 419
410 420 repo_push_uri_change = colander.SchemaNode(
411 421 colander.String(),
412 422 missing='NEW')
413 423
414 424 repo_push_uri = colander.SchemaNode(
415 425 colander.String(),
416 426 preparers=[preparers.strip_preparer],
417 427 validator=deferred_sync_uri_validator,
418 428 missing='')
419 429
420 430 def deserialize(self, cstruct):
421 431 """
422 432 Custom deserialize that allows to chain validation, and verify
423 433 permissions, and as last step uniqueness
424 434 """
425 435
426 436 # first pass, to validate given data
427 437 appstruct = super(RepoSchema, self).deserialize(cstruct)
428 438 validated_name = appstruct['repo_name']
429 439 # because of repoSchema adds repo-group as an ID, we inject it as
430 440 # full name here because validators require it, it's unwrapped later
431 441 # so it's safe to use and final name is going to be without group anyway
432 442
433 443 group, separator = get_repo_group(appstruct['repo_group'])
434 444 if group:
435 445 validated_name = separator.join([group.group_name, validated_name])
436 446
437 447 # second pass to validate permissions to repo_group
438 448 if 'old_values' in self.bindings:
439 449 # save current repo name for name change checks
440 450 self.bindings['old_values']['submitted_repo_name'] = validated_name
441 451 second = RepoGroupAccessSchema().bind(**self.bindings)
442 452 appstruct_second = second.deserialize({'repo_group': validated_name})
443 453 # save result
444 454 appstruct['repo_group'] = appstruct_second['repo_group']
445 455
446 456 # thirds to validate uniqueness
447 457 third = RepoNameUniqueSchema().bind(**self.bindings)
448 458 third.deserialize({'unique_repo_name': validated_name})
449 459
450 460 return appstruct
@@ -1,160 +1,161 b''
1 1
2 2
3 3 # Copyright (C) 2011-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import os
22 21 import re
23 22 import logging
24 23
25
26 24 import ipaddress
27 25 import colander
28 26
29 27 from rhodecode.translation import _
30 from rhodecode.lib.utils2 import glob2re, safe_unicode
28 from rhodecode.lib.utils2 import glob2re
29 from rhodecode.lib.str_utils import safe_str
31 30 from rhodecode.lib.ext_json import json
32 31
33 32 log = logging.getLogger(__name__)
34 33
35 34
36 35 def ip_addr_validator(node, value):
37 36 try:
38 37 # this raises an ValueError if address is not IpV4 or IpV6
39 ipaddress.ip_network(safe_unicode(value), strict=False)
38 ipaddress.ip_network(safe_str(value), strict=False)
40 39 except ValueError:
41 msg = _(u'Please enter a valid IPv4 or IpV6 address')
40 msg = _('Please enter a valid IPv4 or IpV6 address')
42 41 raise colander.Invalid(node, msg)
43 42
44 43
45 44 class IpAddrValidator(object):
46 45 def __init__(self, strict=True):
47 46 self.strict = strict
48 47
49 48 def __call__(self, node, value):
50 49 try:
51 50 # this raises an ValueError if address is not IpV4 or IpV6
52 ipaddress.ip_network(safe_unicode(value), strict=self.strict)
51 ipaddress.ip_network(safe_str(value), strict=self.strict)
53 52 except ValueError:
54 msg = _(u'Please enter a valid IPv4 or IpV6 address')
53 msg = _('Please enter a valid IPv4 or IpV6 address')
55 54 raise colander.Invalid(node, msg)
56 55
57 56
58 57 def glob_validator(node, value):
59 58 try:
60 59 re.compile('^' + glob2re(value) + '$')
61 60 except Exception:
62 msg = _(u'Invalid glob pattern')
61 msg = _('Invalid glob pattern')
63 62 raise colander.Invalid(node, msg)
64 63
65 64
66 65 def valid_name_validator(node, value):
67 66 from rhodecode.model.validation_schema import types
68 67 if value is types.RootLocation:
69 68 return
70 69
71 70 msg = _('Name must start with a letter or number. Got `{}`').format(value)
72 71 if not re.match(r'^[a-zA-z0-9]{1,}', value):
73 72 raise colander.Invalid(node, msg)
74 73
75 74
76 75 class InvalidCloneUrl(Exception):
77 76 allowed_prefixes = ()
78 77
79 78
80 79 def url_validator(url, repo_type, config):
81 80 from rhodecode.lib.vcs.backends.hg import MercurialRepository
82 81 from rhodecode.lib.vcs.backends.git import GitRepository
83 82 from rhodecode.lib.vcs.backends.svn import SubversionRepository
84 83
85 84 if repo_type == 'hg':
86 85 allowed_prefixes = ('http', 'svn+http', 'git+http')
87 86
88 87 if 'http' in url[:4]:
89 88 # initially check if it's at least the proper URL
90 89 # or does it pass basic auth
91 90
92 91 return MercurialRepository.check_url(url, config)
93 92 elif 'svn+http' in url[:8]: # svn->hg import
94 93 SubversionRepository.check_url(url, config)
95 94 elif 'git+http' in url[:8]: # git->hg import
96 95 raise NotImplementedError()
97 96 else:
98 97 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
99 98 'Allowed url must start with one of %s'
100 99 % (url, ','.join(allowed_prefixes)))
101 100 exc.allowed_prefixes = allowed_prefixes
102 101 raise exc
103 102
104 103 elif repo_type == 'git':
105 104 allowed_prefixes = ('http', 'svn+http', 'hg+http')
106 105 if 'http' in url[:4]:
107 106 # initially check if it's at least the proper URL
108 107 # or does it pass basic auth
109 108 return GitRepository.check_url(url, config)
110 109 elif 'svn+http' in url[:8]: # svn->git import
111 110 raise NotImplementedError()
112 111 elif 'hg+http' in url[:8]: # hg->git import
113 112 raise NotImplementedError()
114 113 else:
115 114 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
116 115 'Allowed url must start with one of %s'
117 116 % (url, ','.join(allowed_prefixes)))
118 117 exc.allowed_prefixes = allowed_prefixes
119 118 raise exc
120 119 elif repo_type == 'svn':
121 120 # no validation for SVN yet
122 121 return
123 122
124 123 raise InvalidCloneUrl('Invalid repo type specified: `{}`'.format(repo_type))
125 124
126 125
127 126 class CloneUriValidator(object):
128 127 def __init__(self, repo_type):
129 128 self.repo_type = repo_type
130 129
131 130 def __call__(self, node, value):
132 131
133 132 from rhodecode.lib.utils import make_db_config
134 133 try:
135 134 config = make_db_config(clear_session=False)
136 135 url_validator(value, self.repo_type, config)
137 136 except InvalidCloneUrl as e:
138 137 log.warning(e)
139 raise colander.Invalid(node, e.message)
140 except Exception:
138 raise colander.Invalid(node, str(e))
139 except Exception as e:
141 140 log.exception('Url validation failed')
142 msg = _(u'invalid clone url or credentials for {repo_type} repository').format(
143 repo_type=self.repo_type)
141 reason = repr(e)
142 reason = reason.replace('<', '&lt;').replace('>', '&gt;')
143 msg = _('invalid clone url or credentials for {repo_type} repository. Reason: {reason}')\
144 .format(reason=reason, repo_type=self.repo_type)
144 145 raise colander.Invalid(node, msg)
145 146
146 147
147 148 def json_validator(node, value):
148 149 try:
149 150 json.loads(value)
150 except (Exception,) as e:
151 msg = _(u'Please enter a valid json object')
151 except (Exception,):
152 msg = _('Please enter a valid json object')
152 153 raise colander.Invalid(node, msg)
153 154
154 155
155 156 def json_validator_with_exc(node, value):
156 157 try:
157 158 json.loads(value)
158 159 except (Exception,) as e:
159 msg = _(u'Please enter a valid json object: `{}`'.format(e))
160 msg = _('Please enter a valid json object: `{}`'.format(e))
160 161 raise colander.Invalid(node, msg)
General Comments 0
You need to be logged in to leave comments. Login now