##// END OF EJS Templates
app: use simpler way to extract default_user_id, this will be now registered at server...
marcink -
r4332:b6d13602 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,299 +1,299 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23 from rhodecode.model.db import User, UserIpMap
24 24 from rhodecode.model.meta import Session
25 25 from rhodecode.model.permission import PermissionModel
26 26 from rhodecode.model.ssh_key import SshKeyModel
27 27 from rhodecode.tests import (
28 28 TestController, clear_cache_regions, assert_session_flash)
29 29
30 30
31 31 def route_path(name, params=None, **kwargs):
32 32 import urllib
33 33 from rhodecode.apps._base import ADMIN_PREFIX
34 34
35 35 base_url = {
36 36 'edit_user_ips':
37 37 ADMIN_PREFIX + '/users/{user_id}/edit/ips',
38 38 'edit_user_ips_add':
39 39 ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
40 40 'edit_user_ips_delete':
41 41 ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
42 42
43 43 'admin_permissions_application':
44 44 ADMIN_PREFIX + '/permissions/application',
45 45 'admin_permissions_application_update':
46 46 ADMIN_PREFIX + '/permissions/application/update',
47 47
48 48 'admin_permissions_global':
49 49 ADMIN_PREFIX + '/permissions/global',
50 50 'admin_permissions_global_update':
51 51 ADMIN_PREFIX + '/permissions/global/update',
52 52
53 53 'admin_permissions_object':
54 54 ADMIN_PREFIX + '/permissions/object',
55 55 'admin_permissions_object_update':
56 56 ADMIN_PREFIX + '/permissions/object/update',
57 57
58 58 'admin_permissions_ips':
59 59 ADMIN_PREFIX + '/permissions/ips',
60 60 'admin_permissions_overview':
61 61 ADMIN_PREFIX + '/permissions/overview',
62 62
63 63 'admin_permissions_ssh_keys':
64 64 ADMIN_PREFIX + '/permissions/ssh_keys',
65 65 'admin_permissions_ssh_keys_data':
66 66 ADMIN_PREFIX + '/permissions/ssh_keys/data',
67 67 'admin_permissions_ssh_keys_update':
68 68 ADMIN_PREFIX + '/permissions/ssh_keys/update'
69 69
70 70 }[name].format(**kwargs)
71 71
72 72 if params:
73 73 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
74 74 return base_url
75 75
76 76
77 77 class TestAdminPermissionsController(TestController):
78 78
79 79 @pytest.fixture(scope='class', autouse=True)
80 80 def prepare(self, request):
81 81 # cleanup and reset to default permissions after
82 82 @request.addfinalizer
83 83 def cleanup():
84 84 PermissionModel().create_default_user_permissions(
85 85 User.get_default_user(), force=True)
86 86
87 87 def test_index_application(self):
88 88 self.log_user()
89 89 self.app.get(route_path('admin_permissions_application'))
90 90
91 91 @pytest.mark.parametrize(
92 92 'anonymous, default_register, default_register_message, default_password_reset,'
93 93 'default_extern_activate, expect_error, expect_form_error', [
94 94 (True, 'hg.register.none', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
95 95 False, False),
96 96 (True, 'hg.register.manual_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.auto',
97 97 False, False),
98 98 (True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
99 99 False, False),
100 100 (True, 'hg.register.auto_activate', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
101 101 False, False),
102 102 (True, 'hg.register.XXX', '', 'hg.password_reset.enabled', 'hg.extern_activate.manual',
103 103 False, True),
104 104 (True, '', '', 'hg.password_reset.enabled', '', True, False),
105 105 ])
106 106 def test_update_application_permissions(
107 107 self, anonymous, default_register, default_register_message, default_password_reset,
108 108 default_extern_activate, expect_error, expect_form_error):
109 109
110 110 self.log_user()
111 111
112 112 # TODO: anonymous access set here to False, breaks some other tests
113 113 params = {
114 114 'csrf_token': self.csrf_token,
115 115 'anonymous': anonymous,
116 116 'default_register': default_register,
117 117 'default_register_message': default_register_message,
118 118 'default_password_reset': default_password_reset,
119 119 'default_extern_activate': default_extern_activate,
120 120 }
121 121 response = self.app.post(route_path('admin_permissions_application_update'),
122 122 params=params)
123 123 if expect_form_error:
124 124 assert response.status_int == 200
125 125 response.mustcontain('Value must be one of')
126 126 else:
127 127 if expect_error:
128 128 msg = 'Error occurred during update of permissions'
129 129 else:
130 130 msg = 'Application permissions updated successfully'
131 131 assert_session_flash(response, msg)
132 132
133 133 def test_index_object(self):
134 134 self.log_user()
135 135 self.app.get(route_path('admin_permissions_object'))
136 136
137 137 @pytest.mark.parametrize(
138 138 'repo, repo_group, user_group, expect_error, expect_form_error', [
139 139 ('repository.none', 'group.none', 'usergroup.none', False, False),
140 140 ('repository.read', 'group.read', 'usergroup.read', False, False),
141 141 ('repository.write', 'group.write', 'usergroup.write',
142 142 False, False),
143 143 ('repository.admin', 'group.admin', 'usergroup.admin',
144 144 False, False),
145 145 ('repository.XXX', 'group.admin', 'usergroup.admin', False, True),
146 146 ('', '', '', True, False),
147 147 ])
148 148 def test_update_object_permissions(self, repo, repo_group, user_group,
149 149 expect_error, expect_form_error):
150 150 self.log_user()
151 151
152 152 params = {
153 153 'csrf_token': self.csrf_token,
154 154 'default_repo_perm': repo,
155 155 'overwrite_default_repo': False,
156 156 'default_group_perm': repo_group,
157 157 'overwrite_default_group': False,
158 158 'default_user_group_perm': user_group,
159 159 'overwrite_default_user_group': False,
160 160 }
161 161 response = self.app.post(route_path('admin_permissions_object_update'),
162 162 params=params)
163 163 if expect_form_error:
164 164 assert response.status_int == 200
165 165 response.mustcontain('Value must be one of')
166 166 else:
167 167 if expect_error:
168 168 msg = 'Error occurred during update of permissions'
169 169 else:
170 170 msg = 'Object permissions updated successfully'
171 171 assert_session_flash(response, msg)
172 172
173 173 def test_index_global(self):
174 174 self.log_user()
175 175 self.app.get(route_path('admin_permissions_global'))
176 176
177 177 @pytest.mark.parametrize(
178 178 'repo_create, repo_create_write, user_group_create, repo_group_create,'
179 179 'fork_create, inherit_default_permissions, expect_error,'
180 180 'expect_form_error', [
181 181 ('hg.create.none', 'hg.create.write_on_repogroup.false',
182 182 'hg.usergroup.create.false', 'hg.repogroup.create.false',
183 183 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
184 184 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
185 185 'hg.usergroup.create.true', 'hg.repogroup.create.true',
186 186 'hg.fork.repository', 'hg.inherit_default_perms.false',
187 187 False, False),
188 188 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
189 189 'hg.usergroup.create.true', 'hg.repogroup.create.true',
190 190 'hg.fork.repository', 'hg.inherit_default_perms.false',
191 191 False, True),
192 192 ('', '', '', '', '', '', True, False),
193 193 ])
194 194 def test_update_global_permissions(
195 195 self, repo_create, repo_create_write, user_group_create,
196 196 repo_group_create, fork_create, inherit_default_permissions,
197 197 expect_error, expect_form_error):
198 198 self.log_user()
199 199
200 200 params = {
201 201 'csrf_token': self.csrf_token,
202 202 'default_repo_create': repo_create,
203 203 'default_repo_create_on_write': repo_create_write,
204 204 'default_user_group_create': user_group_create,
205 205 'default_repo_group_create': repo_group_create,
206 206 'default_fork_create': fork_create,
207 207 'default_inherit_default_permissions': inherit_default_permissions
208 208 }
209 209 response = self.app.post(route_path('admin_permissions_global_update'),
210 210 params=params)
211 211 if expect_form_error:
212 212 assert response.status_int == 200
213 213 response.mustcontain('Value must be one of')
214 214 else:
215 215 if expect_error:
216 216 msg = 'Error occurred during update of permissions'
217 217 else:
218 218 msg = 'Global permissions updated successfully'
219 219 assert_session_flash(response, msg)
220 220
221 221 def test_index_ips(self):
222 222 self.log_user()
223 223 response = self.app.get(route_path('admin_permissions_ips'))
224 224 response.mustcontain('All IP addresses are allowed')
225 225
226 226 def test_add_delete_ips(self):
227 227 clear_cache_regions(['sql_cache_short'])
228 228 self.log_user()
229 229
230 230 # ADD
231 default_user_id = User.get_default_user().user_id
231 default_user_id = User.get_default_user_id()
232 232 self.app.post(
233 233 route_path('edit_user_ips_add', user_id=default_user_id),
234 234 params={'new_ip': '0.0.0.0/24', 'csrf_token': self.csrf_token})
235 235
236 236 response = self.app.get(route_path('admin_permissions_ips'))
237 237 response.mustcontain('0.0.0.0/24')
238 238 response.mustcontain('0.0.0.0 - 0.0.0.255')
239 239
240 240 # DELETE
241 default_user_id = User.get_default_user().user_id
241 default_user_id = User.get_default_user_id()
242 242 del_ip_id = UserIpMap.query().filter(UserIpMap.user_id ==
243 243 default_user_id).first().ip_id
244 244
245 245 response = self.app.post(
246 246 route_path('edit_user_ips_delete', user_id=default_user_id),
247 247 params={'del_ip_id': del_ip_id, 'csrf_token': self.csrf_token})
248 248
249 249 assert_session_flash(response, 'Removed ip address from user whitelist')
250 250
251 251 clear_cache_regions(['sql_cache_short'])
252 252 response = self.app.get(route_path('admin_permissions_ips'))
253 253 response.mustcontain('All IP addresses are allowed')
254 254 response.mustcontain(no=['0.0.0.0/24'])
255 255 response.mustcontain(no=['0.0.0.0 - 0.0.0.255'])
256 256
257 257 def test_index_overview(self):
258 258 self.log_user()
259 259 self.app.get(route_path('admin_permissions_overview'))
260 260
261 261 def test_ssh_keys(self):
262 262 self.log_user()
263 263 self.app.get(route_path('admin_permissions_ssh_keys'), status=200)
264 264
265 265 def test_ssh_keys_data(self, user_util, xhr_header):
266 266 self.log_user()
267 267 response = self.app.get(route_path('admin_permissions_ssh_keys_data'),
268 268 extra_environ=xhr_header)
269 269 assert response.json == {u'data': [], u'draw': None,
270 270 u'recordsFiltered': 0, u'recordsTotal': 0}
271 271
272 272 dummy_user = user_util.create_user()
273 273 SshKeyModel().create(dummy_user, 'ab:cd:ef', 'KEYKEY', 'test_key')
274 274 Session().commit()
275 275 response = self.app.get(route_path('admin_permissions_ssh_keys_data'),
276 276 extra_environ=xhr_header)
277 277 assert response.json['data'][0]['fingerprint'] == 'ab:cd:ef'
278 278
279 279 def test_ssh_keys_update(self):
280 280 self.log_user()
281 281 response = self.app.post(
282 282 route_path('admin_permissions_ssh_keys_update'),
283 283 dict(csrf_token=self.csrf_token), status=302)
284 284
285 285 assert_session_flash(
286 286 response, 'Updated SSH keys file')
287 287
288 288 def test_ssh_keys_update_disabled(self):
289 289 self.log_user()
290 290
291 291 from rhodecode.apps.admin.views.permissions import AdminPermissionsView
292 292 with mock.patch.object(AdminPermissionsView, 'ssh_enabled',
293 293 return_value=False):
294 294 response = self.app.post(
295 295 route_path('admin_permissions_ssh_keys_update'),
296 296 dict(csrf_token=self.csrf_token), status=302)
297 297
298 298 assert_session_flash(
299 299 response, 'SSH key support is disabled in .ini file') No newline at end of file
@@ -1,519 +1,519 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import re
22 22 import logging
23 23 import formencode
24 24 import formencode.htmlfill
25 25 import datetime
26 26 from pyramid.interfaces import IRoutesMapper
27 27
28 28 from pyramid.view import view_config
29 29 from pyramid.httpexceptions import HTTPFound
30 30 from pyramid.renderers import render
31 31 from pyramid.response import Response
32 32
33 33 from rhodecode.apps._base import BaseAppView, DataGridAppView
34 34 from rhodecode.apps.ssh_support import SshKeyFileChangeEvent
35 35 from rhodecode import events
36 36
37 37 from rhodecode.lib import helpers as h
38 38 from rhodecode.lib.auth import (
39 39 LoginRequired, HasPermissionAllDecorator, CSRFRequired)
40 40 from rhodecode.lib.utils2 import aslist, safe_unicode
41 41 from rhodecode.model.db import (
42 42 or_, coalesce, User, UserIpMap, UserSshKeys)
43 43 from rhodecode.model.forms import (
44 44 ApplicationPermissionsForm, ObjectPermissionsForm, UserPermissionsForm)
45 45 from rhodecode.model.meta import Session
46 46 from rhodecode.model.permission import PermissionModel
47 47 from rhodecode.model.settings import SettingsModel
48 48
49 49
50 50 log = logging.getLogger(__name__)
51 51
52 52
53 53 class AdminPermissionsView(BaseAppView, DataGridAppView):
54 54 def load_default_context(self):
55 55 c = self._get_local_tmpl_context()
56 56 PermissionModel().set_global_permission_choices(
57 57 c, gettext_translator=self.request.translate)
58 58 return c
59 59
60 60 @LoginRequired()
61 61 @HasPermissionAllDecorator('hg.admin')
62 62 @view_config(
63 63 route_name='admin_permissions_application', request_method='GET',
64 64 renderer='rhodecode:templates/admin/permissions/permissions.mako')
65 65 def permissions_application(self):
66 66 c = self.load_default_context()
67 67 c.active = 'application'
68 68
69 69 c.user = User.get_default_user(refresh=True)
70 70
71 71 app_settings = c.rc_config
72 72
73 73 defaults = {
74 74 'anonymous': c.user.active,
75 75 'default_register_message': app_settings.get(
76 76 'rhodecode_register_message')
77 77 }
78 78 defaults.update(c.user.get_default_perms())
79 79
80 80 data = render('rhodecode:templates/admin/permissions/permissions.mako',
81 81 self._get_template_context(c), self.request)
82 82 html = formencode.htmlfill.render(
83 83 data,
84 84 defaults=defaults,
85 85 encoding="UTF-8",
86 86 force_defaults=False
87 87 )
88 88 return Response(html)
89 89
90 90 @LoginRequired()
91 91 @HasPermissionAllDecorator('hg.admin')
92 92 @CSRFRequired()
93 93 @view_config(
94 94 route_name='admin_permissions_application_update', request_method='POST',
95 95 renderer='rhodecode:templates/admin/permissions/permissions.mako')
96 96 def permissions_application_update(self):
97 97 _ = self.request.translate
98 98 c = self.load_default_context()
99 99 c.active = 'application'
100 100
101 101 _form = ApplicationPermissionsForm(
102 102 self.request.translate,
103 103 [x[0] for x in c.register_choices],
104 104 [x[0] for x in c.password_reset_choices],
105 105 [x[0] for x in c.extern_activate_choices])()
106 106
107 107 try:
108 108 form_result = _form.to_python(dict(self.request.POST))
109 109 form_result.update({'perm_user_name': User.DEFAULT_USER})
110 110 PermissionModel().update_application_permissions(form_result)
111 111
112 112 settings = [
113 113 ('register_message', 'default_register_message'),
114 114 ]
115 115 for setting, form_key in settings:
116 116 sett = SettingsModel().create_or_update_setting(
117 117 setting, form_result[form_key])
118 118 Session().add(sett)
119 119
120 120 Session().commit()
121 121 h.flash(_('Application permissions updated successfully'),
122 122 category='success')
123 123
124 124 except formencode.Invalid as errors:
125 125 defaults = errors.value
126 126
127 127 data = render(
128 128 'rhodecode:templates/admin/permissions/permissions.mako',
129 129 self._get_template_context(c), self.request)
130 130 html = formencode.htmlfill.render(
131 131 data,
132 132 defaults=defaults,
133 133 errors=errors.error_dict or {},
134 134 prefix_error=False,
135 135 encoding="UTF-8",
136 136 force_defaults=False
137 137 )
138 138 return Response(html)
139 139
140 140 except Exception:
141 141 log.exception("Exception during update of permissions")
142 142 h.flash(_('Error occurred during update of permissions'),
143 143 category='error')
144 144
145 affected_user_ids = [User.get_default_user().user_id]
145 affected_user_ids = [User.get_default_user_id()]
146 146 PermissionModel().trigger_permission_flush(affected_user_ids)
147 147
148 148 raise HTTPFound(h.route_path('admin_permissions_application'))
149 149
150 150 @LoginRequired()
151 151 @HasPermissionAllDecorator('hg.admin')
152 152 @view_config(
153 153 route_name='admin_permissions_object', request_method='GET',
154 154 renderer='rhodecode:templates/admin/permissions/permissions.mako')
155 155 def permissions_objects(self):
156 156 c = self.load_default_context()
157 157 c.active = 'objects'
158 158
159 159 c.user = User.get_default_user(refresh=True)
160 160 defaults = {}
161 161 defaults.update(c.user.get_default_perms())
162 162
163 163 data = render(
164 164 'rhodecode:templates/admin/permissions/permissions.mako',
165 165 self._get_template_context(c), self.request)
166 166 html = formencode.htmlfill.render(
167 167 data,
168 168 defaults=defaults,
169 169 encoding="UTF-8",
170 170 force_defaults=False
171 171 )
172 172 return Response(html)
173 173
174 174 @LoginRequired()
175 175 @HasPermissionAllDecorator('hg.admin')
176 176 @CSRFRequired()
177 177 @view_config(
178 178 route_name='admin_permissions_object_update', request_method='POST',
179 179 renderer='rhodecode:templates/admin/permissions/permissions.mako')
180 180 def permissions_objects_update(self):
181 181 _ = self.request.translate
182 182 c = self.load_default_context()
183 183 c.active = 'objects'
184 184
185 185 _form = ObjectPermissionsForm(
186 186 self.request.translate,
187 187 [x[0] for x in c.repo_perms_choices],
188 188 [x[0] for x in c.group_perms_choices],
189 189 [x[0] for x in c.user_group_perms_choices],
190 190 )()
191 191
192 192 try:
193 193 form_result = _form.to_python(dict(self.request.POST))
194 194 form_result.update({'perm_user_name': User.DEFAULT_USER})
195 195 PermissionModel().update_object_permissions(form_result)
196 196
197 197 Session().commit()
198 198 h.flash(_('Object permissions updated successfully'),
199 199 category='success')
200 200
201 201 except formencode.Invalid as errors:
202 202 defaults = errors.value
203 203
204 204 data = render(
205 205 'rhodecode:templates/admin/permissions/permissions.mako',
206 206 self._get_template_context(c), self.request)
207 207 html = formencode.htmlfill.render(
208 208 data,
209 209 defaults=defaults,
210 210 errors=errors.error_dict or {},
211 211 prefix_error=False,
212 212 encoding="UTF-8",
213 213 force_defaults=False
214 214 )
215 215 return Response(html)
216 216 except Exception:
217 217 log.exception("Exception during update of permissions")
218 218 h.flash(_('Error occurred during update of permissions'),
219 219 category='error')
220 220
221 affected_user_ids = [User.get_default_user().user_id]
221 affected_user_ids = [User.get_default_user_id()]
222 222 PermissionModel().trigger_permission_flush(affected_user_ids)
223 223
224 224 raise HTTPFound(h.route_path('admin_permissions_object'))
225 225
226 226 @LoginRequired()
227 227 @HasPermissionAllDecorator('hg.admin')
228 228 @view_config(
229 229 route_name='admin_permissions_branch', request_method='GET',
230 230 renderer='rhodecode:templates/admin/permissions/permissions.mako')
231 231 def permissions_branch(self):
232 232 c = self.load_default_context()
233 233 c.active = 'branch'
234 234
235 235 c.user = User.get_default_user(refresh=True)
236 236 defaults = {}
237 237 defaults.update(c.user.get_default_perms())
238 238
239 239 data = render(
240 240 'rhodecode:templates/admin/permissions/permissions.mako',
241 241 self._get_template_context(c), self.request)
242 242 html = formencode.htmlfill.render(
243 243 data,
244 244 defaults=defaults,
245 245 encoding="UTF-8",
246 246 force_defaults=False
247 247 )
248 248 return Response(html)
249 249
250 250 @LoginRequired()
251 251 @HasPermissionAllDecorator('hg.admin')
252 252 @view_config(
253 253 route_name='admin_permissions_global', request_method='GET',
254 254 renderer='rhodecode:templates/admin/permissions/permissions.mako')
255 255 def permissions_global(self):
256 256 c = self.load_default_context()
257 257 c.active = 'global'
258 258
259 259 c.user = User.get_default_user(refresh=True)
260 260 defaults = {}
261 261 defaults.update(c.user.get_default_perms())
262 262
263 263 data = render(
264 264 'rhodecode:templates/admin/permissions/permissions.mako',
265 265 self._get_template_context(c), self.request)
266 266 html = formencode.htmlfill.render(
267 267 data,
268 268 defaults=defaults,
269 269 encoding="UTF-8",
270 270 force_defaults=False
271 271 )
272 272 return Response(html)
273 273
274 274 @LoginRequired()
275 275 @HasPermissionAllDecorator('hg.admin')
276 276 @CSRFRequired()
277 277 @view_config(
278 278 route_name='admin_permissions_global_update', request_method='POST',
279 279 renderer='rhodecode:templates/admin/permissions/permissions.mako')
280 280 def permissions_global_update(self):
281 281 _ = self.request.translate
282 282 c = self.load_default_context()
283 283 c.active = 'global'
284 284
285 285 _form = UserPermissionsForm(
286 286 self.request.translate,
287 287 [x[0] for x in c.repo_create_choices],
288 288 [x[0] for x in c.repo_create_on_write_choices],
289 289 [x[0] for x in c.repo_group_create_choices],
290 290 [x[0] for x in c.user_group_create_choices],
291 291 [x[0] for x in c.fork_choices],
292 292 [x[0] for x in c.inherit_default_permission_choices])()
293 293
294 294 try:
295 295 form_result = _form.to_python(dict(self.request.POST))
296 296 form_result.update({'perm_user_name': User.DEFAULT_USER})
297 297 PermissionModel().update_user_permissions(form_result)
298 298
299 299 Session().commit()
300 300 h.flash(_('Global permissions updated successfully'),
301 301 category='success')
302 302
303 303 except formencode.Invalid as errors:
304 304 defaults = errors.value
305 305
306 306 data = render(
307 307 'rhodecode:templates/admin/permissions/permissions.mako',
308 308 self._get_template_context(c), self.request)
309 309 html = formencode.htmlfill.render(
310 310 data,
311 311 defaults=defaults,
312 312 errors=errors.error_dict or {},
313 313 prefix_error=False,
314 314 encoding="UTF-8",
315 315 force_defaults=False
316 316 )
317 317 return Response(html)
318 318 except Exception:
319 319 log.exception("Exception during update of permissions")
320 320 h.flash(_('Error occurred during update of permissions'),
321 321 category='error')
322 322
323 affected_user_ids = [User.get_default_user().user_id]
323 affected_user_ids = [User.get_default_user_id()]
324 324 PermissionModel().trigger_permission_flush(affected_user_ids)
325 325
326 326 raise HTTPFound(h.route_path('admin_permissions_global'))
327 327
328 328 @LoginRequired()
329 329 @HasPermissionAllDecorator('hg.admin')
330 330 @view_config(
331 331 route_name='admin_permissions_ips', request_method='GET',
332 332 renderer='rhodecode:templates/admin/permissions/permissions.mako')
333 333 def permissions_ips(self):
334 334 c = self.load_default_context()
335 335 c.active = 'ips'
336 336
337 337 c.user = User.get_default_user(refresh=True)
338 338 c.user_ip_map = (
339 339 UserIpMap.query().filter(UserIpMap.user == c.user).all())
340 340
341 341 return self._get_template_context(c)
342 342
343 343 @LoginRequired()
344 344 @HasPermissionAllDecorator('hg.admin')
345 345 @view_config(
346 346 route_name='admin_permissions_overview', request_method='GET',
347 347 renderer='rhodecode:templates/admin/permissions/permissions.mako')
348 348 def permissions_overview(self):
349 349 c = self.load_default_context()
350 350 c.active = 'perms'
351 351
352 352 c.user = User.get_default_user(refresh=True)
353 353 c.perm_user = c.user.AuthUser()
354 354 return self._get_template_context(c)
355 355
356 356 @LoginRequired()
357 357 @HasPermissionAllDecorator('hg.admin')
358 358 @view_config(
359 359 route_name='admin_permissions_auth_token_access', request_method='GET',
360 360 renderer='rhodecode:templates/admin/permissions/permissions.mako')
361 361 def auth_token_access(self):
362 362 from rhodecode import CONFIG
363 363
364 364 c = self.load_default_context()
365 365 c.active = 'auth_token_access'
366 366
367 367 c.user = User.get_default_user(refresh=True)
368 368 c.perm_user = c.user.AuthUser()
369 369
370 370 mapper = self.request.registry.queryUtility(IRoutesMapper)
371 371 c.view_data = []
372 372
373 373 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
374 374 introspector = self.request.registry.introspector
375 375
376 376 view_intr = {}
377 377 for view_data in introspector.get_category('views'):
378 378 intr = view_data['introspectable']
379 379
380 380 if 'route_name' in intr and intr['attr']:
381 381 view_intr[intr['route_name']] = '{}:{}'.format(
382 382 str(intr['derived_callable'].func_name), intr['attr']
383 383 )
384 384
385 385 c.whitelist_key = 'api_access_controllers_whitelist'
386 386 c.whitelist_file = CONFIG.get('__file__')
387 387 whitelist_views = aslist(
388 388 CONFIG.get(c.whitelist_key), sep=',')
389 389
390 390 for route_info in mapper.get_routes():
391 391 if not route_info.name.startswith('__'):
392 392 routepath = route_info.pattern
393 393
394 394 def replace(matchobj):
395 395 if matchobj.group(1):
396 396 return "{%s}" % matchobj.group(1).split(':')[0]
397 397 else:
398 398 return "{%s}" % matchobj.group(2)
399 399
400 400 routepath = _argument_prog.sub(replace, routepath)
401 401
402 402 if not routepath.startswith('/'):
403 403 routepath = '/' + routepath
404 404
405 405 view_fqn = view_intr.get(route_info.name, 'NOT AVAILABLE')
406 406 active = view_fqn in whitelist_views
407 407 c.view_data.append((route_info.name, view_fqn, routepath, active))
408 408
409 409 c.whitelist_views = whitelist_views
410 410 return self._get_template_context(c)
411 411
412 412 def ssh_enabled(self):
413 413 return self.request.registry.settings.get(
414 414 'ssh.generate_authorized_keyfile')
415 415
416 416 @LoginRequired()
417 417 @HasPermissionAllDecorator('hg.admin')
418 418 @view_config(
419 419 route_name='admin_permissions_ssh_keys', request_method='GET',
420 420 renderer='rhodecode:templates/admin/permissions/permissions.mako')
421 421 def ssh_keys(self):
422 422 c = self.load_default_context()
423 423 c.active = 'ssh_keys'
424 424 c.ssh_enabled = self.ssh_enabled()
425 425 return self._get_template_context(c)
426 426
427 427 @LoginRequired()
428 428 @HasPermissionAllDecorator('hg.admin')
429 429 @view_config(
430 430 route_name='admin_permissions_ssh_keys_data', request_method='GET',
431 431 renderer='json_ext', xhr=True)
432 432 def ssh_keys_data(self):
433 433 _ = self.request.translate
434 434 self.load_default_context()
435 435 column_map = {
436 436 'fingerprint': 'ssh_key_fingerprint',
437 437 'username': User.username
438 438 }
439 439 draw, start, limit = self._extract_chunk(self.request)
440 440 search_q, order_by, order_dir = self._extract_ordering(
441 441 self.request, column_map=column_map)
442 442
443 443 ssh_keys_data_total_count = UserSshKeys.query()\
444 444 .count()
445 445
446 446 # json generate
447 447 base_q = UserSshKeys.query().join(UserSshKeys.user)
448 448
449 449 if search_q:
450 450 like_expression = u'%{}%'.format(safe_unicode(search_q))
451 451 base_q = base_q.filter(or_(
452 452 User.username.ilike(like_expression),
453 453 UserSshKeys.ssh_key_fingerprint.ilike(like_expression),
454 454 ))
455 455
456 456 users_data_total_filtered_count = base_q.count()
457 457
458 458 sort_col = self._get_order_col(order_by, UserSshKeys)
459 459 if sort_col:
460 460 if order_dir == 'asc':
461 461 # handle null values properly to order by NULL last
462 462 if order_by in ['created_on']:
463 463 sort_col = coalesce(sort_col, datetime.date.max)
464 464 sort_col = sort_col.asc()
465 465 else:
466 466 # handle null values properly to order by NULL last
467 467 if order_by in ['created_on']:
468 468 sort_col = coalesce(sort_col, datetime.date.min)
469 469 sort_col = sort_col.desc()
470 470
471 471 base_q = base_q.order_by(sort_col)
472 472 base_q = base_q.offset(start).limit(limit)
473 473
474 474 ssh_keys = base_q.all()
475 475
476 476 ssh_keys_data = []
477 477 for ssh_key in ssh_keys:
478 478 ssh_keys_data.append({
479 479 "username": h.gravatar_with_user(self.request, ssh_key.user.username),
480 480 "fingerprint": ssh_key.ssh_key_fingerprint,
481 481 "description": ssh_key.description,
482 482 "created_on": h.format_date(ssh_key.created_on),
483 483 "accessed_on": h.format_date(ssh_key.accessed_on),
484 484 "action": h.link_to(
485 485 _('Edit'), h.route_path('edit_user_ssh_keys',
486 486 user_id=ssh_key.user.user_id))
487 487 })
488 488
489 489 data = ({
490 490 'draw': draw,
491 491 'data': ssh_keys_data,
492 492 'recordsTotal': ssh_keys_data_total_count,
493 493 'recordsFiltered': users_data_total_filtered_count,
494 494 })
495 495
496 496 return data
497 497
498 498 @LoginRequired()
499 499 @HasPermissionAllDecorator('hg.admin')
500 500 @CSRFRequired()
501 501 @view_config(
502 502 route_name='admin_permissions_ssh_keys_update', request_method='POST',
503 503 renderer='rhodecode:templates/admin/permissions/permissions.mako')
504 504 def ssh_keys_update(self):
505 505 _ = self.request.translate
506 506 self.load_default_context()
507 507
508 508 ssh_enabled = self.ssh_enabled()
509 509 key_file = self.request.registry.settings.get(
510 510 'ssh.authorized_keys_file_path')
511 511 if ssh_enabled:
512 512 events.trigger(SshKeyFileChangeEvent(), self.request.registry)
513 513 h.flash(_('Updated SSH keys file: {}').format(key_file),
514 514 category='success')
515 515 else:
516 516 h.flash(_('SSH key support is disabled in .ini file'),
517 517 category='warning')
518 518
519 519 raise HTTPFound(h.route_path('admin_permissions_ssh_keys'))
@@ -1,325 +1,325 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22
23 23 from pyramid.view import view_config
24 24 from pyramid.httpexceptions import HTTPFound
25 25 from packaging.version import Version
26 26
27 27 from rhodecode import events
28 28 from rhodecode.apps._base import RepoAppView
29 29 from rhodecode.lib import helpers as h
30 30 from rhodecode.lib import audit_logger
31 31 from rhodecode.lib.auth import (
32 32 LoginRequired, HasRepoPermissionAnyDecorator, CSRFRequired,
33 33 HasRepoPermissionAny)
34 34 from rhodecode.lib.exceptions import AttachedForksError, AttachedPullRequestsError
35 35 from rhodecode.lib.utils2 import safe_int
36 36 from rhodecode.lib.vcs import RepositoryError
37 37 from rhodecode.model.db import Session, UserFollowing, User, Repository
38 38 from rhodecode.model.permission import PermissionModel
39 39 from rhodecode.model.repo import RepoModel
40 40 from rhodecode.model.scm import ScmModel
41 41
42 42 log = logging.getLogger(__name__)
43 43
44 44
45 45 class RepoSettingsView(RepoAppView):
46 46
47 47 def load_default_context(self):
48 48 c = self._get_local_tmpl_context()
49 49 return c
50 50
51 51 def _get_users_with_permissions(self):
52 52 user_permissions = {}
53 53 for perm in self.db_repo.permissions():
54 54 user_permissions[perm.user_id] = perm
55 55
56 56 return user_permissions
57 57
58 58 @LoginRequired()
59 59 @HasRepoPermissionAnyDecorator('repository.admin')
60 60 @view_config(
61 61 route_name='edit_repo_advanced', request_method='GET',
62 62 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
63 63 def edit_advanced(self):
64 64 _ = self.request.translate
65 65 c = self.load_default_context()
66 66 c.active = 'advanced'
67 67
68 c.default_user_id = User.get_default_user().user_id
68 c.default_user_id = User.get_default_user_id()
69 69 c.in_public_journal = UserFollowing.query() \
70 70 .filter(UserFollowing.user_id == c.default_user_id) \
71 71 .filter(UserFollowing.follows_repository == self.db_repo).scalar()
72 72
73 73 c.ver_info_dict = self.rhodecode_vcs_repo.get_hooks_info()
74 74 c.hooks_outdated = False
75 75
76 76 try:
77 77 if Version(c.ver_info_dict['pre_version']) < Version(c.rhodecode_version):
78 78 c.hooks_outdated = True
79 79 except Exception:
80 80 pass
81 81
82 82 # update commit cache if GET flag is present
83 83 if self.request.GET.get('update_commit_cache'):
84 84 self.db_repo.update_commit_cache()
85 85 h.flash(_('updated commit cache'), category='success')
86 86
87 87 return self._get_template_context(c)
88 88
89 89 @LoginRequired()
90 90 @HasRepoPermissionAnyDecorator('repository.admin')
91 91 @CSRFRequired()
92 92 @view_config(
93 93 route_name='edit_repo_advanced_archive', request_method='POST',
94 94 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
95 95 def edit_advanced_archive(self):
96 96 """
97 97 Archives the repository. It will become read-only, and not visible in search
98 98 or other queries. But still visible for super-admins.
99 99 """
100 100
101 101 _ = self.request.translate
102 102
103 103 try:
104 104 old_data = self.db_repo.get_api_data()
105 105 RepoModel().archive(self.db_repo)
106 106
107 107 repo = audit_logger.RepoWrap(repo_id=None, repo_name=self.db_repo.repo_name)
108 108 audit_logger.store_web(
109 109 'repo.archive', action_data={'old_data': old_data},
110 110 user=self._rhodecode_user, repo=repo)
111 111
112 112 ScmModel().mark_for_invalidation(self.db_repo_name, delete=True)
113 113 h.flash(
114 114 _('Archived repository `%s`') % self.db_repo_name,
115 115 category='success')
116 116 Session().commit()
117 117 except Exception:
118 118 log.exception("Exception during archiving of repository")
119 119 h.flash(_('An error occurred during archiving of `%s`')
120 120 % self.db_repo_name, category='error')
121 121 # redirect to advanced for more deletion options
122 122 raise HTTPFound(
123 123 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name,
124 124 _anchor='advanced-archive'))
125 125
126 126 # flush permissions for all users defined in permissions
127 127 affected_user_ids = self._get_users_with_permissions().keys()
128 128 PermissionModel().trigger_permission_flush(affected_user_ids)
129 129
130 130 raise HTTPFound(h.route_path('home'))
131 131
132 132 @LoginRequired()
133 133 @HasRepoPermissionAnyDecorator('repository.admin')
134 134 @CSRFRequired()
135 135 @view_config(
136 136 route_name='edit_repo_advanced_delete', request_method='POST',
137 137 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
138 138 def edit_advanced_delete(self):
139 139 """
140 140 Deletes the repository, or shows warnings if deletion is not possible
141 141 because of attached forks or other errors.
142 142 """
143 143 _ = self.request.translate
144 144 handle_forks = self.request.POST.get('forks', None)
145 145 if handle_forks == 'detach_forks':
146 146 handle_forks = 'detach'
147 147 elif handle_forks == 'delete_forks':
148 148 handle_forks = 'delete'
149 149
150 150 try:
151 151 old_data = self.db_repo.get_api_data()
152 152 RepoModel().delete(self.db_repo, forks=handle_forks)
153 153
154 154 _forks = self.db_repo.forks.count()
155 155 if _forks and handle_forks:
156 156 if handle_forks == 'detach_forks':
157 157 h.flash(_('Detached %s forks') % _forks, category='success')
158 158 elif handle_forks == 'delete_forks':
159 159 h.flash(_('Deleted %s forks') % _forks, category='success')
160 160
161 161 repo = audit_logger.RepoWrap(repo_id=None, repo_name=self.db_repo.repo_name)
162 162 audit_logger.store_web(
163 163 'repo.delete', action_data={'old_data': old_data},
164 164 user=self._rhodecode_user, repo=repo)
165 165
166 166 ScmModel().mark_for_invalidation(self.db_repo_name, delete=True)
167 167 h.flash(
168 168 _('Deleted repository `%s`') % self.db_repo_name,
169 169 category='success')
170 170 Session().commit()
171 171 except AttachedForksError:
172 172 repo_advanced_url = h.route_path(
173 173 'edit_repo_advanced', repo_name=self.db_repo_name,
174 174 _anchor='advanced-delete')
175 175 delete_anchor = h.link_to(_('detach or delete'), repo_advanced_url)
176 176 h.flash(_('Cannot delete `{repo}` it still contains attached forks. '
177 177 'Try using {delete_or_detach} option.')
178 178 .format(repo=self.db_repo_name, delete_or_detach=delete_anchor),
179 179 category='warning')
180 180
181 181 # redirect to advanced for forks handle action ?
182 182 raise HTTPFound(repo_advanced_url)
183 183
184 184 except AttachedPullRequestsError:
185 185 repo_advanced_url = h.route_path(
186 186 'edit_repo_advanced', repo_name=self.db_repo_name,
187 187 _anchor='advanced-delete')
188 188 attached_prs = len(self.db_repo.pull_requests_source +
189 189 self.db_repo.pull_requests_target)
190 190 h.flash(
191 191 _('Cannot delete `{repo}` it still contains {num} attached pull requests. '
192 192 'Consider archiving the repository instead.').format(
193 193 repo=self.db_repo_name, num=attached_prs), category='warning')
194 194
195 195 # redirect to advanced for forks handle action ?
196 196 raise HTTPFound(repo_advanced_url)
197 197
198 198 except Exception:
199 199 log.exception("Exception during deletion of repository")
200 200 h.flash(_('An error occurred during deletion of `%s`')
201 201 % self.db_repo_name, category='error')
202 202 # redirect to advanced for more deletion options
203 203 raise HTTPFound(
204 204 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name,
205 205 _anchor='advanced-delete'))
206 206
207 207 raise HTTPFound(h.route_path('home'))
208 208
209 209 @LoginRequired()
210 210 @HasRepoPermissionAnyDecorator('repository.admin')
211 211 @CSRFRequired()
212 212 @view_config(
213 213 route_name='edit_repo_advanced_journal', request_method='POST',
214 214 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
215 215 def edit_advanced_journal(self):
216 216 """
217 217 Set's this repository to be visible in public journal,
218 218 in other words making default user to follow this repo
219 219 """
220 220 _ = self.request.translate
221 221
222 222 try:
223 user_id = User.get_default_user().user_id
223 user_id = User.get_default_user_id()
224 224 ScmModel().toggle_following_repo(self.db_repo.repo_id, user_id)
225 225 h.flash(_('Updated repository visibility in public journal'),
226 226 category='success')
227 227 Session().commit()
228 228 except Exception:
229 229 h.flash(_('An error occurred during setting this '
230 230 'repository in public journal'),
231 231 category='error')
232 232
233 233 raise HTTPFound(
234 234 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name))
235 235
236 236 @LoginRequired()
237 237 @HasRepoPermissionAnyDecorator('repository.admin')
238 238 @CSRFRequired()
239 239 @view_config(
240 240 route_name='edit_repo_advanced_fork', request_method='POST',
241 241 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
242 242 def edit_advanced_fork(self):
243 243 """
244 244 Mark given repository as a fork of another
245 245 """
246 246 _ = self.request.translate
247 247
248 248 new_fork_id = safe_int(self.request.POST.get('id_fork_of'))
249 249
250 250 # valid repo, re-check permissions
251 251 if new_fork_id:
252 252 repo = Repository.get(new_fork_id)
253 253 # ensure we have at least read access to the repo we mark
254 254 perm_check = HasRepoPermissionAny(
255 255 'repository.read', 'repository.write', 'repository.admin')
256 256
257 257 if repo and perm_check(repo_name=repo.repo_name):
258 258 new_fork_id = repo.repo_id
259 259 else:
260 260 new_fork_id = None
261 261
262 262 try:
263 263 repo = ScmModel().mark_as_fork(
264 264 self.db_repo_name, new_fork_id, self._rhodecode_user.user_id)
265 265 fork = repo.fork.repo_name if repo.fork else _('Nothing')
266 266 Session().commit()
267 267 h.flash(
268 268 _('Marked repo %s as fork of %s') % (self.db_repo_name, fork),
269 269 category='success')
270 270 except RepositoryError as e:
271 271 log.exception("Repository Error occurred")
272 272 h.flash(str(e), category='error')
273 273 except Exception:
274 274 log.exception("Exception while editing fork")
275 275 h.flash(_('An error occurred during this operation'),
276 276 category='error')
277 277
278 278 raise HTTPFound(
279 279 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name))
280 280
281 281 @LoginRequired()
282 282 @HasRepoPermissionAnyDecorator('repository.admin')
283 283 @CSRFRequired()
284 284 @view_config(
285 285 route_name='edit_repo_advanced_locking', request_method='POST',
286 286 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
287 287 def edit_advanced_locking(self):
288 288 """
289 289 Toggle locking of repository
290 290 """
291 291 _ = self.request.translate
292 292 set_lock = self.request.POST.get('set_lock')
293 293 set_unlock = self.request.POST.get('set_unlock')
294 294
295 295 try:
296 296 if set_lock:
297 297 Repository.lock(self.db_repo, self._rhodecode_user.user_id,
298 298 lock_reason=Repository.LOCK_WEB)
299 299 h.flash(_('Locked repository'), category='success')
300 300 elif set_unlock:
301 301 Repository.unlock(self.db_repo)
302 302 h.flash(_('Unlocked repository'), category='success')
303 303 except Exception as e:
304 304 log.exception("Exception during unlocking")
305 305 h.flash(_('An error occurred during unlocking'), category='error')
306 306
307 307 raise HTTPFound(
308 308 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name))
309 309
310 310 @LoginRequired()
311 311 @HasRepoPermissionAnyDecorator('repository.admin')
312 312 @view_config(
313 313 route_name='edit_repo_advanced_hooks', request_method='GET',
314 314 renderer='rhodecode:templates/admin/repos/repo_edit.mako')
315 315 def edit_advanced_install_hooks(self):
316 316 """
317 317 Install Hooks for repository
318 318 """
319 319 _ = self.request.translate
320 320 self.load_default_context()
321 321 self.rhodecode_vcs_repo.install_hooks(force=True)
322 322 h.flash(_('installed updated hooks into this repository'),
323 323 category='success')
324 324 raise HTTPFound(
325 325 h.route_path('edit_repo_advanced', repo_name=self.db_repo_name))
@@ -1,86 +1,87 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import logging
23 23 import rhodecode
24 24
25 25 from rhodecode.config import utils
26 26
27 27 from rhodecode.lib.utils import load_rcextensions
28 28 from rhodecode.lib.utils2 import str2bool
29 29 from rhodecode.lib.vcs import connect_vcs
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 def load_pyramid_environment(global_config, settings):
35 35 # Some parts of the code expect a merge of global and app settings.
36 36 settings_merged = global_config.copy()
37 37 settings_merged.update(settings)
38 38
39 39 # TODO(marcink): probably not required anymore
40 40 # configure channelstream,
41 41 settings_merged['channelstream_config'] = {
42 42 'enabled': str2bool(settings_merged.get('channelstream.enabled', False)),
43 43 'server': settings_merged.get('channelstream.server'),
44 44 'secret': settings_merged.get('channelstream.secret')
45 45 }
46 46
47 47 # If this is a test run we prepare the test environment like
48 48 # creating a test database, test search index and test repositories.
49 49 # This has to be done before the database connection is initialized.
50 50 if settings['is_test']:
51 51 rhodecode.is_test = True
52 52 rhodecode.disable_error_handler = True
53 53 from rhodecode import authentication
54 54 authentication.plugin_default_auth_ttl = 0
55 55
56 56 utils.initialize_test_environment(settings_merged)
57 57
58 58 # Initialize the database connection.
59 59 utils.initialize_database(settings_merged)
60 60
61 61 load_rcextensions(root_path=settings_merged['here'])
62 62
63 63 # Limit backends to `vcs.backends` from configuration, and preserve the order
64 64 for alias in rhodecode.BACKENDS.keys():
65 65 if alias not in settings['vcs.backends']:
66 66 del rhodecode.BACKENDS[alias]
67 67
68 def sorter(item):
69 return settings['vcs.backends'].index(item[0])
70 rhodecode.BACKENDS = rhodecode.OrderedDict(sorted(rhodecode.BACKENDS.items(), key=sorter))
68 _sorted_backend = sorted(rhodecode.BACKENDS.items(),
69 key=lambda item: settings['vcs.backends'].index(item[0]))
70 rhodecode.BACKENDS = rhodecode.OrderedDict(_sorted_backend)
71 71
72 72 log.info('Enabled VCS backends: %s', rhodecode.BACKENDS.keys())
73 73
74 74 # initialize vcs client and optionally run the server if enabled
75 75 vcs_server_uri = settings['vcs.server']
76 76 vcs_server_enabled = settings['vcs.server.enable']
77 77
78 78 utils.configure_vcs(settings)
79 79
80 80 # Store the settings to make them available to other modules.
81 81
82 82 rhodecode.PYRAMID_SETTINGS = settings_merged
83 83 rhodecode.CONFIG = settings_merged
84 rhodecode.CONFIG['default_user_id'] = utils.get_default_user_id()
84 85
85 86 if vcs_server_enabled:
86 87 connect_vcs(vcs_server_uri, utils.get_vcs_server_protocol(settings))
@@ -1,93 +1,101 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 import shlex
23 22 import platform
24 23
25 24 from rhodecode.model import init_model
26 25
27 26
28 27 def configure_vcs(config):
29 28 """
30 29 Patch VCS config with some RhodeCode specific stuff
31 30 """
32 31 from rhodecode.lib.vcs import conf
33 32 import rhodecode.lib.vcs.conf.settings
34 33
35 34 conf.settings.BACKENDS = {
36 35 'hg': 'rhodecode.lib.vcs.backends.hg.MercurialRepository',
37 36 'git': 'rhodecode.lib.vcs.backends.git.GitRepository',
38 37 'svn': 'rhodecode.lib.vcs.backends.svn.SubversionRepository',
39 38 }
40 39
41 40 conf.settings.HOOKS_PROTOCOL = config['vcs.hooks.protocol']
42 41 conf.settings.HOOKS_HOST = config['vcs.hooks.host']
43 42 conf.settings.HOOKS_DIRECT_CALLS = config['vcs.hooks.direct_calls']
44 43 conf.settings.DEFAULT_ENCODINGS = config['default_encoding']
45 44 conf.settings.ALIASES[:] = config['vcs.backends']
46 45 conf.settings.SVN_COMPATIBLE_VERSION = config['vcs.svn.compatible_version']
47 46
48 47
49 48 def initialize_database(config):
50 49 from rhodecode.lib.utils2 import engine_from_config, get_encryption_key
51 50 engine = engine_from_config(config, 'sqlalchemy.db1.')
52 51 init_model(engine, encryption_key=get_encryption_key(config))
53 52
54 53
55 54 def initialize_test_environment(settings, test_env=None):
56 55 if test_env is None:
57 56 test_env = not int(os.environ.get('RC_NO_TMP_PATH', 0))
58 57
59 58 from rhodecode.lib.utils import (
60 59 create_test_directory, create_test_database, create_test_repositories,
61 60 create_test_index)
62 61 from rhodecode.tests import TESTS_TMP_PATH
63 62 from rhodecode.lib.vcs.backends.hg import largefiles_store
64 63 from rhodecode.lib.vcs.backends.git import lfs_store
65 64
66 65 # test repos
67 66 if test_env:
68 67 create_test_directory(TESTS_TMP_PATH)
69 68 # large object stores
70 69 create_test_directory(largefiles_store(TESTS_TMP_PATH))
71 70 create_test_directory(lfs_store(TESTS_TMP_PATH))
72 71
73 72 create_test_database(TESTS_TMP_PATH, settings)
74 73 create_test_repositories(TESTS_TMP_PATH, settings)
75 74 create_test_index(TESTS_TMP_PATH, settings)
76 75
77 76
78 77 def get_vcs_server_protocol(config):
79 78 return config['vcs.server.protocol']
80 79
81 80
82 81 def set_instance_id(config):
83 82 """
84 83 Sets a dynamic generated config['instance_id'] if missing or '*'
85 84 E.g instance_id = *cluster-1 or instance_id = *
86 85 """
87 86
88 87 config['instance_id'] = config.get('instance_id') or ''
89 88 instance_id = config['instance_id']
90 89 if instance_id.startswith('*') or not instance_id:
91 90 prefix = instance_id.lstrip('*')
92 91 _platform_id = platform.uname()[1] or 'instance'
93 92 config['instance_id'] = '%s%s-%s' % (prefix, _platform_id, os.getpid())
93
94
95 def get_default_user_id():
96 from rhodecode.model.db import User, Session
97 user_id = Session()\
98 .query(User.user_id)\
99 .filter(User.username == User.DEFAULT_USER)\
100 .scalar()
101 return user_id
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,597 +1,597 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 permissions model for RhodeCode
23 23 """
24 24 import collections
25 25 import logging
26 26 import traceback
27 27
28 28 from sqlalchemy.exc import DatabaseError
29 29
30 30 from rhodecode import events
31 31 from rhodecode.model import BaseModel
32 32 from rhodecode.model.db import (
33 33 User, Permission, UserToPerm, UserRepoToPerm, UserRepoGroupToPerm,
34 34 UserUserGroupToPerm, UserGroup, UserGroupToPerm, UserToRepoBranchPermission)
35 35 from rhodecode.lib.utils2 import str2bool, safe_int
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 class PermissionModel(BaseModel):
41 41 """
42 42 Permissions model for RhodeCode
43 43 """
44 44
45 45 cls = Permission
46 46 global_perms = {
47 47 'default_repo_create': None,
48 48 # special case for create repos on write access to group
49 49 'default_repo_create_on_write': None,
50 50 'default_repo_group_create': None,
51 51 'default_user_group_create': None,
52 52 'default_fork_create': None,
53 53 'default_inherit_default_permissions': None,
54 54 'default_register': None,
55 55 'default_password_reset': None,
56 56 'default_extern_activate': None,
57 57
58 58 # object permissions below
59 59 'default_repo_perm': None,
60 60 'default_group_perm': None,
61 61 'default_user_group_perm': None,
62 62
63 63 # branch
64 64 'default_branch_perm': None,
65 65 }
66 66
67 67 def set_global_permission_choices(self, c_obj, gettext_translator):
68 68 _ = gettext_translator
69 69
70 70 c_obj.repo_perms_choices = [
71 71 ('repository.none', _('None'),),
72 72 ('repository.read', _('Read'),),
73 73 ('repository.write', _('Write'),),
74 74 ('repository.admin', _('Admin'),)]
75 75
76 76 c_obj.group_perms_choices = [
77 77 ('group.none', _('None'),),
78 78 ('group.read', _('Read'),),
79 79 ('group.write', _('Write'),),
80 80 ('group.admin', _('Admin'),)]
81 81
82 82 c_obj.user_group_perms_choices = [
83 83 ('usergroup.none', _('None'),),
84 84 ('usergroup.read', _('Read'),),
85 85 ('usergroup.write', _('Write'),),
86 86 ('usergroup.admin', _('Admin'),)]
87 87
88 88 c_obj.branch_perms_choices = [
89 89 ('branch.none', _('Protected/No Access'),),
90 90 ('branch.merge', _('Web merge'),),
91 91 ('branch.push', _('Push'),),
92 92 ('branch.push_force', _('Force Push'),)]
93 93
94 94 c_obj.register_choices = [
95 95 ('hg.register.none', _('Disabled')),
96 96 ('hg.register.manual_activate', _('Allowed with manual account activation')),
97 97 ('hg.register.auto_activate', _('Allowed with automatic account activation')),]
98 98
99 99 c_obj.password_reset_choices = [
100 100 ('hg.password_reset.enabled', _('Allow password recovery')),
101 101 ('hg.password_reset.hidden', _('Hide password recovery link')),
102 102 ('hg.password_reset.disabled', _('Disable password recovery')),]
103 103
104 104 c_obj.extern_activate_choices = [
105 105 ('hg.extern_activate.manual', _('Manual activation of external account')),
106 106 ('hg.extern_activate.auto', _('Automatic activation of external account')),]
107 107
108 108 c_obj.repo_create_choices = [
109 109 ('hg.create.none', _('Disabled')),
110 110 ('hg.create.repository', _('Enabled'))]
111 111
112 112 c_obj.repo_create_on_write_choices = [
113 113 ('hg.create.write_on_repogroup.false', _('Disabled')),
114 114 ('hg.create.write_on_repogroup.true', _('Enabled'))]
115 115
116 116 c_obj.user_group_create_choices = [
117 117 ('hg.usergroup.create.false', _('Disabled')),
118 118 ('hg.usergroup.create.true', _('Enabled'))]
119 119
120 120 c_obj.repo_group_create_choices = [
121 121 ('hg.repogroup.create.false', _('Disabled')),
122 122 ('hg.repogroup.create.true', _('Enabled'))]
123 123
124 124 c_obj.fork_choices = [
125 125 ('hg.fork.none', _('Disabled')),
126 126 ('hg.fork.repository', _('Enabled'))]
127 127
128 128 c_obj.inherit_default_permission_choices = [
129 129 ('hg.inherit_default_perms.false', _('Disabled')),
130 130 ('hg.inherit_default_perms.true', _('Enabled'))]
131 131
132 132 def get_default_perms(self, object_perms, suffix):
133 133 defaults = {}
134 134 for perm in object_perms:
135 135 # perms
136 136 if perm.permission.permission_name.startswith('repository.'):
137 137 defaults['default_repo_perm' + suffix] = perm.permission.permission_name
138 138
139 139 if perm.permission.permission_name.startswith('group.'):
140 140 defaults['default_group_perm' + suffix] = perm.permission.permission_name
141 141
142 142 if perm.permission.permission_name.startswith('usergroup.'):
143 143 defaults['default_user_group_perm' + suffix] = perm.permission.permission_name
144 144
145 145 # branch
146 146 if perm.permission.permission_name.startswith('branch.'):
147 147 defaults['default_branch_perm' + suffix] = perm.permission.permission_name
148 148
149 149 # creation of objects
150 150 if perm.permission.permission_name.startswith('hg.create.write_on_repogroup'):
151 151 defaults['default_repo_create_on_write' + suffix] = perm.permission.permission_name
152 152
153 153 elif perm.permission.permission_name.startswith('hg.create.'):
154 154 defaults['default_repo_create' + suffix] = perm.permission.permission_name
155 155
156 156 if perm.permission.permission_name.startswith('hg.fork.'):
157 157 defaults['default_fork_create' + suffix] = perm.permission.permission_name
158 158
159 159 if perm.permission.permission_name.startswith('hg.inherit_default_perms.'):
160 160 defaults['default_inherit_default_permissions' + suffix] = perm.permission.permission_name
161 161
162 162 if perm.permission.permission_name.startswith('hg.repogroup.'):
163 163 defaults['default_repo_group_create' + suffix] = perm.permission.permission_name
164 164
165 165 if perm.permission.permission_name.startswith('hg.usergroup.'):
166 166 defaults['default_user_group_create' + suffix] = perm.permission.permission_name
167 167
168 168 # registration and external account activation
169 169 if perm.permission.permission_name.startswith('hg.register.'):
170 170 defaults['default_register' + suffix] = perm.permission.permission_name
171 171
172 172 if perm.permission.permission_name.startswith('hg.password_reset.'):
173 173 defaults['default_password_reset' + suffix] = perm.permission.permission_name
174 174
175 175 if perm.permission.permission_name.startswith('hg.extern_activate.'):
176 176 defaults['default_extern_activate' + suffix] = perm.permission.permission_name
177 177
178 178 return defaults
179 179
180 180 def _make_new_user_perm(self, user, perm_name):
181 181 log.debug('Creating new user permission:%s', perm_name)
182 182 new = UserToPerm()
183 183 new.user = user
184 184 new.permission = Permission.get_by_key(perm_name)
185 185 return new
186 186
187 187 def _make_new_user_group_perm(self, user_group, perm_name):
188 188 log.debug('Creating new user group permission:%s', perm_name)
189 189 new = UserGroupToPerm()
190 190 new.users_group = user_group
191 191 new.permission = Permission.get_by_key(perm_name)
192 192 return new
193 193
194 194 def _keep_perm(self, perm_name, keep_fields):
195 195 def get_pat(field_name):
196 196 return {
197 197 # global perms
198 198 'default_repo_create': 'hg.create.',
199 199 # special case for create repos on write access to group
200 200 'default_repo_create_on_write': 'hg.create.write_on_repogroup.',
201 201 'default_repo_group_create': 'hg.repogroup.create.',
202 202 'default_user_group_create': 'hg.usergroup.create.',
203 203 'default_fork_create': 'hg.fork.',
204 204 'default_inherit_default_permissions': 'hg.inherit_default_perms.',
205 205
206 206 # application perms
207 207 'default_register': 'hg.register.',
208 208 'default_password_reset': 'hg.password_reset.',
209 209 'default_extern_activate': 'hg.extern_activate.',
210 210
211 211 # object permissions below
212 212 'default_repo_perm': 'repository.',
213 213 'default_group_perm': 'group.',
214 214 'default_user_group_perm': 'usergroup.',
215 215 # branch
216 216 'default_branch_perm': 'branch.',
217 217
218 218 }[field_name]
219 219 for field in keep_fields:
220 220 pat = get_pat(field)
221 221 if perm_name.startswith(pat):
222 222 return True
223 223 return False
224 224
225 225 def _clear_object_perm(self, object_perms, preserve=None):
226 226 preserve = preserve or []
227 227 _deleted = []
228 228 for perm in object_perms:
229 229 perm_name = perm.permission.permission_name
230 230 if not self._keep_perm(perm_name, keep_fields=preserve):
231 231 _deleted.append(perm_name)
232 232 self.sa.delete(perm)
233 233 return _deleted
234 234
235 235 def _clear_user_perms(self, user_id, preserve=None):
236 236 perms = self.sa.query(UserToPerm)\
237 237 .filter(UserToPerm.user_id == user_id)\
238 238 .all()
239 239 return self._clear_object_perm(perms, preserve=preserve)
240 240
241 241 def _clear_user_group_perms(self, user_group_id, preserve=None):
242 242 perms = self.sa.query(UserGroupToPerm)\
243 243 .filter(UserGroupToPerm.users_group_id == user_group_id)\
244 244 .all()
245 245 return self._clear_object_perm(perms, preserve=preserve)
246 246
247 247 def _set_new_object_perms(self, obj_type, object, form_result, preserve=None):
248 248 # clear current entries, to make this function idempotent
249 249 # it will fix even if we define more permissions or permissions
250 250 # are somehow missing
251 251 preserve = preserve or []
252 252 _global_perms = self.global_perms.copy()
253 253 if obj_type not in ['user', 'user_group']:
254 254 raise ValueError("obj_type must be on of 'user' or 'user_group'")
255 255 global_perms = len(_global_perms)
256 256 default_user_perms = len(Permission.DEFAULT_USER_PERMISSIONS)
257 257 if global_perms != default_user_perms:
258 258 raise Exception(
259 259 'Inconsistent permissions definition. Got {} vs {}'.format(
260 260 global_perms, default_user_perms))
261 261
262 262 if obj_type == 'user':
263 263 self._clear_user_perms(object.user_id, preserve)
264 264 if obj_type == 'user_group':
265 265 self._clear_user_group_perms(object.users_group_id, preserve)
266 266
267 267 # now kill the keys that we want to preserve from the form.
268 268 for key in preserve:
269 269 del _global_perms[key]
270 270
271 271 for k in _global_perms.copy():
272 272 _global_perms[k] = form_result[k]
273 273
274 274 # at that stage we validate all are passed inside form_result
275 275 for _perm_key, perm_value in _global_perms.items():
276 276 if perm_value is None:
277 277 raise ValueError('Missing permission for %s' % (_perm_key,))
278 278
279 279 if obj_type == 'user':
280 280 p = self._make_new_user_perm(object, perm_value)
281 281 self.sa.add(p)
282 282 if obj_type == 'user_group':
283 283 p = self._make_new_user_group_perm(object, perm_value)
284 284 self.sa.add(p)
285 285
286 286 def _set_new_user_perms(self, user, form_result, preserve=None):
287 287 return self._set_new_object_perms(
288 288 'user', user, form_result, preserve)
289 289
290 290 def _set_new_user_group_perms(self, user_group, form_result, preserve=None):
291 291 return self._set_new_object_perms(
292 292 'user_group', user_group, form_result, preserve)
293 293
294 294 def set_new_user_perms(self, user, form_result):
295 295 # calculate what to preserve from what is given in form_result
296 296 preserve = set(self.global_perms.keys()).difference(set(form_result.keys()))
297 297 return self._set_new_user_perms(user, form_result, preserve)
298 298
299 299 def set_new_user_group_perms(self, user_group, form_result):
300 300 # calculate what to preserve from what is given in form_result
301 301 preserve = set(self.global_perms.keys()).difference(set(form_result.keys()))
302 302 return self._set_new_user_group_perms(user_group, form_result, preserve)
303 303
304 304 def create_permissions(self):
305 305 """
306 306 Create permissions for whole system
307 307 """
308 308 for p in Permission.PERMS:
309 309 if not Permission.get_by_key(p[0]):
310 310 new_perm = Permission()
311 311 new_perm.permission_name = p[0]
312 312 new_perm.permission_longname = p[0] # translation err with p[1]
313 313 self.sa.add(new_perm)
314 314
315 315 def _create_default_object_permission(self, obj_type, obj, obj_perms,
316 316 force=False):
317 317 if obj_type not in ['user', 'user_group']:
318 318 raise ValueError("obj_type must be on of 'user' or 'user_group'")
319 319
320 320 def _get_group(perm_name):
321 321 return '.'.join(perm_name.split('.')[:1])
322 322
323 323 defined_perms_groups = map(
324 324 _get_group, (x.permission.permission_name for x in obj_perms))
325 325 log.debug('GOT ALREADY DEFINED:%s', obj_perms)
326 326
327 327 if force:
328 328 self._clear_object_perm(obj_perms)
329 329 self.sa.commit()
330 330 defined_perms_groups = []
331 331 # for every default permission that needs to be created, we check if
332 332 # it's group is already defined, if it's not we create default perm
333 333 for perm_name in Permission.DEFAULT_USER_PERMISSIONS:
334 334 gr = _get_group(perm_name)
335 335 if gr not in defined_perms_groups:
336 336 log.debug('GR:%s not found, creating permission %s',
337 337 gr, perm_name)
338 338 if obj_type == 'user':
339 339 new_perm = self._make_new_user_perm(obj, perm_name)
340 340 self.sa.add(new_perm)
341 341 if obj_type == 'user_group':
342 342 new_perm = self._make_new_user_group_perm(obj, perm_name)
343 343 self.sa.add(new_perm)
344 344
345 345 def create_default_user_permissions(self, user, force=False):
346 346 """
347 347 Creates only missing default permissions for user, if force is set it
348 348 resets the default permissions for that user
349 349
350 350 :param user:
351 351 :param force:
352 352 """
353 353 user = self._get_user(user)
354 354 obj_perms = UserToPerm.query().filter(UserToPerm.user == user).all()
355 355 return self._create_default_object_permission(
356 356 'user', user, obj_perms, force)
357 357
358 358 def create_default_user_group_permissions(self, user_group, force=False):
359 359 """
360 360 Creates only missing default permissions for user group, if force is
361 361 set it resets the default permissions for that user group
362 362
363 363 :param user_group:
364 364 :param force:
365 365 """
366 366 user_group = self._get_user_group(user_group)
367 367 obj_perms = UserToPerm.query().filter(UserGroupToPerm.users_group == user_group).all()
368 368 return self._create_default_object_permission(
369 369 'user_group', user_group, obj_perms, force)
370 370
371 371 def update_application_permissions(self, form_result):
372 372 if 'perm_user_id' in form_result:
373 373 perm_user = User.get(safe_int(form_result['perm_user_id']))
374 374 else:
375 375 # used mostly to do lookup for default user
376 376 perm_user = User.get_by_username(form_result['perm_user_name'])
377 377
378 378 try:
379 379 # stage 1 set anonymous access
380 380 if perm_user.username == User.DEFAULT_USER:
381 381 perm_user.active = str2bool(form_result['anonymous'])
382 382 self.sa.add(perm_user)
383 383
384 384 # stage 2 reset defaults and set them from form data
385 385 self._set_new_user_perms(perm_user, form_result, preserve=[
386 386 'default_repo_perm',
387 387 'default_group_perm',
388 388 'default_user_group_perm',
389 389 'default_branch_perm',
390 390
391 391 'default_repo_group_create',
392 392 'default_user_group_create',
393 393 'default_repo_create_on_write',
394 394 'default_repo_create',
395 395 'default_fork_create',
396 396 'default_inherit_default_permissions',])
397 397
398 398 self.sa.commit()
399 399 except (DatabaseError,):
400 400 log.error(traceback.format_exc())
401 401 self.sa.rollback()
402 402 raise
403 403
404 404 def update_user_permissions(self, form_result):
405 405 if 'perm_user_id' in form_result:
406 406 perm_user = User.get(safe_int(form_result['perm_user_id']))
407 407 else:
408 408 # used mostly to do lookup for default user
409 409 perm_user = User.get_by_username(form_result['perm_user_name'])
410 410 try:
411 411 # stage 2 reset defaults and set them from form data
412 412 self._set_new_user_perms(perm_user, form_result, preserve=[
413 413 'default_repo_perm',
414 414 'default_group_perm',
415 415 'default_user_group_perm',
416 416 'default_branch_perm',
417 417
418 418 'default_register',
419 419 'default_password_reset',
420 420 'default_extern_activate'])
421 421 self.sa.commit()
422 422 except (DatabaseError,):
423 423 log.error(traceback.format_exc())
424 424 self.sa.rollback()
425 425 raise
426 426
427 427 def update_user_group_permissions(self, form_result):
428 428 if 'perm_user_group_id' in form_result:
429 429 perm_user_group = UserGroup.get(safe_int(form_result['perm_user_group_id']))
430 430 else:
431 431 # used mostly to do lookup for default user
432 432 perm_user_group = UserGroup.get_by_group_name(form_result['perm_user_group_name'])
433 433 try:
434 434 # stage 2 reset defaults and set them from form data
435 435 self._set_new_user_group_perms(perm_user_group, form_result, preserve=[
436 436 'default_repo_perm',
437 437 'default_group_perm',
438 438 'default_user_group_perm',
439 439 'default_branch_perm',
440 440
441 441 'default_register',
442 442 'default_password_reset',
443 443 'default_extern_activate'])
444 444 self.sa.commit()
445 445 except (DatabaseError,):
446 446 log.error(traceback.format_exc())
447 447 self.sa.rollback()
448 448 raise
449 449
450 450 def update_object_permissions(self, form_result):
451 451 if 'perm_user_id' in form_result:
452 452 perm_user = User.get(safe_int(form_result['perm_user_id']))
453 453 else:
454 454 # used mostly to do lookup for default user
455 455 perm_user = User.get_by_username(form_result['perm_user_name'])
456 456 try:
457 457
458 458 # stage 2 reset defaults and set them from form data
459 459 self._set_new_user_perms(perm_user, form_result, preserve=[
460 460 'default_repo_group_create',
461 461 'default_user_group_create',
462 462 'default_repo_create_on_write',
463 463 'default_repo_create',
464 464 'default_fork_create',
465 465 'default_inherit_default_permissions',
466 466 'default_branch_perm',
467 467
468 468 'default_register',
469 469 'default_password_reset',
470 470 'default_extern_activate'])
471 471
472 472 # overwrite default repo permissions
473 473 if form_result['overwrite_default_repo']:
474 474 _def_name = form_result['default_repo_perm'].split('repository.')[-1]
475 475 _def = Permission.get_by_key('repository.' + _def_name)
476 476 for r2p in self.sa.query(UserRepoToPerm)\
477 477 .filter(UserRepoToPerm.user == perm_user)\
478 478 .all():
479 479 # don't reset PRIVATE repositories
480 480 if not r2p.repository.private:
481 481 r2p.permission = _def
482 482 self.sa.add(r2p)
483 483
484 484 # overwrite default repo group permissions
485 485 if form_result['overwrite_default_group']:
486 486 _def_name = form_result['default_group_perm'].split('group.')[-1]
487 487 _def = Permission.get_by_key('group.' + _def_name)
488 488 for g2p in self.sa.query(UserRepoGroupToPerm)\
489 489 .filter(UserRepoGroupToPerm.user == perm_user)\
490 490 .all():
491 491 g2p.permission = _def
492 492 self.sa.add(g2p)
493 493
494 494 # overwrite default user group permissions
495 495 if form_result['overwrite_default_user_group']:
496 496 _def_name = form_result['default_user_group_perm'].split('usergroup.')[-1]
497 497 # user groups
498 498 _def = Permission.get_by_key('usergroup.' + _def_name)
499 499 for g2p in self.sa.query(UserUserGroupToPerm)\
500 500 .filter(UserUserGroupToPerm.user == perm_user)\
501 501 .all():
502 502 g2p.permission = _def
503 503 self.sa.add(g2p)
504 504
505 505 # COMMIT
506 506 self.sa.commit()
507 507 except (DatabaseError,):
508 508 log.exception('Failed to set default object permissions')
509 509 self.sa.rollback()
510 510 raise
511 511
512 512 def update_branch_permissions(self, form_result):
513 513 if 'perm_user_id' in form_result:
514 514 perm_user = User.get(safe_int(form_result['perm_user_id']))
515 515 else:
516 516 # used mostly to do lookup for default user
517 517 perm_user = User.get_by_username(form_result['perm_user_name'])
518 518 try:
519 519
520 520 # stage 2 reset defaults and set them from form data
521 521 self._set_new_user_perms(perm_user, form_result, preserve=[
522 522 'default_repo_perm',
523 523 'default_group_perm',
524 524 'default_user_group_perm',
525 525
526 526 'default_repo_group_create',
527 527 'default_user_group_create',
528 528 'default_repo_create_on_write',
529 529 'default_repo_create',
530 530 'default_fork_create',
531 531 'default_inherit_default_permissions',
532 532
533 533 'default_register',
534 534 'default_password_reset',
535 535 'default_extern_activate'])
536 536
537 537 # overwrite default branch permissions
538 538 if form_result['overwrite_default_branch']:
539 539 _def_name = \
540 540 form_result['default_branch_perm'].split('branch.')[-1]
541 541
542 542 _def = Permission.get_by_key('branch.' + _def_name)
543 543
544 544 user_perms = UserToRepoBranchPermission.query()\
545 545 .join(UserToRepoBranchPermission.user_repo_to_perm)\
546 546 .filter(UserRepoToPerm.user == perm_user).all()
547 547
548 548 for g2p in user_perms:
549 549 g2p.permission = _def
550 550 self.sa.add(g2p)
551 551
552 552 # COMMIT
553 553 self.sa.commit()
554 554 except (DatabaseError,):
555 555 log.exception('Failed to set default branch permissions')
556 556 self.sa.rollback()
557 557 raise
558 558
559 559 def get_users_with_repo_write(self, db_repo):
560 560 write_plus = ['repository.write', 'repository.admin']
561 default_user_id = User.get_default_user().user_id
561 default_user_id = User.get_default_user_id()
562 562 user_write_permissions = collections.OrderedDict()
563 563
564 564 # write+ and DEFAULT user for inheritance
565 565 for perm in db_repo.permissions():
566 566 if perm.permission in write_plus or perm.user_id == default_user_id:
567 567 user_write_permissions[perm.user_id] = perm
568 568 return user_write_permissions
569 569
570 570 def get_user_groups_with_repo_write(self, db_repo):
571 571 write_plus = ['repository.write', 'repository.admin']
572 572 user_group_write_permissions = collections.OrderedDict()
573 573
574 574 # write+ and DEFAULT user for inheritance
575 575 for p in db_repo.permission_user_groups():
576 576 if p.permission in write_plus:
577 577 user_group_write_permissions[p.users_group_id] = p
578 578 return user_group_write_permissions
579 579
580 580 def trigger_permission_flush(self, affected_user_ids):
581 581 events.trigger(events.UserPermissionsChange(affected_user_ids))
582 582
583 583 def flush_user_permission_caches(self, changes, affected_user_ids=None):
584 584 affected_user_ids = affected_user_ids or []
585 585
586 586 for change in changes['added'] + changes['updated'] + changes['deleted']:
587 587 if change['type'] == 'user':
588 588 affected_user_ids.append(change['id'])
589 589 if change['type'] == 'user_group':
590 590 user_group = UserGroup.get(safe_int(change['id']))
591 591 if user_group:
592 592 group_members_ids = [x.user_id for x in user_group.members]
593 593 affected_user_ids.extend(group_members_ids)
594 594
595 595 self.trigger_permission_flush(affected_user_ids)
596 596
597 597 return affected_user_ids
@@ -1,1116 +1,1116 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Set of generic validators
23 23 """
24 24
25 25
26 26 import os
27 27 import re
28 28 import logging
29 29 import collections
30 30
31 31 import formencode
32 32 import ipaddress
33 33 from formencode.validators import (
34 34 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
35 35 NotEmpty, IPAddress, CIDR, String, FancyValidator
36 36 )
37 37
38 38 from sqlalchemy.sql.expression import true
39 39 from sqlalchemy.util import OrderedSet
40 40 from pyramid import compat
41 41
42 42 from rhodecode.authentication import (
43 43 legacy_plugin_prefix, _import_legacy_plugin)
44 44 from rhodecode.authentication.base import loadplugin
45 45 from rhodecode.apps._base import ADMIN_PREFIX
46 46 from rhodecode.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
47 47 from rhodecode.lib.utils import repo_name_slug, make_db_config
48 48 from rhodecode.lib.utils2 import safe_int, str2bool, aslist, md5, safe_unicode
49 49 from rhodecode.lib.vcs.backends.git.repository import GitRepository
50 50 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
51 51 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
52 52 from rhodecode.model.db import (
53 53 RepoGroup, Repository, UserGroup, User, ChangesetStatus, Gist)
54 54 from rhodecode.model.settings import VcsSettingsModel
55 55
56 56 # silence warnings and pylint
57 57 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
58 58 NotEmpty, IPAddress, CIDR, String, FancyValidator
59 59
60 60 log = logging.getLogger(__name__)
61 61
62 62
63 63 class _Missing(object):
64 64 pass
65 65
66 66
67 67 Missing = _Missing()
68 68
69 69
70 70 def M(self, key, state, **kwargs):
71 71 """
72 72 returns string from self.message based on given key,
73 73 passed kw params are used to substitute %(named)s params inside
74 74 translated strings
75 75
76 76 :param msg:
77 77 :param state:
78 78 """
79 79
80 80 #state._ = staticmethod(_)
81 81 # inject validator into state object
82 82 return self.message(key, state, **kwargs)
83 83
84 84
85 85 def UniqueList(localizer, convert=None):
86 86 _ = localizer
87 87
88 88 class _validator(formencode.FancyValidator):
89 89 """
90 90 Unique List !
91 91 """
92 92 messages = {
93 93 'empty': _(u'Value cannot be an empty list'),
94 94 'missing_value': _(u'Value cannot be an empty list'),
95 95 }
96 96
97 97 def _to_python(self, value, state):
98 98 ret_val = []
99 99
100 100 def make_unique(value):
101 101 seen = []
102 102 return [c for c in value if not (c in seen or seen.append(c))]
103 103
104 104 if isinstance(value, list):
105 105 ret_val = make_unique(value)
106 106 elif isinstance(value, set):
107 107 ret_val = make_unique(list(value))
108 108 elif isinstance(value, tuple):
109 109 ret_val = make_unique(list(value))
110 110 elif value is None:
111 111 ret_val = []
112 112 else:
113 113 ret_val = [value]
114 114
115 115 if convert:
116 116 ret_val = map(convert, ret_val)
117 117 return ret_val
118 118
119 119 def empty_value(self, value):
120 120 return []
121 121 return _validator
122 122
123 123
124 124 def UniqueListFromString(localizer):
125 125 _ = localizer
126 126
127 127 class _validator(UniqueList(localizer)):
128 128 def _to_python(self, value, state):
129 129 if isinstance(value, compat.string_types):
130 130 value = aslist(value, ',')
131 131 return super(_validator, self)._to_python(value, state)
132 132 return _validator
133 133
134 134
135 135 def ValidSvnPattern(localizer, section, repo_name=None):
136 136 _ = localizer
137 137
138 138 class _validator(formencode.validators.FancyValidator):
139 139 messages = {
140 140 'pattern_exists': _(u'Pattern already exists'),
141 141 }
142 142
143 143 def validate_python(self, value, state):
144 144 if not value:
145 145 return
146 146 model = VcsSettingsModel(repo=repo_name)
147 147 ui_settings = model.get_svn_patterns(section=section)
148 148 for entry in ui_settings:
149 149 if value == entry.value:
150 150 msg = M(self, 'pattern_exists', state)
151 151 raise formencode.Invalid(msg, value, state)
152 152 return _validator
153 153
154 154
155 155 def ValidUsername(localizer, edit=False, old_data=None):
156 156 _ = localizer
157 157 old_data = old_data or {}
158 158
159 159 class _validator(formencode.validators.FancyValidator):
160 160 messages = {
161 161 'username_exists': _(u'Username "%(username)s" already exists'),
162 162 'system_invalid_username':
163 163 _(u'Username "%(username)s" is forbidden'),
164 164 'invalid_username':
165 165 _(u'Username may only contain alphanumeric characters '
166 166 u'underscores, periods or dashes and must begin with '
167 167 u'alphanumeric character or underscore')
168 168 }
169 169
170 170 def validate_python(self, value, state):
171 171 if value in ['default', 'new_user']:
172 172 msg = M(self, 'system_invalid_username', state, username=value)
173 173 raise formencode.Invalid(msg, value, state)
174 174 # check if user is unique
175 175 old_un = None
176 176 if edit:
177 177 old_un = User.get(old_data.get('user_id')).username
178 178
179 179 if old_un != value or not edit:
180 180 if User.get_by_username(value, case_insensitive=True):
181 181 msg = M(self, 'username_exists', state, username=value)
182 182 raise formencode.Invalid(msg, value, state)
183 183
184 184 if (re.match(r'^[\w]{1}[\w\-\.]{0,254}$', value)
185 185 is None):
186 186 msg = M(self, 'invalid_username', state)
187 187 raise formencode.Invalid(msg, value, state)
188 188 return _validator
189 189
190 190
191 191 def ValidRepoUser(localizer, allow_disabled=False):
192 192 _ = localizer
193 193
194 194 class _validator(formencode.validators.FancyValidator):
195 195 messages = {
196 196 'invalid_username': _(u'Username %(username)s is not valid'),
197 197 'disabled_username': _(u'Username %(username)s is disabled')
198 198 }
199 199
200 200 def validate_python(self, value, state):
201 201 try:
202 202 user = User.query().filter(User.username == value).one()
203 203 except Exception:
204 204 msg = M(self, 'invalid_username', state, username=value)
205 205 raise formencode.Invalid(
206 206 msg, value, state, error_dict={'username': msg}
207 207 )
208 208 if user and (not allow_disabled and not user.active):
209 209 msg = M(self, 'disabled_username', state, username=value)
210 210 raise formencode.Invalid(
211 211 msg, value, state, error_dict={'username': msg}
212 212 )
213 213 return _validator
214 214
215 215
216 216 def ValidUserGroup(localizer, edit=False, old_data=None):
217 217 _ = localizer
218 218 old_data = old_data or {}
219 219
220 220 class _validator(formencode.validators.FancyValidator):
221 221 messages = {
222 222 'invalid_group': _(u'Invalid user group name'),
223 223 'group_exist': _(u'User group `%(usergroup)s` already exists'),
224 224 'invalid_usergroup_name':
225 225 _(u'user group name may only contain alphanumeric '
226 226 u'characters underscores, periods or dashes and must begin '
227 227 u'with alphanumeric character')
228 228 }
229 229
230 230 def validate_python(self, value, state):
231 231 if value in ['default']:
232 232 msg = M(self, 'invalid_group', state)
233 233 raise formencode.Invalid(
234 234 msg, value, state, error_dict={'users_group_name': msg}
235 235 )
236 236 # check if group is unique
237 237 old_ugname = None
238 238 if edit:
239 239 old_id = old_data.get('users_group_id')
240 240 old_ugname = UserGroup.get(old_id).users_group_name
241 241
242 242 if old_ugname != value or not edit:
243 243 is_existing_group = UserGroup.get_by_group_name(
244 244 value, case_insensitive=True)
245 245 if is_existing_group:
246 246 msg = M(self, 'group_exist', state, usergroup=value)
247 247 raise formencode.Invalid(
248 248 msg, value, state, error_dict={'users_group_name': msg}
249 249 )
250 250
251 251 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
252 252 msg = M(self, 'invalid_usergroup_name', state)
253 253 raise formencode.Invalid(
254 254 msg, value, state, error_dict={'users_group_name': msg}
255 255 )
256 256 return _validator
257 257
258 258
259 259 def ValidRepoGroup(localizer, edit=False, old_data=None, can_create_in_root=False):
260 260 _ = localizer
261 261 old_data = old_data or {}
262 262
263 263 class _validator(formencode.validators.FancyValidator):
264 264 messages = {
265 265 'group_parent_id': _(u'Cannot assign this group as parent'),
266 266 'group_exists': _(u'Group "%(group_name)s" already exists'),
267 267 'repo_exists': _(u'Repository with name "%(group_name)s" '
268 268 u'already exists'),
269 269 'permission_denied': _(u"no permission to store repository group"
270 270 u"in this location"),
271 271 'permission_denied_root': _(
272 272 u"no permission to store repository group "
273 273 u"in root location")
274 274 }
275 275
276 276 def _to_python(self, value, state):
277 277 group_name = repo_name_slug(value.get('group_name', ''))
278 278 group_parent_id = safe_int(value.get('group_parent_id'))
279 279 gr = RepoGroup.get(group_parent_id)
280 280 if gr:
281 281 parent_group_path = gr.full_path
282 282 # value needs to be aware of group name in order to check
283 283 # db key This is an actual just the name to store in the
284 284 # database
285 285 group_name_full = (
286 286 parent_group_path + RepoGroup.url_sep() + group_name)
287 287 else:
288 288 group_name_full = group_name
289 289
290 290 value['group_name'] = group_name
291 291 value['group_name_full'] = group_name_full
292 292 value['group_parent_id'] = group_parent_id
293 293 return value
294 294
295 295 def validate_python(self, value, state):
296 296
297 297 old_group_name = None
298 298 group_name = value.get('group_name')
299 299 group_name_full = value.get('group_name_full')
300 300 group_parent_id = safe_int(value.get('group_parent_id'))
301 301 if group_parent_id == -1:
302 302 group_parent_id = None
303 303
304 304 group_obj = RepoGroup.get(old_data.get('group_id'))
305 305 parent_group_changed = False
306 306 if edit:
307 307 old_group_name = group_obj.group_name
308 308 old_group_parent_id = group_obj.group_parent_id
309 309
310 310 if group_parent_id != old_group_parent_id:
311 311 parent_group_changed = True
312 312
313 313 # TODO: mikhail: the following if statement is not reached
314 314 # since group_parent_id's OneOf validation fails before.
315 315 # Can be removed.
316 316
317 317 # check against setting a parent of self
318 318 parent_of_self = (
319 319 old_data['group_id'] == group_parent_id
320 320 if group_parent_id else False
321 321 )
322 322 if parent_of_self:
323 323 msg = M(self, 'group_parent_id', state)
324 324 raise formencode.Invalid(
325 325 msg, value, state, error_dict={'group_parent_id': msg}
326 326 )
327 327
328 328 # group we're moving current group inside
329 329 child_group = None
330 330 if group_parent_id:
331 331 child_group = RepoGroup.query().filter(
332 332 RepoGroup.group_id == group_parent_id).scalar()
333 333
334 334 # do a special check that we cannot move a group to one of
335 335 # it's children
336 336 if edit and child_group:
337 337 parents = [x.group_id for x in child_group.parents]
338 338 move_to_children = old_data['group_id'] in parents
339 339 if move_to_children:
340 340 msg = M(self, 'group_parent_id', state)
341 341 raise formencode.Invalid(
342 342 msg, value, state, error_dict={'group_parent_id': msg})
343 343
344 344 # Check if we have permission to store in the parent.
345 345 # Only check if the parent group changed.
346 346 if parent_group_changed:
347 347 if child_group is None:
348 348 if not can_create_in_root:
349 349 msg = M(self, 'permission_denied_root', state)
350 350 raise formencode.Invalid(
351 351 msg, value, state,
352 352 error_dict={'group_parent_id': msg})
353 353 else:
354 354 valid = HasRepoGroupPermissionAny('group.admin')
355 355 forbidden = not valid(
356 356 child_group.group_name, 'can create group validator')
357 357 if forbidden:
358 358 msg = M(self, 'permission_denied', state)
359 359 raise formencode.Invalid(
360 360 msg, value, state,
361 361 error_dict={'group_parent_id': msg})
362 362
363 363 # if we change the name or it's new group, check for existing names
364 364 # or repositories with the same name
365 365 if old_group_name != group_name_full or not edit:
366 366 # check group
367 367 gr = RepoGroup.get_by_group_name(group_name_full)
368 368 if gr:
369 369 msg = M(self, 'group_exists', state, group_name=group_name)
370 370 raise formencode.Invalid(
371 371 msg, value, state, error_dict={'group_name': msg})
372 372
373 373 # check for same repo
374 374 repo = Repository.get_by_repo_name(group_name_full)
375 375 if repo:
376 376 msg = M(self, 'repo_exists', state, group_name=group_name)
377 377 raise formencode.Invalid(
378 378 msg, value, state, error_dict={'group_name': msg})
379 379 return _validator
380 380
381 381
382 382 def ValidPassword(localizer):
383 383 _ = localizer
384 384
385 385 class _validator(formencode.validators.FancyValidator):
386 386 messages = {
387 387 'invalid_password':
388 388 _(u'Invalid characters (non-ascii) in password')
389 389 }
390 390
391 391 def validate_python(self, value, state):
392 392 try:
393 393 (value or '').decode('ascii')
394 394 except UnicodeError:
395 395 msg = M(self, 'invalid_password', state)
396 396 raise formencode.Invalid(msg, value, state,)
397 397 return _validator
398 398
399 399
400 400 def ValidPasswordsMatch(
401 401 localizer, passwd='new_password',
402 402 passwd_confirmation='password_confirmation'):
403 403 _ = localizer
404 404
405 405 class _validator(formencode.validators.FancyValidator):
406 406 messages = {
407 407 'password_mismatch': _(u'Passwords do not match'),
408 408 }
409 409
410 410 def validate_python(self, value, state):
411 411
412 412 pass_val = value.get('password') or value.get(passwd)
413 413 if pass_val != value[passwd_confirmation]:
414 414 msg = M(self, 'password_mismatch', state)
415 415 raise formencode.Invalid(
416 416 msg, value, state,
417 417 error_dict={passwd: msg, passwd_confirmation: msg}
418 418 )
419 419 return _validator
420 420
421 421
422 422 def ValidAuth(localizer):
423 423 _ = localizer
424 424
425 425 class _validator(formencode.validators.FancyValidator):
426 426 messages = {
427 427 'invalid_password': _(u'invalid password'),
428 428 'invalid_username': _(u'invalid user name'),
429 429 'disabled_account': _(u'Your account is disabled')
430 430 }
431 431
432 432 def validate_python(self, value, state):
433 433 from rhodecode.authentication.base import authenticate, HTTP_TYPE
434 434
435 435 password = value['password']
436 436 username = value['username']
437 437
438 438 if not authenticate(username, password, '', HTTP_TYPE,
439 439 skip_missing=True):
440 440 user = User.get_by_username(username)
441 441 if user and not user.active:
442 442 log.warning('user %s is disabled', username)
443 443 msg = M(self, 'disabled_account', state)
444 444 raise formencode.Invalid(
445 445 msg, value, state, error_dict={'username': msg}
446 446 )
447 447 else:
448 448 log.warning('user `%s` failed to authenticate', username)
449 449 msg = M(self, 'invalid_username', state)
450 450 msg2 = M(self, 'invalid_password', state)
451 451 raise formencode.Invalid(
452 452 msg, value, state,
453 453 error_dict={'username': msg, 'password': msg2}
454 454 )
455 455 return _validator
456 456
457 457
458 458 def ValidRepoName(localizer, edit=False, old_data=None):
459 459 old_data = old_data or {}
460 460 _ = localizer
461 461
462 462 class _validator(formencode.validators.FancyValidator):
463 463 messages = {
464 464 'invalid_repo_name':
465 465 _(u'Repository name %(repo)s is disallowed'),
466 466 # top level
467 467 'repository_exists': _(u'Repository with name %(repo)s '
468 468 u'already exists'),
469 469 'group_exists': _(u'Repository group with name "%(repo)s" '
470 470 u'already exists'),
471 471 # inside a group
472 472 'repository_in_group_exists': _(u'Repository with name %(repo)s '
473 473 u'exists in group "%(group)s"'),
474 474 'group_in_group_exists': _(
475 475 u'Repository group with name "%(repo)s" '
476 476 u'exists in group "%(group)s"'),
477 477 }
478 478
479 479 def _to_python(self, value, state):
480 480 repo_name = repo_name_slug(value.get('repo_name', ''))
481 481 repo_group = value.get('repo_group')
482 482 if repo_group:
483 483 gr = RepoGroup.get(repo_group)
484 484 group_path = gr.full_path
485 485 group_name = gr.group_name
486 486 # value needs to be aware of group name in order to check
487 487 # db key This is an actual just the name to store in the
488 488 # database
489 489 repo_name_full = group_path + RepoGroup.url_sep() + repo_name
490 490 else:
491 491 group_name = group_path = ''
492 492 repo_name_full = repo_name
493 493
494 494 value['repo_name'] = repo_name
495 495 value['repo_name_full'] = repo_name_full
496 496 value['group_path'] = group_path
497 497 value['group_name'] = group_name
498 498 return value
499 499
500 500 def validate_python(self, value, state):
501 501
502 502 repo_name = value.get('repo_name')
503 503 repo_name_full = value.get('repo_name_full')
504 504 group_path = value.get('group_path')
505 505 group_name = value.get('group_name')
506 506
507 507 if repo_name in [ADMIN_PREFIX, '']:
508 508 msg = M(self, 'invalid_repo_name', state, repo=repo_name)
509 509 raise formencode.Invalid(
510 510 msg, value, state, error_dict={'repo_name': msg})
511 511
512 512 rename = old_data.get('repo_name') != repo_name_full
513 513 create = not edit
514 514 if rename or create:
515 515
516 516 if group_path:
517 517 if Repository.get_by_repo_name(repo_name_full):
518 518 msg = M(self, 'repository_in_group_exists', state,
519 519 repo=repo_name, group=group_name)
520 520 raise formencode.Invalid(
521 521 msg, value, state, error_dict={'repo_name': msg})
522 522 if RepoGroup.get_by_group_name(repo_name_full):
523 523 msg = M(self, 'group_in_group_exists', state,
524 524 repo=repo_name, group=group_name)
525 525 raise formencode.Invalid(
526 526 msg, value, state, error_dict={'repo_name': msg})
527 527 else:
528 528 if RepoGroup.get_by_group_name(repo_name_full):
529 529 msg = M(self, 'group_exists', state, repo=repo_name)
530 530 raise formencode.Invalid(
531 531 msg, value, state, error_dict={'repo_name': msg})
532 532
533 533 if Repository.get_by_repo_name(repo_name_full):
534 534 msg = M(
535 535 self, 'repository_exists', state, repo=repo_name)
536 536 raise formencode.Invalid(
537 537 msg, value, state, error_dict={'repo_name': msg})
538 538 return value
539 539 return _validator
540 540
541 541
542 542 def ValidForkName(localizer, *args, **kwargs):
543 543 _ = localizer
544 544
545 545 return ValidRepoName(localizer, *args, **kwargs)
546 546
547 547
548 548 def SlugifyName(localizer):
549 549 _ = localizer
550 550
551 551 class _validator(formencode.validators.FancyValidator):
552 552
553 553 def _to_python(self, value, state):
554 554 return repo_name_slug(value)
555 555
556 556 def validate_python(self, value, state):
557 557 pass
558 558 return _validator
559 559
560 560
561 561 def CannotHaveGitSuffix(localizer):
562 562 _ = localizer
563 563
564 564 class _validator(formencode.validators.FancyValidator):
565 565 messages = {
566 566 'has_git_suffix':
567 567 _(u'Repository name cannot end with .git'),
568 568 }
569 569
570 570 def _to_python(self, value, state):
571 571 return value
572 572
573 573 def validate_python(self, value, state):
574 574 if value and value.endswith('.git'):
575 575 msg = M(
576 576 self, 'has_git_suffix', state)
577 577 raise formencode.Invalid(
578 578 msg, value, state, error_dict={'repo_name': msg})
579 579 return _validator
580 580
581 581
582 582 def ValidCloneUri(localizer):
583 583 _ = localizer
584 584
585 585 class InvalidCloneUrl(Exception):
586 586 allowed_prefixes = ()
587 587
588 588 def url_handler(repo_type, url):
589 589 config = make_db_config(clear_session=False)
590 590 if repo_type == 'hg':
591 591 allowed_prefixes = ('http', 'svn+http', 'git+http')
592 592
593 593 if 'http' in url[:4]:
594 594 # initially check if it's at least the proper URL
595 595 # or does it pass basic auth
596 596 MercurialRepository.check_url(url, config)
597 597 elif 'svn+http' in url[:8]: # svn->hg import
598 598 SubversionRepository.check_url(url, config)
599 599 elif 'git+http' in url[:8]: # git->hg import
600 600 raise NotImplementedError()
601 601 else:
602 602 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
603 603 'Allowed url must start with one of %s'
604 604 % (url, ','.join(allowed_prefixes)))
605 605 exc.allowed_prefixes = allowed_prefixes
606 606 raise exc
607 607
608 608 elif repo_type == 'git':
609 609 allowed_prefixes = ('http', 'svn+http', 'hg+http')
610 610 if 'http' in url[:4]:
611 611 # initially check if it's at least the proper URL
612 612 # or does it pass basic auth
613 613 GitRepository.check_url(url, config)
614 614 elif 'svn+http' in url[:8]: # svn->git import
615 615 raise NotImplementedError()
616 616 elif 'hg+http' in url[:8]: # hg->git import
617 617 raise NotImplementedError()
618 618 else:
619 619 exc = InvalidCloneUrl('Clone from URI %s not allowed. '
620 620 'Allowed url must start with one of %s'
621 621 % (url, ','.join(allowed_prefixes)))
622 622 exc.allowed_prefixes = allowed_prefixes
623 623 raise exc
624 624
625 625 class _validator(formencode.validators.FancyValidator):
626 626 messages = {
627 627 'clone_uri': _(u'invalid clone url for %(rtype)s repository'),
628 628 'invalid_clone_uri': _(
629 629 u'Invalid clone url, provide a valid clone '
630 630 u'url starting with one of %(allowed_prefixes)s')
631 631 }
632 632
633 633 def validate_python(self, value, state):
634 634 repo_type = value.get('repo_type')
635 635 url = value.get('clone_uri')
636 636
637 637 if url:
638 638 try:
639 639 url_handler(repo_type, url)
640 640 except InvalidCloneUrl as e:
641 641 log.warning(e)
642 642 msg = M(self, 'invalid_clone_uri', state, rtype=repo_type,
643 643 allowed_prefixes=','.join(e.allowed_prefixes))
644 644 raise formencode.Invalid(msg, value, state,
645 645 error_dict={'clone_uri': msg})
646 646 except Exception:
647 647 log.exception('Url validation failed')
648 648 msg = M(self, 'clone_uri', state, rtype=repo_type)
649 649 raise formencode.Invalid(msg, value, state,
650 650 error_dict={'clone_uri': msg})
651 651 return _validator
652 652
653 653
654 654 def ValidForkType(localizer, old_data=None):
655 655 _ = localizer
656 656 old_data = old_data or {}
657 657
658 658 class _validator(formencode.validators.FancyValidator):
659 659 messages = {
660 660 'invalid_fork_type': _(u'Fork have to be the same type as parent')
661 661 }
662 662
663 663 def validate_python(self, value, state):
664 664 if old_data['repo_type'] != value:
665 665 msg = M(self, 'invalid_fork_type', state)
666 666 raise formencode.Invalid(
667 667 msg, value, state, error_dict={'repo_type': msg}
668 668 )
669 669 return _validator
670 670
671 671
672 672 def CanWriteGroup(localizer, old_data=None):
673 673 _ = localizer
674 674
675 675 class _validator(formencode.validators.FancyValidator):
676 676 messages = {
677 677 'permission_denied': _(
678 678 u"You do not have the permission "
679 679 u"to create repositories in this group."),
680 680 'permission_denied_root': _(
681 681 u"You do not have the permission to store repositories in "
682 682 u"the root location.")
683 683 }
684 684
685 685 def _to_python(self, value, state):
686 686 # root location
687 687 if value in [-1, "-1"]:
688 688 return None
689 689 return value
690 690
691 691 def validate_python(self, value, state):
692 692 gr = RepoGroup.get(value)
693 693 gr_name = gr.group_name if gr else None # None means ROOT location
694 694 # create repositories with write permission on group is set to true
695 695 create_on_write = HasPermissionAny(
696 696 'hg.create.write_on_repogroup.true')()
697 697 group_admin = HasRepoGroupPermissionAny('group.admin')(
698 698 gr_name, 'can write into group validator')
699 699 group_write = HasRepoGroupPermissionAny('group.write')(
700 700 gr_name, 'can write into group validator')
701 701 forbidden = not (group_admin or (group_write and create_on_write))
702 702 can_create_repos = HasPermissionAny(
703 703 'hg.admin', 'hg.create.repository')
704 704 gid = (old_data['repo_group'].get('group_id')
705 705 if (old_data and 'repo_group' in old_data) else None)
706 706 value_changed = gid != safe_int(value)
707 707 new = not old_data
708 708 # do check if we changed the value, there's a case that someone got
709 709 # revoked write permissions to a repository, he still created, we
710 710 # don't need to check permission if he didn't change the value of
711 711 # groups in form box
712 712 if value_changed or new:
713 713 # parent group need to be existing
714 714 if gr and forbidden:
715 715 msg = M(self, 'permission_denied', state)
716 716 raise formencode.Invalid(
717 717 msg, value, state, error_dict={'repo_type': msg}
718 718 )
719 719 # check if we can write to root location !
720 720 elif gr is None and not can_create_repos():
721 721 msg = M(self, 'permission_denied_root', state)
722 722 raise formencode.Invalid(
723 723 msg, value, state, error_dict={'repo_type': msg}
724 724 )
725 725 return _validator
726 726
727 727
728 728 def ValidPerms(localizer, type_='repo'):
729 729 _ = localizer
730 730 if type_ == 'repo_group':
731 731 EMPTY_PERM = 'group.none'
732 732 elif type_ == 'repo':
733 733 EMPTY_PERM = 'repository.none'
734 734 elif type_ == 'user_group':
735 735 EMPTY_PERM = 'usergroup.none'
736 736
737 737 class _validator(formencode.validators.FancyValidator):
738 738 messages = {
739 739 'perm_new_member_name':
740 740 _(u'This username or user group name is not valid')
741 741 }
742 742
743 743 def _to_python(self, value, state):
744 744 perm_updates = OrderedSet()
745 745 perm_additions = OrderedSet()
746 746 perm_deletions = OrderedSet()
747 747 # build a list of permission to update/delete and new permission
748 748
749 749 # Read the perm_new_member/perm_del_member attributes and group
750 750 # them by they IDs
751 751 new_perms_group = collections.defaultdict(dict)
752 752 del_perms_group = collections.defaultdict(dict)
753 753 for k, v in value.copy().iteritems():
754 754 if k.startswith('perm_del_member'):
755 755 # delete from org storage so we don't process that later
756 756 del value[k]
757 757 # part is `id`, `type`
758 758 _type, part = k.split('perm_del_member_')
759 759 args = part.split('_')
760 760 if len(args) == 2:
761 761 _key, pos = args
762 762 del_perms_group[pos][_key] = v
763 763 if k.startswith('perm_new_member'):
764 764 # delete from org storage so we don't process that later
765 765 del value[k]
766 766 # part is `id`, `type`, `perm`
767 767 _type, part = k.split('perm_new_member_')
768 768 args = part.split('_')
769 769 if len(args) == 2:
770 770 _key, pos = args
771 771 new_perms_group[pos][_key] = v
772 772
773 773 # store the deletes
774 774 for k in sorted(del_perms_group.keys()):
775 775 perm_dict = del_perms_group[k]
776 776 del_member = perm_dict.get('id')
777 777 del_type = perm_dict.get('type')
778 778 if del_member and del_type:
779 779 perm_deletions.add(
780 780 (del_member, None, del_type))
781 781
782 782 # store additions in order of how they were added in web form
783 783 for k in sorted(new_perms_group.keys()):
784 784 perm_dict = new_perms_group[k]
785 785 new_member = perm_dict.get('id')
786 786 new_type = perm_dict.get('type')
787 787 new_perm = perm_dict.get('perm')
788 788 if new_member and new_perm and new_type:
789 789 perm_additions.add(
790 790 (new_member, new_perm, new_type))
791 791
792 792 # get updates of permissions
793 793 # (read the existing radio button states)
794 default_user_id = User.get_default_user().user_id
794 default_user_id = User.get_default_user_id()
795 795
796 796 for k, update_value in value.iteritems():
797 797 if k.startswith('u_perm_') or k.startswith('g_perm_'):
798 798 obj_type = k[0]
799 799 obj_id = k[7:]
800 800 update_type = {'u': 'user',
801 801 'g': 'user_group'}[obj_type]
802 802
803 803 if obj_type == 'u' and safe_int(obj_id) == default_user_id:
804 804 if str2bool(value.get('repo_private')):
805 805 # prevent from updating default user permissions
806 806 # when this repository is marked as private
807 807 update_value = EMPTY_PERM
808 808
809 809 perm_updates.add(
810 810 (obj_id, update_value, update_type))
811 811
812 812 value['perm_additions'] = [] # propagated later
813 813 value['perm_updates'] = list(perm_updates)
814 814 value['perm_deletions'] = list(perm_deletions)
815 815
816 816 updates_map = dict(
817 817 (x[0], (x[1], x[2])) for x in value['perm_updates'])
818 818 # make sure Additions don't override updates.
819 819 for member_id, perm, member_type in list(perm_additions):
820 820 if member_id in updates_map:
821 821 perm = updates_map[member_id][0]
822 822 value['perm_additions'].append((member_id, perm, member_type))
823 823
824 824 # on new entries validate users they exist and they are active !
825 825 # this leaves feedback to the form
826 826 try:
827 827 if member_type == 'user':
828 828 User.query()\
829 829 .filter(User.active == true())\
830 830 .filter(User.user_id == member_id).one()
831 831 if member_type == 'user_group':
832 832 UserGroup.query()\
833 833 .filter(UserGroup.users_group_active == true())\
834 834 .filter(UserGroup.users_group_id == member_id)\
835 835 .one()
836 836
837 837 except Exception:
838 838 log.exception('Updated permission failed: org_exc:')
839 839 msg = M(self, 'perm_new_member_type', state)
840 840 raise formencode.Invalid(
841 841 msg, value, state, error_dict={
842 842 'perm_new_member_name': msg}
843 843 )
844 844 return value
845 845 return _validator
846 846
847 847
848 848 def ValidPath(localizer):
849 849 _ = localizer
850 850
851 851 class _validator(formencode.validators.FancyValidator):
852 852 messages = {
853 853 'invalid_path': _(u'This is not a valid path')
854 854 }
855 855
856 856 def validate_python(self, value, state):
857 857 if not os.path.isdir(value):
858 858 msg = M(self, 'invalid_path', state)
859 859 raise formencode.Invalid(
860 860 msg, value, state, error_dict={'paths_root_path': msg}
861 861 )
862 862 return _validator
863 863
864 864
865 865 def UniqSystemEmail(localizer, old_data=None):
866 866 _ = localizer
867 867 old_data = old_data or {}
868 868
869 869 class _validator(formencode.validators.FancyValidator):
870 870 messages = {
871 871 'email_taken': _(u'This e-mail address is already taken')
872 872 }
873 873
874 874 def _to_python(self, value, state):
875 875 return value.lower()
876 876
877 877 def validate_python(self, value, state):
878 878 if (old_data.get('email') or '').lower() != value:
879 879 user = User.get_by_email(value, case_insensitive=True)
880 880 if user:
881 881 msg = M(self, 'email_taken', state)
882 882 raise formencode.Invalid(
883 883 msg, value, state, error_dict={'email': msg}
884 884 )
885 885 return _validator
886 886
887 887
888 888 def ValidSystemEmail(localizer):
889 889 _ = localizer
890 890
891 891 class _validator(formencode.validators.FancyValidator):
892 892 messages = {
893 893 'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
894 894 }
895 895
896 896 def _to_python(self, value, state):
897 897 return value.lower()
898 898
899 899 def validate_python(self, value, state):
900 900 user = User.get_by_email(value, case_insensitive=True)
901 901 if user is None:
902 902 msg = M(self, 'non_existing_email', state, email=value)
903 903 raise formencode.Invalid(
904 904 msg, value, state, error_dict={'email': msg}
905 905 )
906 906 return _validator
907 907
908 908
909 909 def NotReviewedRevisions(localizer, repo_id):
910 910 _ = localizer
911 911 class _validator(formencode.validators.FancyValidator):
912 912 messages = {
913 913 'rev_already_reviewed':
914 914 _(u'Revisions %(revs)s are already part of pull request '
915 915 u'or have set status'),
916 916 }
917 917
918 918 def validate_python(self, value, state):
919 919 # check revisions if they are not reviewed, or a part of another
920 920 # pull request
921 921 statuses = ChangesetStatus.query()\
922 922 .filter(ChangesetStatus.revision.in_(value))\
923 923 .filter(ChangesetStatus.repo_id == repo_id)\
924 924 .all()
925 925
926 926 errors = []
927 927 for status in statuses:
928 928 if status.pull_request_id:
929 929 errors.append(['pull_req', status.revision[:12]])
930 930 elif status.status:
931 931 errors.append(['status', status.revision[:12]])
932 932
933 933 if errors:
934 934 revs = ','.join([x[1] for x in errors])
935 935 msg = M(self, 'rev_already_reviewed', state, revs=revs)
936 936 raise formencode.Invalid(
937 937 msg, value, state, error_dict={'revisions': revs})
938 938
939 939 return _validator
940 940
941 941
942 942 def ValidIp(localizer):
943 943 _ = localizer
944 944
945 945 class _validator(CIDR):
946 946 messages = {
947 947 'badFormat': _(u'Please enter a valid IPv4 or IpV6 address'),
948 948 'illegalBits': _(
949 949 u'The network size (bits) must be within the range '
950 950 u'of 0-32 (not %(bits)r)'),
951 951 }
952 952
953 953 # we ovveride the default to_python() call
954 954 def to_python(self, value, state):
955 955 v = super(_validator, self).to_python(value, state)
956 956 v = safe_unicode(v.strip())
957 957 net = ipaddress.ip_network(address=v, strict=False)
958 958 return str(net)
959 959
960 960 def validate_python(self, value, state):
961 961 try:
962 962 addr = safe_unicode(value.strip())
963 963 # this raises an ValueError if address is not IpV4 or IpV6
964 964 ipaddress.ip_network(addr, strict=False)
965 965 except ValueError:
966 966 raise formencode.Invalid(self.message('badFormat', state),
967 967 value, state)
968 968 return _validator
969 969
970 970
971 971 def FieldKey(localizer):
972 972 _ = localizer
973 973
974 974 class _validator(formencode.validators.FancyValidator):
975 975 messages = {
976 976 'badFormat': _(
977 977 u'Key name can only consist of letters, '
978 978 u'underscore, dash or numbers'),
979 979 }
980 980
981 981 def validate_python(self, value, state):
982 982 if not re.match('[a-zA-Z0-9_-]+$', value):
983 983 raise formencode.Invalid(self.message('badFormat', state),
984 984 value, state)
985 985 return _validator
986 986
987 987
988 988 def ValidAuthPlugins(localizer):
989 989 _ = localizer
990 990
991 991 class _validator(formencode.validators.FancyValidator):
992 992 messages = {
993 993 'import_duplicate': _(
994 994 u'Plugins %(loaded)s and %(next_to_load)s '
995 995 u'both export the same name'),
996 996 'missing_includeme': _(
997 997 u'The plugin "%(plugin_id)s" is missing an includeme '
998 998 u'function.'),
999 999 'import_error': _(
1000 1000 u'Can not load plugin "%(plugin_id)s"'),
1001 1001 'no_plugin': _(
1002 1002 u'No plugin available with ID "%(plugin_id)s"'),
1003 1003 }
1004 1004
1005 1005 def _to_python(self, value, state):
1006 1006 # filter empty values
1007 1007 return filter(lambda s: s not in [None, ''], value)
1008 1008
1009 1009 def _validate_legacy_plugin_id(self, plugin_id, value, state):
1010 1010 """
1011 1011 Validates that the plugin import works. It also checks that the
1012 1012 plugin has an includeme attribute.
1013 1013 """
1014 1014 try:
1015 1015 plugin = _import_legacy_plugin(plugin_id)
1016 1016 except Exception as e:
1017 1017 log.exception(
1018 1018 'Exception during import of auth legacy plugin "{}"'
1019 1019 .format(plugin_id))
1020 1020 msg = M(self, 'import_error', state, plugin_id=plugin_id)
1021 1021 raise formencode.Invalid(msg, value, state)
1022 1022
1023 1023 if not hasattr(plugin, 'includeme'):
1024 1024 msg = M(self, 'missing_includeme', state, plugin_id=plugin_id)
1025 1025 raise formencode.Invalid(msg, value, state)
1026 1026
1027 1027 return plugin
1028 1028
1029 1029 def _validate_plugin_id(self, plugin_id, value, state):
1030 1030 """
1031 1031 Plugins are already imported during app start up. Therefore this
1032 1032 validation only retrieves the plugin from the plugin registry and
1033 1033 if it returns something not None everything is OK.
1034 1034 """
1035 1035 plugin = loadplugin(plugin_id)
1036 1036
1037 1037 if plugin is None:
1038 1038 msg = M(self, 'no_plugin', state, plugin_id=plugin_id)
1039 1039 raise formencode.Invalid(msg, value, state)
1040 1040
1041 1041 return plugin
1042 1042
1043 1043 def validate_python(self, value, state):
1044 1044 unique_names = {}
1045 1045 for plugin_id in value:
1046 1046
1047 1047 # Validate legacy or normal plugin.
1048 1048 if plugin_id.startswith(legacy_plugin_prefix):
1049 1049 plugin = self._validate_legacy_plugin_id(
1050 1050 plugin_id, value, state)
1051 1051 else:
1052 1052 plugin = self._validate_plugin_id(plugin_id, value, state)
1053 1053
1054 1054 # Only allow unique plugin names.
1055 1055 if plugin.name in unique_names:
1056 1056 msg = M(self, 'import_duplicate', state,
1057 1057 loaded=unique_names[plugin.name],
1058 1058 next_to_load=plugin)
1059 1059 raise formencode.Invalid(msg, value, state)
1060 1060 unique_names[plugin.name] = plugin
1061 1061 return _validator
1062 1062
1063 1063
1064 1064 def ValidPattern(localizer):
1065 1065 _ = localizer
1066 1066
1067 1067 class _validator(formencode.validators.FancyValidator):
1068 1068 messages = {
1069 1069 'bad_format': _(u'Url must start with http or /'),
1070 1070 }
1071 1071
1072 1072 def _to_python(self, value, state):
1073 1073 patterns = []
1074 1074
1075 1075 prefix = 'new_pattern'
1076 1076 for name, v in value.iteritems():
1077 1077 pattern_name = '_'.join((prefix, 'pattern'))
1078 1078 if name.startswith(pattern_name):
1079 1079 new_item_id = name[len(pattern_name)+1:]
1080 1080
1081 1081 def _field(name):
1082 1082 return '%s_%s_%s' % (prefix, name, new_item_id)
1083 1083
1084 1084 values = {
1085 1085 'issuetracker_pat': value.get(_field('pattern')),
1086 1086 'issuetracker_url': value.get(_field('url')),
1087 1087 'issuetracker_pref': value.get(_field('prefix')),
1088 1088 'issuetracker_desc': value.get(_field('description'))
1089 1089 }
1090 1090 new_uid = md5(values['issuetracker_pat'])
1091 1091
1092 1092 has_required_fields = (
1093 1093 values['issuetracker_pat']
1094 1094 and values['issuetracker_url'])
1095 1095
1096 1096 if has_required_fields:
1097 1097 # validate url that it starts with http or /
1098 1098 # otherwise it can lead to JS injections
1099 1099 # e.g specifig javascript:<malicios code>
1100 1100 if not values['issuetracker_url'].startswith(('http', '/')):
1101 1101 raise formencode.Invalid(
1102 1102 self.message('bad_format', state),
1103 1103 value, state)
1104 1104
1105 1105 settings = [
1106 1106 ('_'.join((key, new_uid)), values[key], 'unicode')
1107 1107 for key in values]
1108 1108 patterns.append(settings)
1109 1109
1110 1110 value['patterns'] = patterns
1111 1111 delete_patterns = value.get('uid') or []
1112 1112 if not isinstance(delete_patterns, (list, tuple)):
1113 1113 delete_patterns = [delete_patterns]
1114 1114 value['delete_patterns'] = delete_patterns
1115 1115 return value
1116 1116 return _validator
@@ -1,632 +1,632 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 from hashlib import sha1
23 23
24 24 import pytest
25 25 from mock import patch
26 26
27 27 from rhodecode.lib import auth
28 28 from rhodecode.lib.utils2 import md5
29 29 from rhodecode.model.auth_token import AuthTokenModel
30 30 from rhodecode.model.db import Session, User
31 31 from rhodecode.model.repo import RepoModel
32 32 from rhodecode.model.user import UserModel
33 33 from rhodecode.model.user_group import UserGroupModel
34 34
35 35
36 36 def test_perm_origin_dict():
37 37 pod = auth.PermOriginDict()
38 38 pod['thing'] = 'read', 'default', 1
39 39 assert pod['thing'] == 'read'
40 40
41 41 assert pod.perm_origin_stack == {
42 42 'thing': [('read', 'default', 1)]}
43 43
44 44 pod['thing'] = 'write', 'admin', 1
45 45 assert pod['thing'] == 'write'
46 46
47 47 assert pod.perm_origin_stack == {
48 48 'thing': [('read', 'default', 1), ('write', 'admin', 1)]}
49 49
50 50 pod['other'] = 'write', 'default', 8
51 51
52 52 assert pod.perm_origin_stack == {
53 53 'other': [('write', 'default', 8)],
54 54 'thing': [('read', 'default', 1), ('write', 'admin', 1)]}
55 55
56 56 pod['other'] = 'none', 'override', 8
57 57
58 58 assert pod.perm_origin_stack == {
59 59 'other': [('write', 'default', 8), ('none', 'override', 8)],
60 60 'thing': [('read', 'default', 1), ('write', 'admin', 1)]}
61 61
62 62 with pytest.raises(ValueError):
63 63 pod['thing'] = 'read'
64 64
65 65
66 66 def test_cached_perms_data(user_regular, backend_random):
67 67 permissions = get_permissions(user_regular)
68 68 repo_name = backend_random.repo.repo_name
69 69 expected_global_permissions = {
70 70 'repository.read', 'group.read', 'usergroup.read'}
71 71 assert expected_global_permissions.issubset(permissions['global'])
72 72 assert permissions['repositories'][repo_name] == 'repository.read'
73 73
74 74
75 75 def test_cached_perms_data_with_admin_user(user_regular, backend_random):
76 76 permissions = get_permissions(user_regular, user_is_admin=True)
77 77 repo_name = backend_random.repo.repo_name
78 78 assert 'hg.admin' in permissions['global']
79 79 assert permissions['repositories'][repo_name] == 'repository.admin'
80 80
81 81
82 82 def test_cached_perms_data_with_admin_user_extended_calculation(user_regular, backend_random):
83 83 permissions = get_permissions(user_regular, user_is_admin=True,
84 84 calculate_super_admin=True)
85 85 repo_name = backend_random.repo.repo_name
86 86 assert 'hg.admin' in permissions['global']
87 87 assert permissions['repositories'][repo_name] == 'repository.admin'
88 88
89 89
90 90 def test_cached_perms_data_user_group_global_permissions(user_util):
91 91 user, user_group = user_util.create_user_with_group()
92 92 user_group.inherit_default_permissions = False
93 93
94 94 granted_permission = 'repository.write'
95 95 UserGroupModel().grant_perm(user_group, granted_permission)
96 96 Session().commit()
97 97
98 98 permissions = get_permissions(user)
99 99 assert granted_permission in permissions['global']
100 100
101 101
102 102 @pytest.mark.xfail(reason="Not implemented, see TODO note")
103 103 def test_cached_perms_data_user_group_global_permissions_(user_util):
104 104 user, user_group = user_util.create_user_with_group()
105 105
106 106 granted_permission = 'repository.write'
107 107 UserGroupModel().grant_perm(user_group, granted_permission)
108 108 Session().commit()
109 109
110 110 permissions = get_permissions(user)
111 111 assert granted_permission in permissions['global']
112 112
113 113
114 114 def test_cached_perms_data_user_global_permissions(user_util):
115 115 user = user_util.create_user()
116 116 UserModel().grant_perm(user, 'repository.none')
117 117 Session().commit()
118 118
119 119 permissions = get_permissions(user, user_inherit_default_permissions=True)
120 120 assert 'repository.read' in permissions['global']
121 121
122 122
123 123 def test_cached_perms_data_repository_permissions_on_private_repository(
124 124 backend_random, user_util):
125 125 user, user_group = user_util.create_user_with_group()
126 126
127 127 repo = backend_random.create_repo()
128 128 repo.private = True
129 129
130 130 granted_permission = 'repository.write'
131 131 RepoModel().grant_user_group_permission(
132 132 repo, user_group.users_group_name, granted_permission)
133 133 Session().commit()
134 134
135 135 permissions = get_permissions(user)
136 136 assert permissions['repositories'][repo.repo_name] == granted_permission
137 137
138 138
139 139 def test_cached_perms_data_repository_permissions_for_owner(
140 140 backend_random, user_util):
141 141 user = user_util.create_user()
142 142
143 143 repo = backend_random.create_repo()
144 144 repo.user_id = user.user_id
145 145
146 146 permissions = get_permissions(user)
147 147 assert permissions['repositories'][repo.repo_name] == 'repository.admin'
148 148
149 149 # TODO: johbo: Make cleanup in UserUtility smarter, then remove this hack
150 repo.user_id = User.get_default_user().user_id
150 repo.user_id = User.get_default_user_id()
151 151
152 152
153 153 def test_cached_perms_data_repository_permissions_not_inheriting_defaults(
154 154 backend_random, user_util):
155 155 user = user_util.create_user()
156 156 repo = backend_random.create_repo()
157 157
158 158 # Don't inherit default object permissions
159 159 UserModel().grant_perm(user, 'hg.inherit_default_perms.false')
160 160 Session().commit()
161 161
162 162 permissions = get_permissions(user)
163 163 assert permissions['repositories'][repo.repo_name] == 'repository.none'
164 164
165 165
166 166 def test_cached_perms_data_default_permissions_on_repository_group(user_util):
167 167 # Have a repository group with default permissions set
168 168 repo_group = user_util.create_repo_group()
169 169 default_user = User.get_default_user()
170 170 user_util.grant_user_permission_to_repo_group(
171 171 repo_group, default_user, 'repository.write')
172 172 user = user_util.create_user()
173 173
174 174 permissions = get_permissions(user)
175 175 assert permissions['repositories_groups'][repo_group.group_name] == \
176 176 'repository.write'
177 177
178 178
179 179 def test_cached_perms_data_default_permissions_on_repository_group_owner(
180 180 user_util):
181 181 # Have a repository group
182 182 repo_group = user_util.create_repo_group()
183 183 default_user = User.get_default_user()
184 184
185 185 # Add a permission for the default user to hit the code path
186 186 user_util.grant_user_permission_to_repo_group(
187 187 repo_group, default_user, 'repository.write')
188 188
189 189 # Have an owner of the group
190 190 user = user_util.create_user()
191 191 repo_group.user_id = user.user_id
192 192
193 193 permissions = get_permissions(user)
194 194 assert permissions['repositories_groups'][repo_group.group_name] == \
195 195 'group.admin'
196 196
197 197
198 198 def test_cached_perms_data_default_permissions_on_repository_group_no_inherit(
199 199 user_util):
200 200 # Have a repository group
201 201 repo_group = user_util.create_repo_group()
202 202 default_user = User.get_default_user()
203 203
204 204 # Add a permission for the default user to hit the code path
205 205 user_util.grant_user_permission_to_repo_group(
206 206 repo_group, default_user, 'repository.write')
207 207
208 208 # Don't inherit default object permissions
209 209 user = user_util.create_user()
210 210 UserModel().grant_perm(user, 'hg.inherit_default_perms.false')
211 211 Session().commit()
212 212
213 213 permissions = get_permissions(user)
214 214 assert permissions['repositories_groups'][repo_group.group_name] == \
215 215 'group.none'
216 216
217 217
218 218 def test_cached_perms_data_repository_permissions_from_user_group(
219 219 user_util, backend_random):
220 220 user, user_group = user_util.create_user_with_group()
221 221
222 222 # Needs a second user group to make sure that we select the right
223 223 # permissions.
224 224 user_group2 = user_util.create_user_group()
225 225 UserGroupModel().add_user_to_group(user_group2, user)
226 226
227 227 repo = backend_random.create_repo()
228 228
229 229 RepoModel().grant_user_group_permission(
230 230 repo, user_group.users_group_name, 'repository.read')
231 231 RepoModel().grant_user_group_permission(
232 232 repo, user_group2.users_group_name, 'repository.write')
233 233 Session().commit()
234 234
235 235 permissions = get_permissions(user)
236 236 assert permissions['repositories'][repo.repo_name] == 'repository.write'
237 237
238 238
239 239 def test_cached_perms_data_repository_permissions_from_user_group_owner(
240 240 user_util, backend_random):
241 241 user, user_group = user_util.create_user_with_group()
242 242
243 243 repo = backend_random.create_repo()
244 244 repo.user_id = user.user_id
245 245
246 246 RepoModel().grant_user_group_permission(
247 247 repo, user_group.users_group_name, 'repository.write')
248 248 Session().commit()
249 249
250 250 permissions = get_permissions(user)
251 251 assert permissions['repositories'][repo.repo_name] == 'repository.admin'
252 252
253 253
254 254 def test_cached_perms_data_user_repository_permissions(
255 255 user_util, backend_random):
256 256 user = user_util.create_user()
257 257 repo = backend_random.create_repo()
258 258 granted_permission = 'repository.write'
259 259 RepoModel().grant_user_permission(repo, user, granted_permission)
260 260 Session().commit()
261 261
262 262 permissions = get_permissions(user)
263 263 assert permissions['repositories'][repo.repo_name] == granted_permission
264 264
265 265
266 266 def test_cached_perms_data_user_repository_permissions_explicit(
267 267 user_util, backend_random):
268 268 user = user_util.create_user()
269 269 repo = backend_random.create_repo()
270 270 granted_permission = 'repository.none'
271 271 RepoModel().grant_user_permission(repo, user, granted_permission)
272 272 Session().commit()
273 273
274 274 permissions = get_permissions(user, explicit=True)
275 275 assert permissions['repositories'][repo.repo_name] == granted_permission
276 276
277 277
278 278 def test_cached_perms_data_user_repository_permissions_owner(
279 279 user_util, backend_random):
280 280 user = user_util.create_user()
281 281 repo = backend_random.create_repo()
282 282 repo.user_id = user.user_id
283 283 RepoModel().grant_user_permission(repo, user, 'repository.write')
284 284 Session().commit()
285 285
286 286 permissions = get_permissions(user)
287 287 assert permissions['repositories'][repo.repo_name] == 'repository.admin'
288 288
289 289
290 290 def test_cached_perms_data_repository_groups_permissions_inherited(
291 291 user_util, backend_random):
292 292 user, user_group = user_util.create_user_with_group()
293 293
294 294 # Needs a second group to hit the last condition
295 295 user_group2 = user_util.create_user_group()
296 296 UserGroupModel().add_user_to_group(user_group2, user)
297 297
298 298 repo_group = user_util.create_repo_group()
299 299
300 300 user_util.grant_user_group_permission_to_repo_group(
301 301 repo_group, user_group, 'group.read')
302 302 user_util.grant_user_group_permission_to_repo_group(
303 303 repo_group, user_group2, 'group.write')
304 304
305 305 permissions = get_permissions(user)
306 306 assert permissions['repositories_groups'][repo_group.group_name] == \
307 307 'group.write'
308 308
309 309
310 310 def test_cached_perms_data_repository_groups_permissions_inherited_owner(
311 311 user_util, backend_random):
312 312 user, user_group = user_util.create_user_with_group()
313 313 repo_group = user_util.create_repo_group()
314 314 repo_group.user_id = user.user_id
315 315
316 316 granted_permission = 'group.write'
317 317 user_util.grant_user_group_permission_to_repo_group(
318 318 repo_group, user_group, granted_permission)
319 319
320 320 permissions = get_permissions(user)
321 321 assert permissions['repositories_groups'][repo_group.group_name] == \
322 322 'group.admin'
323 323
324 324
325 325 def test_cached_perms_data_repository_groups_permissions(
326 326 user_util, backend_random):
327 327 user = user_util.create_user()
328 328
329 329 repo_group = user_util.create_repo_group()
330 330
331 331 granted_permission = 'group.write'
332 332 user_util.grant_user_permission_to_repo_group(
333 333 repo_group, user, granted_permission)
334 334
335 335 permissions = get_permissions(user)
336 336 assert permissions['repositories_groups'][repo_group.group_name] == \
337 337 'group.write'
338 338
339 339
340 340 def test_cached_perms_data_repository_groups_permissions_explicit(
341 341 user_util, backend_random):
342 342 user = user_util.create_user()
343 343
344 344 repo_group = user_util.create_repo_group()
345 345
346 346 granted_permission = 'group.none'
347 347 user_util.grant_user_permission_to_repo_group(
348 348 repo_group, user, granted_permission)
349 349
350 350 permissions = get_permissions(user, explicit=True)
351 351 assert permissions['repositories_groups'][repo_group.group_name] == \
352 352 'group.none'
353 353
354 354
355 355 def test_cached_perms_data_repository_groups_permissions_owner(
356 356 user_util, backend_random):
357 357 user = user_util.create_user()
358 358
359 359 repo_group = user_util.create_repo_group()
360 360 repo_group.user_id = user.user_id
361 361
362 362 granted_permission = 'group.write'
363 363 user_util.grant_user_permission_to_repo_group(
364 364 repo_group, user, granted_permission)
365 365
366 366 permissions = get_permissions(user)
367 367 assert permissions['repositories_groups'][repo_group.group_name] == \
368 368 'group.admin'
369 369
370 370
371 371 def test_cached_perms_data_user_group_permissions_inherited(
372 372 user_util, backend_random):
373 373 user, user_group = user_util.create_user_with_group()
374 374 user_group2 = user_util.create_user_group()
375 375 UserGroupModel().add_user_to_group(user_group2, user)
376 376
377 377 target_user_group = user_util.create_user_group()
378 378
379 379 user_util.grant_user_group_permission_to_user_group(
380 380 target_user_group, user_group, 'usergroup.read')
381 381 user_util.grant_user_group_permission_to_user_group(
382 382 target_user_group, user_group2, 'usergroup.write')
383 383
384 384 permissions = get_permissions(user)
385 385 assert permissions['user_groups'][target_user_group.users_group_name] == \
386 386 'usergroup.write'
387 387
388 388
389 389 def test_cached_perms_data_user_group_permissions(
390 390 user_util, backend_random):
391 391 user = user_util.create_user()
392 392 user_group = user_util.create_user_group()
393 393 UserGroupModel().grant_user_permission(user_group, user, 'usergroup.write')
394 394 Session().commit()
395 395
396 396 permissions = get_permissions(user)
397 397 assert permissions['user_groups'][user_group.users_group_name] == \
398 398 'usergroup.write'
399 399
400 400
401 401 def test_cached_perms_data_user_group_permissions_explicit(
402 402 user_util, backend_random):
403 403 user = user_util.create_user()
404 404 user_group = user_util.create_user_group()
405 405 UserGroupModel().grant_user_permission(user_group, user, 'usergroup.none')
406 406 Session().commit()
407 407
408 408 permissions = get_permissions(user, explicit=True)
409 409 assert permissions['user_groups'][user_group.users_group_name] == \
410 410 'usergroup.none'
411 411
412 412
413 413 def test_cached_perms_data_user_group_permissions_not_inheriting_defaults(
414 414 user_util, backend_random):
415 415 user = user_util.create_user()
416 416 user_group = user_util.create_user_group()
417 417
418 418 # Don't inherit default object permissions
419 419 UserModel().grant_perm(user, 'hg.inherit_default_perms.false')
420 420 Session().commit()
421 421
422 422 permissions = get_permissions(user)
423 423 assert permissions['user_groups'][user_group.users_group_name] == \
424 424 'usergroup.none'
425 425
426 426
427 427 def test_permission_calculator_admin_permissions(
428 428 user_util, backend_random):
429 429 user = user_util.create_user()
430 430 user_group = user_util.create_user_group()
431 431 repo = backend_random.repo
432 432 repo_group = user_util.create_repo_group()
433 433
434 434 calculator = auth.PermissionCalculator(
435 435 user.user_id, {}, False, False, True, 'higherwin')
436 436 permissions = calculator._calculate_admin_permissions()
437 437
438 438 assert permissions['repositories_groups'][repo_group.group_name] == \
439 439 'group.admin'
440 440 assert permissions['user_groups'][user_group.users_group_name] == \
441 441 'usergroup.admin'
442 442 assert permissions['repositories'][repo.repo_name] == 'repository.admin'
443 443 assert 'hg.admin' in permissions['global']
444 444
445 445
446 446 def test_permission_calculator_repository_permissions_robustness_from_group(
447 447 user_util, backend_random):
448 448 user, user_group = user_util.create_user_with_group()
449 449
450 450 RepoModel().grant_user_group_permission(
451 451 backend_random.repo, user_group.users_group_name, 'repository.write')
452 452
453 453 calculator = auth.PermissionCalculator(
454 454 user.user_id, {}, False, False, False, 'higherwin')
455 455 calculator._calculate_repository_permissions()
456 456
457 457
458 458 def test_permission_calculator_repository_permissions_robustness_from_user(
459 459 user_util, backend_random):
460 460 user = user_util.create_user()
461 461
462 462 RepoModel().grant_user_permission(
463 463 backend_random.repo, user, 'repository.write')
464 464 Session().commit()
465 465
466 466 calculator = auth.PermissionCalculator(
467 467 user.user_id, {}, False, False, False, 'higherwin')
468 468 calculator._calculate_repository_permissions()
469 469
470 470
471 471 def test_permission_calculator_repo_group_permissions_robustness_from_group(
472 472 user_util, backend_random):
473 473 user, user_group = user_util.create_user_with_group()
474 474 repo_group = user_util.create_repo_group()
475 475
476 476 user_util.grant_user_group_permission_to_repo_group(
477 477 repo_group, user_group, 'group.write')
478 478
479 479 calculator = auth.PermissionCalculator(
480 480 user.user_id, {}, False, False, False, 'higherwin')
481 481 calculator._calculate_repository_group_permissions()
482 482
483 483
484 484 def test_permission_calculator_repo_group_permissions_robustness_from_user(
485 485 user_util, backend_random):
486 486 user = user_util.create_user()
487 487 repo_group = user_util.create_repo_group()
488 488
489 489 user_util.grant_user_permission_to_repo_group(
490 490 repo_group, user, 'group.write')
491 491
492 492 calculator = auth.PermissionCalculator(
493 493 user.user_id, {}, False, False, False, 'higherwin')
494 494 calculator._calculate_repository_group_permissions()
495 495
496 496
497 497 def test_permission_calculator_user_group_permissions_robustness_from_group(
498 498 user_util, backend_random):
499 499 user, user_group = user_util.create_user_with_group()
500 500 target_user_group = user_util.create_user_group()
501 501
502 502 user_util.grant_user_group_permission_to_user_group(
503 503 target_user_group, user_group, 'usergroup.write')
504 504
505 505 calculator = auth.PermissionCalculator(
506 506 user.user_id, {}, False, False, False, 'higherwin')
507 507 calculator._calculate_user_group_permissions()
508 508
509 509
510 510 def test_permission_calculator_user_group_permissions_robustness_from_user(
511 511 user_util, backend_random):
512 512 user = user_util.create_user()
513 513 target_user_group = user_util.create_user_group()
514 514
515 515 user_util.grant_user_permission_to_user_group(
516 516 target_user_group, user, 'usergroup.write')
517 517
518 518 calculator = auth.PermissionCalculator(
519 519 user.user_id, {}, False, False, False, 'higherwin')
520 520 calculator._calculate_user_group_permissions()
521 521
522 522
523 523 @pytest.mark.parametrize("algo, new_permission, old_permission, expected", [
524 524 ('higherwin', 'repository.none', 'repository.none', 'repository.none'),
525 525 ('higherwin', 'repository.read', 'repository.none', 'repository.read'),
526 526 ('lowerwin', 'repository.write', 'repository.write', 'repository.write'),
527 527 ('lowerwin', 'repository.read', 'repository.write', 'repository.read'),
528 528 ])
529 529 def test_permission_calculator_choose_permission(
530 530 user_regular, algo, new_permission, old_permission, expected):
531 531 calculator = auth.PermissionCalculator(
532 532 user_regular.user_id, {}, False, False, False, algo)
533 533 result = calculator._choose_permission(new_permission, old_permission)
534 534 assert result == expected
535 535
536 536
537 537 def test_permission_calculator_choose_permission_raises_on_wrong_algo(
538 538 user_regular):
539 539 calculator = auth.PermissionCalculator(
540 540 user_regular.user_id, {}, False, False, False, 'invalid')
541 541 result = calculator._choose_permission(
542 542 'repository.read', 'repository.read')
543 543 # TODO: johbo: This documents the existing behavior. Think of an
544 544 # improvement.
545 545 assert result is None
546 546
547 547
548 548 def test_auth_user_get_cookie_store_for_normal_user(user_util):
549 549 user = user_util.create_user()
550 550 auth_user = auth.AuthUser(user_id=user.user_id)
551 551 expected_data = {
552 552 'username': user.username,
553 553 'user_id': user.user_id,
554 554 'password': md5(user.password),
555 555 'is_authenticated': False
556 556 }
557 557 assert auth_user.get_cookie_store() == expected_data
558 558
559 559
560 560 def test_auth_user_get_cookie_store_for_default_user():
561 561 default_user = User.get_default_user()
562 562 auth_user = auth.AuthUser()
563 563 expected_data = {
564 564 'username': User.DEFAULT_USER,
565 565 'user_id': default_user.user_id,
566 566 'password': md5(default_user.password),
567 567 'is_authenticated': True
568 568 }
569 569 assert auth_user.get_cookie_store() == expected_data
570 570
571 571
572 572 def get_permissions(user, **kwargs):
573 573 """
574 574 Utility filling in useful defaults into the call to `_cached_perms_data`.
575 575
576 576 Fill in `**kwargs` if specific values are needed for a test.
577 577 """
578 578 call_args = {
579 579 'user_id': user.user_id,
580 580 'scope': {},
581 581 'user_is_admin': False,
582 582 'user_inherit_default_permissions': False,
583 583 'explicit': False,
584 584 'algo': 'higherwin',
585 585 'calculate_super_admin': False,
586 586 }
587 587 call_args.update(kwargs)
588 588 permissions = auth._cached_perms_data(**call_args)
589 589 return permissions
590 590
591 591
592 592 class TestGenerateAuthToken(object):
593 593 def test_salt_is_used_when_specified(self):
594 594 salt = 'abcde'
595 595 user_name = 'test_user'
596 596 result = auth.generate_auth_token(user_name, salt)
597 597 expected_result = sha1(user_name + salt).hexdigest()
598 598 assert result == expected_result
599 599
600 600 def test_salt_is_geneated_when_not_specified(self):
601 601 user_name = 'test_user'
602 602 random_salt = os.urandom(16)
603 603 with patch.object(auth, 'os') as os_mock:
604 604 os_mock.urandom.return_value = random_salt
605 605 result = auth.generate_auth_token(user_name)
606 606 expected_result = sha1(user_name + random_salt).hexdigest()
607 607 assert result == expected_result
608 608
609 609
610 610 @pytest.mark.parametrize("test_token, test_roles, auth_result, expected_tokens", [
611 611 ('', None, False,
612 612 []),
613 613 ('wrongtoken', None, False,
614 614 []),
615 615 ('abracadabra_vcs', [AuthTokenModel.cls.ROLE_API], False,
616 616 [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]),
617 617 ('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True,
618 618 [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1)]),
619 619 ('abracadabra_api', [AuthTokenModel.cls.ROLE_API], True,
620 620 [('abracadabra_api', AuthTokenModel.cls.ROLE_API, -1),
621 621 ('abracadabra_http', AuthTokenModel.cls.ROLE_HTTP, -1)]),
622 622 ])
623 623 def test_auth_by_token(test_token, test_roles, auth_result, expected_tokens,
624 624 user_util):
625 625 user = user_util.create_user()
626 626 user_id = user.user_id
627 627 for token, role, expires in expected_tokens:
628 628 new_token = AuthTokenModel().create(user_id, u'test-token', expires, role)
629 629 new_token.api_key = token # inject known name for testing...
630 630
631 631 assert auth_result == user.authenticate_by_token(
632 632 test_token, roles=test_roles)
@@ -1,325 +1,325 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import functools
22 22
23 23 import pytest
24 24
25 25 from rhodecode.model.db import RepoGroup, User
26 26 from rhodecode.model.meta import Session
27 27 from rhodecode.model.repo_group import RepoGroupModel
28 28 from rhodecode.tests.models.common import (
29 29 _create_project_tree, check_tree_perms, _get_perms, _check_expected_count,
30 30 expected_count, _destroy_project_tree)
31 31
32 32
33 33 test_u1_id = None
34 34 _get_repo_perms = None
35 35 _get_group_perms = None
36 36
37 37
38 38 @pytest.fixture(autouse=True)
39 39 def setup_read_permission():
40 40 permissions_setup_func()
41 41
42 42
43 43 def permissions_setup_func(group_name='g0', perm='group.read', recursive='all',
44 44 user_id=None):
45 45 """
46 46 Resets all permissions to perm attribute
47 47 """
48 48 if not user_id:
49 49 user_id = test_u1_id
50 50 # called by the @with_setup decorator also reset the default user stuff
51 51 permissions_setup_func(group_name, perm, recursive,
52 user_id=User.get_default_user().user_id)
52 user_id=User.get_default_user_id())
53 53
54 54 # TODO: DRY, compare test_user_group:permissions_setup_func
55 55 repo_group = RepoGroup.get_by_group_name(group_name=group_name)
56 56 if not repo_group:
57 57 raise Exception('Cannot get group %s' % group_name)
58 58
59 59 perm_updates = [[user_id, perm, 'user']]
60 60 RepoGroupModel().update_permissions(repo_group,
61 61 perm_updates=perm_updates,
62 62 recursive=recursive, check_perms=False)
63 63 Session().commit()
64 64
65 65
66 66 @pytest.fixture(scope='module', autouse=True)
67 67 def prepare(request, baseapp):
68 68 global test_u1_id, _get_repo_perms, _get_group_perms
69 69 test_u1 = _create_project_tree()
70 70 Session().commit()
71 71 test_u1_id = test_u1.user_id
72 72 _get_repo_perms = functools.partial(_get_perms, key='repositories',
73 73 test_u1_id=test_u1_id)
74 74 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
75 75 test_u1_id=test_u1_id)
76 76
77 77 @request.addfinalizer
78 78 def cleanup():
79 79 _destroy_project_tree(test_u1_id)
80 80
81 81
82 82 def test_user_permissions_on_group_without_recursive_mode():
83 83 # set permission to g0 non-recursive mode
84 84 recursive = 'none'
85 85 group = 'g0'
86 86 permissions_setup_func(group, 'group.write', recursive=recursive)
87 87
88 88 items = [x for x in _get_repo_perms(group, recursive)]
89 89 expected = 0
90 90 assert len(items) == expected, ' %s != %s' % (len(items), expected)
91 91 for name, perm in items:
92 92 check_tree_perms(name, perm, group, 'repository.read')
93 93
94 94 items = [x for x in _get_group_perms(group, recursive)]
95 95 expected = 1
96 96 assert len(items) == expected, ' %s != %s' % (len(items), expected)
97 97 for name, perm in items:
98 98 check_tree_perms(name, perm, group, 'group.write')
99 99
100 100
101 101 def test_user_permissions_on_group_without_recursive_mode_subgroup():
102 102 # set permission to g0 non-recursive mode
103 103 recursive = 'none'
104 104 group = 'g0/g0_1'
105 105 permissions_setup_func(group, 'group.write', recursive=recursive)
106 106
107 107 items = [x for x in _get_repo_perms(group, recursive)]
108 108 expected = 0
109 109 assert len(items) == expected, ' %s != %s' % (len(items), expected)
110 110 for name, perm in items:
111 111 check_tree_perms(name, perm, group, 'repository.read')
112 112
113 113 items = [x for x in _get_group_perms(group, recursive)]
114 114 expected = 1
115 115 assert len(items) == expected, ' %s != %s' % (len(items), expected)
116 116 for name, perm in items:
117 117 check_tree_perms(name, perm, group, 'group.write')
118 118
119 119
120 120 def test_user_permissions_on_group_with_recursive_mode():
121 121
122 122 # set permission to g0 recursive mode, all children including
123 123 # other repos and groups should have this permission now set !
124 124 recursive = 'all'
125 125 group = 'g0'
126 126 permissions_setup_func(group, 'group.write', recursive=recursive)
127 127
128 128 repo_items = [x for x in _get_repo_perms(group, recursive)]
129 129 items = [x for x in _get_group_perms(group, recursive)]
130 130 _check_expected_count(items, repo_items, expected_count(group, True))
131 131
132 132 for name, perm in repo_items:
133 133 check_tree_perms(name, perm, group, 'repository.write')
134 134
135 135 for name, perm in items:
136 136 check_tree_perms(name, perm, group, 'group.write')
137 137
138 138
139 139 def test_user_permissions_on_group_with_recursive_mode_for_default_user():
140 140
141 141 # set permission to g0 recursive mode, all children including
142 142 # other repos and groups should have this permission now set !
143 143 recursive = 'all'
144 144 group = 'g0'
145 default_user_id = User.get_default_user().user_id
145 default_user_id = User.get_default_user_id()
146 146 permissions_setup_func(group, 'group.write', recursive=recursive,
147 147 user_id=default_user_id)
148 148
149 149 # change default to get perms for default user
150 150 _get_repo_perms = functools.partial(_get_perms, key='repositories',
151 151 test_u1_id=default_user_id)
152 152 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
153 153 test_u1_id=default_user_id)
154 154
155 155 repo_items = [x for x in _get_repo_perms(group, recursive)]
156 156 items = [x for x in _get_group_perms(group, recursive)]
157 157 _check_expected_count(items, repo_items, expected_count(group, True))
158 158
159 159 for name, perm in repo_items:
160 160 check_tree_perms(name, perm, group, 'repository.write')
161 161
162 162 for name, perm in items:
163 163 check_tree_perms(name, perm, group, 'group.write')
164 164
165 165
166 166 def test_user_permissions_on_group_with_recursive_mode_inner_group():
167 167 # set permission to g0_3 group to none
168 168 recursive = 'all'
169 169 group = 'g0/g0_3'
170 170 permissions_setup_func(group, 'group.none', recursive=recursive)
171 171
172 172 repo_items = [x for x in _get_repo_perms(group, recursive)]
173 173 items = [x for x in _get_group_perms(group, recursive)]
174 174 _check_expected_count(items, repo_items, expected_count(group, True))
175 175
176 176 for name, perm in repo_items:
177 177 check_tree_perms(name, perm, group, 'repository.none')
178 178
179 179 for name, perm in items:
180 180 check_tree_perms(name, perm, group, 'group.none')
181 181
182 182
183 183 def test_user_permissions_on_group_with_recursive_mode_deepest():
184 184 # set permission to g0_3 group to none
185 185 recursive = 'all'
186 186 group = 'g0/g0_1/g0_1_1'
187 187 permissions_setup_func(group, 'group.write', recursive=recursive)
188 188
189 189 repo_items = [x for x in _get_repo_perms(group, recursive)]
190 190 items = [x for x in _get_group_perms(group, recursive)]
191 191 _check_expected_count(items, repo_items, expected_count(group, True))
192 192
193 193 for name, perm in repo_items:
194 194 check_tree_perms(name, perm, group, 'repository.write')
195 195
196 196 for name, perm in items:
197 197 check_tree_perms(name, perm, group, 'group.write')
198 198
199 199
200 200 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
201 201 # set permission to g0_3 group to none
202 202 recursive = 'all'
203 203 group = 'g0/g0_2'
204 204 permissions_setup_func(group, 'group.admin', recursive=recursive)
205 205
206 206 repo_items = [x for x in _get_repo_perms(group, recursive)]
207 207 items = [x for x in _get_group_perms(group, recursive)]
208 208 _check_expected_count(items, repo_items, expected_count(group, True))
209 209
210 210 for name, perm in repo_items:
211 211 check_tree_perms(name, perm, group, 'repository.admin')
212 212
213 213 for name, perm in items:
214 214 check_tree_perms(name, perm, group, 'group.admin')
215 215
216 216
217 217 def test_user_permissions_on_group_with_recursive_repo_mode_for_default_user():
218 218 # set permission to g0/g0_1 recursive repos only mode, all children
219 219 # including other repos should have this permission now set, inner groups
220 220 # are excluded!
221 221 recursive = 'repos'
222 222 group = 'g0/g0_1'
223 223 perm = 'group.none'
224 default_user_id = User.get_default_user().user_id
224 default_user_id = User.get_default_user_id()
225 225
226 226 # TODO: workaround due to different setup calls, adept to py.test style
227 227 permissions_setup_func()
228 228 permissions_setup_func(group, perm, recursive=recursive,
229 229 user_id=default_user_id)
230 230
231 231 # change default to get perms for default user
232 232 _get_repo_perms = functools.partial(_get_perms, key='repositories',
233 233 test_u1_id=default_user_id)
234 234 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
235 235 test_u1_id=default_user_id)
236 236
237 237 repo_items = [x for x in _get_repo_perms(group, recursive)]
238 238 items = [x for x in _get_group_perms(group, recursive)]
239 239 _check_expected_count(items, repo_items, expected_count(group, True))
240 240
241 241 for name, perm in repo_items:
242 242 check_tree_perms(name, perm, group, 'repository.none')
243 243
244 244 for name, perm in items:
245 245 # permission is set with repos only mode, but we also change the
246 246 # permission on the group we trigger the apply to children from, thus
247 247 # we need to change its permission check
248 248 old_perm = 'group.read'
249 249 if name == group:
250 250 old_perm = perm
251 251 check_tree_perms(name, perm, group, old_perm)
252 252
253 253
254 254 def test_user_permissions_on_group_with_recursive_repo_mode_inner_group():
255 255 # set permission to g0_3 group to none, with recursive repos only
256 256 recursive = 'repos'
257 257 group = 'g0/g0_3'
258 258 perm = 'group.none'
259 259 permissions_setup_func(group, perm, recursive=recursive)
260 260
261 261 repo_items = [x for x in _get_repo_perms(group, recursive)]
262 262 items = [x for x in _get_group_perms(group, recursive)]
263 263 _check_expected_count(items, repo_items, expected_count(group, True))
264 264
265 265 for name, perm in repo_items:
266 266 check_tree_perms(name, perm, group, 'repository.none')
267 267
268 268 for name, perm in items:
269 269 # permission is set with repos only mode, but we also change the
270 270 # permission on the group we trigger the apply to children from, thus
271 271 # we need to change its permission check
272 272 old_perm = 'group.read'
273 273 if name == group:
274 274 old_perm = perm
275 275 check_tree_perms(name, perm, group, old_perm)
276 276
277 277
278 278 def test_user_permissions_on_group_with_rec_group_mode_for_default_user():
279 279 # set permission to g0/g0_1 with recursive groups only mode, all children
280 280 # sincluding other groups should have this permission now set. repositories
281 281 # should remain intact as we use groups only mode !
282 282 recursive = 'groups'
283 283 group = 'g0/g0_1'
284 default_user_id = User.get_default_user().user_id
284 default_user_id = User.get_default_user_id()
285 285
286 286 # TODO: workaround due to different setup calls, adept to py.test style
287 287 permissions_setup_func()
288 288 permissions_setup_func(group, 'group.write', recursive=recursive,
289 289 user_id=default_user_id)
290 290
291 291 # change default to get perms for default user
292 292 _get_repo_perms = functools.partial(_get_perms, key='repositories',
293 293 test_u1_id=default_user_id)
294 294 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
295 295 test_u1_id=default_user_id)
296 296
297 297 repo_items = [x for x in _get_repo_perms(group, recursive)]
298 298 items = [x for x in _get_group_perms(group, recursive)]
299 299 _check_expected_count(items, repo_items, expected_count(group, True))
300 300
301 301 for name, perm in repo_items:
302 302 check_tree_perms(name, perm, group, 'repository.read')
303 303
304 304 for name, perm in items:
305 305 check_tree_perms(name, perm, group, 'group.write')
306 306
307 307
308 308 def test_user_permissions_on_group_with_recursive_group_mode_inner_group():
309 309 # set permission to g0_3 group to none, with recursive mode for groups only
310 310 recursive = 'groups'
311 311 group = 'g0/g0_3'
312 312
313 313 # TODO: workaround due to different setup calls, adept to py.test style
314 314 permissions_setup_func()
315 315 permissions_setup_func(group, 'group.none', recursive=recursive)
316 316
317 317 repo_items = [x for x in _get_repo_perms(group, recursive)]
318 318 items = [x for x in _get_group_perms(group, recursive)]
319 319 _check_expected_count(items, repo_items, expected_count(group, True))
320 320
321 321 for name, perm in repo_items:
322 322 check_tree_perms(name, perm, group, 'repository.read')
323 323
324 324 for name, perm in items:
325 325 check_tree_perms(name, perm, group, 'group.none')
General Comments 0
You need to be logged in to leave comments. Login now