##// END OF EJS Templates
integrations: fix re-submit of form on repo integrations.
marcink -
r1384:13f7d9a4 default
parent child Browse files
Show More
@@ -1,393 +1,391 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pylons
22 22 import deform
23 23 import logging
24 24 import colander
25 25 import peppercorn
26 26 import webhelpers.paginate
27 27
28 28 from pyramid.httpexceptions import HTTPFound, HTTPForbidden, HTTPBadRequest
29 29 from pyramid.renderers import render
30 30 from pyramid.response import Response
31 31
32 32 from rhodecode.lib import auth
33 33 from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator
34 34 from rhodecode.lib.utils2 import safe_int
35 35 from rhodecode.lib.helpers import Page
36 36 from rhodecode.model.db import Repository, RepoGroup, Session, Integration
37 37 from rhodecode.model.scm import ScmModel
38 38 from rhodecode.model.integration import IntegrationModel
39 39 from rhodecode.admin.navigation import navigation_list
40 40 from rhodecode.translation import _
41 41 from rhodecode.integrations import integration_type_registry
42 42 from rhodecode.model.validation_schema.schemas.integration_schema import (
43 43 make_integration_schema, IntegrationScopeType)
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class IntegrationSettingsViewBase(object):
49 49 """ Base Integration settings view used by both repo / global settings """
50 50
51 51 def __init__(self, context, request):
52 52 self.context = context
53 53 self.request = request
54 54 self._load_general_context()
55 55
56 56 if not self.perm_check(request.user):
57 57 raise HTTPForbidden()
58 58
59 59 def _load_general_context(self):
60 60 """
61 61 This avoids boilerplate for repo/global+list/edit+views/templates
62 62 by doing all possible contexts at the same time however it should
63 63 be split up into separate functions once more "contexts" exist
64 64 """
65 65
66 66 self.IntegrationType = None
67 67 self.repo = None
68 68 self.repo_group = None
69 69 self.integration = None
70 70 self.integrations = {}
71 71
72 72 request = self.request
73 73
74 74 if 'repo_name' in request.matchdict: # in repo settings context
75 75 repo_name = request.matchdict['repo_name']
76 76 self.repo = Repository.get_by_repo_name(repo_name)
77 77
78 78 if 'repo_group_name' in request.matchdict: # in group settings context
79 79 repo_group_name = request.matchdict['repo_group_name']
80 80 self.repo_group = RepoGroup.get_by_group_name(repo_group_name)
81 81
82 82
83 83 if 'integration' in request.matchdict: # integration type context
84 84 integration_type = request.matchdict['integration']
85 85 self.IntegrationType = integration_type_registry[integration_type]
86 86
87 87 if 'integration_id' in request.matchdict: # single integration context
88 88 integration_id = request.matchdict['integration_id']
89 89 self.integration = Integration.get(integration_id)
90 90
91 91 # extra perms check just in case
92 92 if not self._has_perms_for_integration(self.integration):
93 93 raise HTTPForbidden()
94 94
95 95 self.settings = self.integration and self.integration.settings or {}
96 96 self.admin_view = not (self.repo or self.repo_group)
97 97
98 98 def _has_perms_for_integration(self, integration):
99 99 perms = self.request.user.permissions
100 100
101 101 if 'hg.admin' in perms['global']:
102 102 return True
103 103
104 104 if integration.repo:
105 105 return perms['repositories'].get(
106 106 integration.repo.repo_name) == 'repository.admin'
107 107
108 108 if integration.repo_group:
109 109 return perms['repositories_groups'].get(
110 110 integration.repo_group.group_name) == 'group.admin'
111 111
112 112 return False
113 113
114 114 def _template_c_context(self):
115 115 # TODO: dan: this is a stopgap in order to inherit from current pylons
116 116 # based admin/repo settings templates - this should be removed entirely
117 117 # after port to pyramid
118 118
119 119 c = pylons.tmpl_context
120 120 c.active = 'integrations'
121 121 c.rhodecode_user = self.request.user
122 122 c.repo = self.repo
123 123 c.repo_group = self.repo_group
124 124 c.repo_name = self.repo and self.repo.repo_name or None
125 125 c.repo_group_name = self.repo_group and self.repo_group.group_name or None
126 126
127 127 if self.repo:
128 128 c.repo_info = self.repo
129 129 c.rhodecode_db_repo = self.repo
130 130 c.repository_pull_requests = ScmModel().get_pull_requests(self.repo)
131 131 else:
132 132 c.navlist = navigation_list(self.request)
133 133
134 134 return c
135 135
136 136 def _form_schema(self):
137 137 schema = make_integration_schema(IntegrationType=self.IntegrationType,
138 138 settings=self.settings)
139 139
140 140 # returns a clone, important if mutating the schema later
141 141 return schema.bind(
142 142 permissions=self.request.user.permissions,
143 143 no_scope=not self.admin_view)
144 144
145
146 145 def _form_defaults(self):
147 146 defaults = {}
148 147
149 148 if self.integration:
150 149 defaults['settings'] = self.integration.settings or {}
151 150 defaults['options'] = {
152 151 'name': self.integration.name,
153 152 'enabled': self.integration.enabled,
154 153 'scope': {
155 154 'repo': self.integration.repo,
156 155 'repo_group': self.integration.repo_group,
157 156 'child_repos_only': self.integration.child_repos_only,
158 157 },
159 158 }
160 159 else:
161 160 if self.repo:
162 161 scope = _('{repo_name} repository').format(
163 162 repo_name=self.repo.repo_name)
164 163 elif self.repo_group:
165 164 scope = _('{repo_group_name} repo group').format(
166 165 repo_group_name=self.repo_group.group_name)
167 166 else:
168 167 scope = _('Global')
169 168
170 169 defaults['options'] = {
171 170 'enabled': True,
172 171 'name': _('{name} integration').format(
173 172 name=self.IntegrationType.display_name),
174 173 }
175 174 defaults['options']['scope'] = {
176 175 'repo': self.repo,
177 176 'repo_group': self.repo_group,
178 177 }
179 178
180 179 return defaults
181 180
182 181 def _delete_integration(self, integration):
183 182 Session().delete(self.integration)
184 183 Session().commit()
185 184 self.request.session.flash(
186 185 _('Integration {integration_name} deleted successfully.').format(
187 186 integration_name=self.integration.name),
188 187 queue='success')
189 188
190 189 if self.repo:
191 190 redirect_to = self.request.route_url(
192 191 'repo_integrations_home', repo_name=self.repo.repo_name)
193 192 elif self.repo_group:
194 193 redirect_to = self.request.route_url(
195 194 'repo_group_integrations_home',
196 195 repo_group_name=self.repo_group.group_name)
197 196 else:
198 197 redirect_to = self.request.route_url('global_integrations_home')
199 198 raise HTTPFound(redirect_to)
200 199
201 200 def settings_get(self, defaults=None, form=None):
202 201 """
203 202 View that displays the integration settings as a form.
204 203 """
205 204
206 205 defaults = defaults or self._form_defaults()
207 206 schema = self._form_schema()
208 207
209 208 if self.integration:
210 209 buttons = ('submit', 'delete')
211 210 else:
212 211 buttons = ('submit',)
213 212
214 213 form = form or deform.Form(schema, appstruct=defaults, buttons=buttons)
215 214
216 215 template_context = {
217 216 'form': form,
218 217 'current_IntegrationType': self.IntegrationType,
219 218 'integration': self.integration,
220 219 'c': self._template_c_context(),
221 220 }
222 221
223 222 return template_context
224 223
225 224 @auth.CSRFRequired()
226 225 def settings_post(self):
227 226 """
228 227 View that validates and stores the integration settings.
229 228 """
230 229 controls = self.request.POST.items()
231 230 pstruct = peppercorn.parse(controls)
232 231
233 232 if self.integration and pstruct.get('delete'):
234 233 return self._delete_integration(self.integration)
235 234
236 235 schema = self._form_schema()
237 236
238 237 skip_settings_validation = False
239 238 if self.integration and 'enabled' not in pstruct.get('options', {}):
240 239 skip_settings_validation = True
241 240 schema['settings'].validator = None
242 241 for field in schema['settings'].children:
243 242 field.validator = None
244 243 field.missing = ''
245 244
246 245 if self.integration:
247 246 buttons = ('submit', 'delete')
248 247 else:
249 248 buttons = ('submit',)
250 249
251 250 form = deform.Form(schema, buttons=buttons)
252 251
253 252 if not self.admin_view:
254 253 # scope is read only field in these cases, and has to be added
255 254 options = pstruct.setdefault('options', {})
256 255 if 'scope' not in options:
257 256 options['scope'] = IntegrationScopeType().serialize(None, {
258 257 'repo': self.repo,
259 258 'repo_group': self.repo_group,
260 259 })
261 260
262 261 try:
263 262 valid_data = form.validate_pstruct(pstruct)
264 263 except deform.ValidationFailure as e:
265 264 self.request.session.flash(
266 265 _('Errors exist when saving integration settings. '
267 266 'Please check the form inputs.'),
268 267 queue='error')
269 268 return self.settings_get(form=e)
270 269
271 270 if not self.integration:
272 271 self.integration = Integration()
273 272 self.integration.integration_type = self.IntegrationType.key
274 273 Session().add(self.integration)
275 274
276 275 scope = valid_data['options']['scope']
277 276
278 277 IntegrationModel().update_integration(self.integration,
279 278 name=valid_data['options']['name'],
280 279 enabled=valid_data['options']['enabled'],
281 280 settings=valid_data['settings'],
282 281 repo=scope['repo'],
283 282 repo_group=scope['repo_group'],
284 283 child_repos_only=scope['child_repos_only'],
285 284 )
286 285
287
288 286 self.integration.settings = valid_data['settings']
289 287 Session().commit()
290 288 # Display success message and redirect.
291 289 self.request.session.flash(
292 290 _('Integration {integration_name} updated successfully.').format(
293 291 integration_name=self.IntegrationType.display_name),
294 292 queue='success')
295 293
296
297 294 # if integration scope changes, we must redirect to the right place
298 295 # keeping in mind if the original view was for /repo/ or /_admin/
299 296 admin_view = not (self.repo or self.repo_group)
300 297
301 298 if self.integration.repo and not admin_view:
302 299 redirect_to = self.request.route_path(
303 300 'repo_integrations_edit',
304 301 repo_name=self.integration.repo.repo_name,
305 302 integration=self.integration.integration_type,
306 303 integration_id=self.integration.integration_id)
307 304 elif self.integration.repo_group and not admin_view:
308 305 redirect_to = self.request.route_path(
309 306 'repo_group_integrations_edit',
310 307 repo_group_name=self.integration.repo_group.group_name,
311 308 integration=self.integration.integration_type,
312 309 integration_id=self.integration.integration_id)
313 310 else:
314 311 redirect_to = self.request.route_path(
315 312 'global_integrations_edit',
316 313 integration=self.integration.integration_type,
317 314 integration_id=self.integration.integration_id)
318 315
319 316 return HTTPFound(redirect_to)
320 317
321 318 def index(self):
322 319 """ List integrations """
323 320 if self.repo:
324 321 scope = self.repo
325 322 elif self.repo_group:
326 323 scope = self.repo_group
327 324 else:
328 325 scope = 'all'
329 326
330 327 integrations = []
331 328
332 329 for IntType, integration in IntegrationModel().get_integrations(
333 330 scope=scope, IntegrationType=self.IntegrationType):
334 331
335 332 # extra permissions check *just in case*
336 333 if not self._has_perms_for_integration(integration):
337 334 continue
338 335
339 336 integrations.append((IntType, integration))
340 337
341 338 sort_arg = self.request.GET.get('sort', 'name:asc')
342 339 if ':' in sort_arg:
343 340 sort_field, sort_dir = sort_arg.split(':')
344 341 else:
345 342 sort_field = sort_arg, 'asc'
346 343
347 344 assert sort_field in ('name', 'integration_type', 'enabled', 'scope')
348 345
349 346 integrations.sort(
350 key=lambda x: getattr(x[1], sort_field), reverse=(sort_dir=='desc'))
351
347 key=lambda x: getattr(x[1], sort_field),
348 reverse=(sort_dir == 'desc'))
352 349
353 350 page_url = webhelpers.paginate.PageURL(
354 351 self.request.path, self.request.GET)
355 352 page = safe_int(self.request.GET.get('page', 1), 1)
356 353
357 354 integrations = Page(integrations, page=page, items_per_page=10,
358 355 url=page_url)
359 356
360 357 template_context = {
361 358 'sort_field': sort_field,
362 359 'rev_sort_dir': sort_dir != 'desc' and 'desc' or 'asc',
363 360 'current_IntegrationType': self.IntegrationType,
364 361 'integrations_list': integrations,
365 362 'available_integrations': integration_type_registry,
366 363 'c': self._template_c_context(),
367 364 'request': self.request,
368 365 }
369 366 return template_context
370 367
371 368 def new_integration(self):
372 369 template_context = {
373 370 'available_integrations': integration_type_registry,
374 371 'c': self._template_c_context(),
375 372 }
376 373 return template_context
377 374
375
378 376 class GlobalIntegrationsView(IntegrationSettingsViewBase):
379 377 def perm_check(self, user):
380 378 return auth.HasPermissionAll('hg.admin').check_permissions(user=user)
381 379
382 380
383 381 class RepoIntegrationsView(IntegrationSettingsViewBase):
384 382 def perm_check(self, user):
385 383 return auth.HasRepoPermissionAll('repository.admin'
386 384 )(repo_name=self.repo.repo_name, user=user)
387 385
388 386
389 387 class RepoGroupIntegrationsView(IntegrationSettingsViewBase):
390 388 def perm_check(self, user):
391 389 return auth.HasRepoGroupPermissionAll('group.admin'
392 390 )(group_name=self.repo_group.group_name, user=user)
393 391
@@ -1,226 +1,226 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import deform
24 24 import colander
25 25
26 26 from rhodecode.translation import _
27 27 from rhodecode.model.db import Repository, RepoGroup
28 28 from rhodecode.model.validation_schema import validators, preparers
29 29
30 30
31 31 def integration_scope_choices(permissions):
32 32 """
33 33 Return list of (value, label) choices for integration scopes depending on
34 34 the permissions
35 35 """
36 36 result = [('', _('Pick a scope:'))]
37 37 if 'hg.admin' in permissions['global']:
38 38 result.extend([
39 39 ('global', _('Global (all repositories)')),
40 40 ('root-repos', _('Top level repositories only')),
41 41 ])
42 42
43 43 repo_choices = [
44 44 ('repo:%s' % repo_name, '/' + repo_name)
45 45 for repo_name, repo_perm
46 46 in permissions['repositories'].items()
47 47 if repo_perm == 'repository.admin'
48 48 ]
49 49 repogroup_choices = [
50 50 ('repogroup:%s' % repo_group_name, '/' + repo_group_name + '/ (child repos only)')
51 51 for repo_group_name, repo_group_perm
52 52 in permissions['repositories_groups'].items()
53 53 if repo_group_perm == 'group.admin'
54 54 ]
55 55 repogroup_recursive_choices = [
56 56 ('repogroup-recursive:%s' % repo_group_name, '/' + repo_group_name + '/ (recursive)')
57 57 for repo_group_name, repo_group_perm
58 58 in permissions['repositories_groups'].items()
59 59 if repo_group_perm == 'group.admin'
60 60 ]
61 61 result.extend(
62 62 sorted(repogroup_recursive_choices + repogroup_choices + repo_choices,
63 63 key=lambda (choice, label): choice.split(':', 1)[1]
64 64 )
65 65 )
66 66 return result
67 67
68 68
69 69 @colander.deferred
70 70 def deferred_integration_scopes_validator(node, kw):
71 71 perms = kw.get('permissions')
72 72 def _scope_validator(_node, scope):
73 73 is_super_admin = 'hg.admin' in perms['global']
74 74
75 75 if scope.get('repo'):
76 76 if (is_super_admin or perms['repositories'].get(
77 77 scope['repo'].repo_name) == 'repository.admin'):
78 78 return True
79 79 msg = _('Only repo admins can create integrations')
80 80 raise colander.Invalid(_node, msg)
81 81 elif scope.get('repo_group'):
82 82 if (is_super_admin or perms['repositories_groups'].get(
83 83 scope['repo_group'].group_name) == 'group.admin'):
84 84 return True
85 85
86 86 msg = _('Only repogroup admins can create integrations')
87 87 raise colander.Invalid(_node, msg)
88 88 else:
89 89 if is_super_admin:
90 90 return True
91 91 msg = _('Only superadmins can create global integrations')
92 92 raise colander.Invalid(_node, msg)
93 93
94 94 return _scope_validator
95 95
96 96
97 97 @colander.deferred
98 98 def deferred_integration_scopes_widget(node, kw):
99 99 if kw.get('no_scope'):
100 100 return deform.widget.TextInputWidget(readonly=True)
101 101
102 102 choices = integration_scope_choices(kw.get('permissions'))
103 103 widget = deform.widget.Select2Widget(values=choices)
104 104 return widget
105 105
106 106
107 107 class IntegrationScopeType(colander.SchemaType):
108 108 def serialize(self, node, appstruct):
109 109 if appstruct is colander.null:
110 110 return colander.null
111 111
112 112 if appstruct.get('repo'):
113 113 return 'repo:%s' % appstruct['repo'].repo_name
114 114 elif appstruct.get('repo_group'):
115 115 if appstruct.get('child_repos_only'):
116 116 return 'repogroup:%s' % appstruct['repo_group'].group_name
117 117 else:
118 118 return 'repogroup-recursive:%s' % (
119 119 appstruct['repo_group'].group_name)
120 120 else:
121 121 if appstruct.get('child_repos_only'):
122 122 return 'root-repos'
123 123 else:
124 124 return 'global'
125 125
126 126 raise colander.Invalid(node, '%r is not a valid scope' % appstruct)
127 127
128 128 def deserialize(self, node, cstruct):
129 129 if cstruct is colander.null:
130 130 return colander.null
131 131
132 132 if cstruct.startswith('repo:'):
133 133 repo = Repository.get_by_repo_name(cstruct.split(':')[1])
134 134 if repo:
135 135 return {
136 136 'repo': repo,
137 137 'repo_group': None,
138 'child_repos_only': None,
138 'child_repos_only': False,
139 139 }
140 140 elif cstruct.startswith('repogroup-recursive:'):
141 141 repo_group = RepoGroup.get_by_group_name(cstruct.split(':')[1])
142 142 if repo_group:
143 143 return {
144 144 'repo': None,
145 145 'repo_group': repo_group,
146 146 'child_repos_only': False
147 147 }
148 148 elif cstruct.startswith('repogroup:'):
149 149 repo_group = RepoGroup.get_by_group_name(cstruct.split(':')[1])
150 150 if repo_group:
151 151 return {
152 152 'repo': None,
153 153 'repo_group': repo_group,
154 154 'child_repos_only': True
155 155 }
156 156 elif cstruct == 'global':
157 157 return {
158 158 'repo': None,
159 159 'repo_group': None,
160 160 'child_repos_only': False
161 161 }
162 162 elif cstruct == 'root-repos':
163 163 return {
164 164 'repo': None,
165 165 'repo_group': None,
166 166 'child_repos_only': True
167 167 }
168 168
169 169 raise colander.Invalid(node, '%r is not a valid scope' % cstruct)
170 170
171 171
172 172 class IntegrationOptionsSchemaBase(colander.MappingSchema):
173 173
174 174 name = colander.SchemaNode(
175 175 colander.String(),
176 176 description=_('Short name for this integration.'),
177 177 missing=colander.required,
178 178 title=_('Integration name'),
179 179 )
180 180
181 181 scope = colander.SchemaNode(
182 182 IntegrationScopeType(),
183 183 description=_(
184 184 'Scope of the integration. Recursive means the integration '
185 185 ' runs on all repos of that group and children recursively.'),
186 186 title=_('Integration scope'),
187 187 validator=deferred_integration_scopes_validator,
188 188 widget=deferred_integration_scopes_widget,
189 189 missing=colander.required,
190 190 )
191 191
192 192 enabled = colander.SchemaNode(
193 193 colander.Bool(),
194 194 default=True,
195 195 description=_('Enable or disable this integration.'),
196 196 missing=False,
197 197 title=_('Enabled'),
198 198 )
199 199
200 200
201 201
202 202 def make_integration_schema(IntegrationType, settings=None):
203 203 """
204 204 Return a colander schema for an integration type
205 205
206 206 :param IntegrationType: the integration type class
207 207 :param settings: existing integration settings dict (optional)
208 208 """
209 209
210 210 settings = settings or {}
211 211 settings_schema = IntegrationType(settings=settings).settings_schema()
212 212
213 213 class IntegrationSchema(colander.Schema):
214 214 options = IntegrationOptionsSchemaBase()
215 215
216 216 schema = IntegrationSchema()
217 217 schema['options'].title = _('General integration options')
218 218
219 219 settings_schema.name = 'settings'
220 220 settings_schema.title = _('{integration_type} settings').format(
221 221 integration_type=IntegrationType.display_name)
222 222 schema.add(settings_schema)
223 223
224 224 return schema
225 225
226 226
@@ -1,167 +1,167 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22 import pytest
23 23
24 24 from rhodecode.integrations import integration_type_registry
25 25 from rhodecode.integrations.types.base import IntegrationTypeBase
26 26 from rhodecode.model.validation_schema.schemas.integration_schema import (
27 27 make_integration_schema
28 28 )
29 29
30 30
31 31 @pytest.mark.usefixtures('app', 'autologin_user')
32 32 class TestIntegrationSchema(object):
33 33
34 34 def test_deserialize_integration_schema_perms(
35 35 self, backend_random, test_repo_group, StubIntegrationType):
36 36
37 37 repo = backend_random.repo
38 38 repo_group = test_repo_group
39 39
40 40 empty_perms_dict = {
41 41 'global': [],
42 42 'repositories': {},
43 43 'repositories_groups': {},
44 44 }
45 45
46 46 perms_tests = [
47 47 (
48 48 'repo:%s' % repo.repo_name,
49 49 {
50 'child_repos_only': None,
50 'child_repos_only': False,
51 51 'repo_group': None,
52 52 'repo': repo,
53 53 },
54 54 [
55 55 ({}, False),
56 56 ({'global': ['hg.admin']}, True),
57 57 ({'global': []}, False),
58 58 ({'repositories': {repo.repo_name: 'repository.admin'}}, True),
59 59 ({'repositories': {repo.repo_name: 'repository.read'}}, False),
60 60 ({'repositories': {repo.repo_name: 'repository.write'}}, False),
61 61 ({'repositories': {repo.repo_name: 'repository.none'}}, False),
62 62 ]
63 63 ),
64 64 (
65 65 'repogroup:%s' % repo_group.group_name,
66 66 {
67 67 'repo': None,
68 68 'repo_group': repo_group,
69 69 'child_repos_only': True,
70 70 },
71 71 [
72 72 ({}, False),
73 73 ({'global': ['hg.admin']}, True),
74 74 ({'global': []}, False),
75 75 ({'repositories_groups':
76 76 {repo_group.group_name: 'group.admin'}}, True),
77 77 ({'repositories_groups':
78 78 {repo_group.group_name: 'group.read'}}, False),
79 79 ({'repositories_groups':
80 80 {repo_group.group_name: 'group.write'}}, False),
81 81 ({'repositories_groups':
82 82 {repo_group.group_name: 'group.none'}}, False),
83 83 ]
84 84 ),
85 85 (
86 86 'repogroup-recursive:%s' % repo_group.group_name,
87 87 {
88 88 'repo': None,
89 89 'repo_group': repo_group,
90 90 'child_repos_only': False,
91 91 },
92 92 [
93 93 ({}, False),
94 94 ({'global': ['hg.admin']}, True),
95 95 ({'global': []}, False),
96 96 ({'repositories_groups':
97 97 {repo_group.group_name: 'group.admin'}}, True),
98 98 ({'repositories_groups':
99 99 {repo_group.group_name: 'group.read'}}, False),
100 100 ({'repositories_groups':
101 101 {repo_group.group_name: 'group.write'}}, False),
102 102 ({'repositories_groups':
103 103 {repo_group.group_name: 'group.none'}}, False),
104 104 ]
105 105 ),
106 106 (
107 107 'global',
108 108 {
109 109 'repo': None,
110 110 'repo_group': None,
111 111 'child_repos_only': False,
112 112 }, [
113 113 ({}, False),
114 114 ({'global': ['hg.admin']}, True),
115 115 ({'global': []}, False),
116 116 ]
117 117 ),
118 118 (
119 119 'root-repos',
120 120 {
121 121 'repo': None,
122 122 'repo_group': None,
123 123 'child_repos_only': True,
124 124 }, [
125 125 ({}, False),
126 126 ({'global': ['hg.admin']}, True),
127 127 ({'global': []}, False),
128 128 ]
129 129 ),
130 130 ]
131 131
132 132 for scope_input, scope_output, perms_allowed in perms_tests:
133 133 for perms_update, allowed in perms_allowed:
134 134 perms = dict(empty_perms_dict, **perms_update)
135 135
136 136 schema = make_integration_schema(
137 137 IntegrationType=StubIntegrationType
138 138 ).bind(permissions=perms)
139 139
140 140 input_data = {
141 141 'options': {
142 142 'enabled': 'true',
143 143 'scope': scope_input,
144 144 'name': 'test integration',
145 145 },
146 146 'settings': {
147 147 'test_string_field': 'stringy',
148 148 'test_int_field': '100',
149 149 }
150 150 }
151 151
152 152 if not allowed:
153 153 with pytest.raises(colander.Invalid):
154 154 schema.deserialize(input_data)
155 155 else:
156 156 assert schema.deserialize(input_data) == {
157 157 'options': {
158 158 'enabled': True,
159 159 'scope': scope_output,
160 160 'name': 'test integration',
161 161 },
162 162 'settings': {
163 163 'test_string_field': 'stringy',
164 164 'test_int_field': 100,
165 165 }
166 166 }
167 167
General Comments 0
You need to be logged in to leave comments. Login now