##// END OF EJS Templates
lint: ruff run
super-admin -
r5089:fed0c169 default
parent child Browse files
Show More
@@ -1,206 +1,206 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.lib.auth import check_password
24 24 from rhodecode.model.user import UserModel
25 25 from rhodecode.tests import (
26 26 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL)
27 27 from rhodecode.api.tests.utils import (
28 28 build_data, api_call, assert_ok, assert_error, jsonify, crash)
29 29 from rhodecode.tests.fixture import Fixture
30 30 from rhodecode.model.db import RepoGroup
31 31
32 32
33 33 # TODO: mikhail: remove fixture from here
34 34 fixture = Fixture()
35 35
36 36
37 37 @pytest.mark.usefixtures("testuser_api", "app")
38 38 class TestCreateUser(object):
39 39 def test_api_create_existing_user(self):
40 40 id_, params = build_data(
41 41 self.apikey, 'create_user',
42 42 username=TEST_USER_ADMIN_LOGIN,
43 43 email='test@foo.com',
44 44 password='trololo')
45 45 response = api_call(self.app, params)
46 46
47 47 expected = "user `%s` already exist" % (TEST_USER_ADMIN_LOGIN,)
48 48 assert_error(id_, expected, given=response.body)
49 49
50 50 def test_api_create_user_with_existing_email(self):
51 51 id_, params = build_data(
52 52 self.apikey, 'create_user',
53 53 username=TEST_USER_ADMIN_LOGIN + 'new',
54 54 email=TEST_USER_REGULAR_EMAIL,
55 55 password='trololo')
56 56 response = api_call(self.app, params)
57 57
58 58 expected = "email `%s` already exist" % (TEST_USER_REGULAR_EMAIL,)
59 59 assert_error(id_, expected, given=response.body)
60 60
61 61 def test_api_create_user_with_wrong_username(self):
62 62 bad_username = '<> HELLO WORLD <>'
63 63 id_, params = build_data(
64 64 self.apikey, 'create_user',
65 65 username=bad_username,
66 66 email='new@email.com',
67 67 password='trololo')
68 68 response = api_call(self.app, params)
69 69
70 70 expected = {'username':
71 71 "Username may only contain alphanumeric characters "
72 72 "underscores, periods or dashes and must begin with "
73 73 "alphanumeric character or underscore"}
74 74 assert_error(id_, expected, given=response.body)
75 75
76 76 def test_api_create_user(self):
77 77 username = 'test_new_api_user'
78 78 email = username + "@foo.com"
79 79
80 80 id_, params = build_data(
81 81 self.apikey, 'create_user',
82 82 username=username,
83 83 email=email,
84 84 description='CTO of Things',
85 85 password='example')
86 86 response = api_call(self.app, params)
87 87
88 88 usr = UserModel().get_by_username(username)
89 89 ret = {
90 90 'msg': 'created new user `%s`' % (username,),
91 91 'user': jsonify(usr.get_api_data(include_secrets=True)),
92 92 }
93 93 try:
94 94 expected = ret
95 95 assert check_password('example', usr.password)
96 96 assert_ok(id_, expected, given=response.body)
97 97 finally:
98 98 fixture.destroy_user(usr.user_id)
99 99
100 100 def test_api_create_user_without_password(self):
101 101 username = 'test_new_api_user_passwordless'
102 102 email = username + "@foo.com"
103 103
104 104 id_, params = build_data(
105 105 self.apikey, 'create_user',
106 106 username=username,
107 107 email=email)
108 108 response = api_call(self.app, params)
109 109
110 110 usr = UserModel().get_by_username(username)
111 111 ret = {
112 112 'msg': 'created new user `%s`' % (username,),
113 113 'user': jsonify(usr.get_api_data(include_secrets=True)),
114 114 }
115 115 try:
116 116 expected = ret
117 117 assert_ok(id_, expected, given=response.body)
118 118 finally:
119 119 fixture.destroy_user(usr.user_id)
120 120
121 121 def test_api_create_user_with_extern_name(self):
122 122 username = 'test_new_api_user_passwordless'
123 123 email = username + "@foo.com"
124 124
125 125 id_, params = build_data(
126 126 self.apikey, 'create_user',
127 127 username=username,
128 128 email=email, extern_name='rhodecode')
129 129 response = api_call(self.app, params)
130 130
131 131 usr = UserModel().get_by_username(username)
132 132 ret = {
133 133 'msg': 'created new user `%s`' % (username,),
134 134 'user': jsonify(usr.get_api_data(include_secrets=True)),
135 135 }
136 136 try:
137 137 expected = ret
138 138 assert_ok(id_, expected, given=response.body)
139 139 finally:
140 140 fixture.destroy_user(usr.user_id)
141 141
142 142 def test_api_create_user_with_password_change(self):
143 143 username = 'test_new_api_user_password_change'
144 144 email = username + "@foo.com"
145 145
146 146 id_, params = build_data(
147 147 self.apikey, 'create_user',
148 148 username=username,
149 149 email=email, extern_name='rhodecode',
150 150 force_password_change=True)
151 151 response = api_call(self.app, params)
152 152
153 153 usr = UserModel().get_by_username(username)
154 154 ret = {
155 155 'msg': 'created new user `%s`' % (username,),
156 156 'user': jsonify(usr.get_api_data(include_secrets=True)),
157 157 }
158 158 try:
159 159 expected = ret
160 160 assert_ok(id_, expected, given=response.body)
161 161 finally:
162 162 fixture.destroy_user(usr.user_id)
163 163
164 164 def test_api_create_user_with_personal_repo_group(self):
165 165 username = 'test_new_api_user_personal_group'
166 166 email = username + "@foo.com"
167 167
168 168 id_, params = build_data(
169 169 self.apikey, 'create_user',
170 170 username=username,
171 171 email=email, extern_name='rhodecode',
172 172 create_personal_repo_group=True)
173 173 response = api_call(self.app, params)
174 174
175 175 usr = UserModel().get_by_username(username)
176 176 ret = {
177 177 'msg': 'created new user `%s`' % (username,),
178 178 'user': jsonify(usr.get_api_data(include_secrets=True)),
179 179 }
180 180
181 181 personal_group = RepoGroup.get_by_group_name(username)
182 182 assert personal_group
183 assert personal_group.personal == True
183 assert personal_group.personal is True
184 184 assert personal_group.user.username == username
185 185
186 186 try:
187 187 expected = ret
188 188 assert_ok(id_, expected, given=response.body)
189 189 finally:
190 190 fixture.destroy_repo_group(username)
191 191 fixture.destroy_user(usr.user_id)
192 192
193 193 @mock.patch.object(UserModel, 'create_or_update', crash)
194 194 def test_api_create_user_when_exception_happened(self):
195 195
196 196 username = 'test_new_api_user'
197 197 email = username + "@foo.com"
198 198
199 199 id_, params = build_data(
200 200 self.apikey, 'create_user',
201 201 username=username,
202 202 email=email,
203 203 password='trololo')
204 204 response = api_call(self.app, params)
205 205 expected = 'failed to create user `%s`' % (username,)
206 206 assert_error(id_, expected, given=response.body)
@@ -1,255 +1,255 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import pytest
21 21
22 22 from rhodecode.tests import (
23 23 TestController, assert_session_flash, TEST_USER_ADMIN_LOGIN)
24 24 from rhodecode.model.db import UserGroup
25 25 from rhodecode.model.meta import Session
26 26 from rhodecode.tests.fixture import Fixture
27 27
28 28 fixture = Fixture()
29 29
30 30
31 31 def route_path(name, params=None, **kwargs):
32 32 import urllib.request
33 33 import urllib.parse
34 34 import urllib.error
35 35 from rhodecode.apps._base import ADMIN_PREFIX
36 36
37 37 base_url = {
38 38 'user_groups': ADMIN_PREFIX + '/user_groups',
39 39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
40 40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
41 41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
42 42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
43 43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
44 44 'edit_user_group_advanced_sync': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/advanced/sync',
45 45 'edit_user_group_global_perms_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/global_permissions/update',
46 46 'user_groups_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/update',
47 47 'user_groups_delete': ADMIN_PREFIX + '/user_groups/{user_group_id}/delete',
48 48
49 49 }[name].format(**kwargs)
50 50
51 51 if params:
52 52 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
53 53 return base_url
54 54
55 55
56 56 class TestUserGroupsView(TestController):
57 57
58 58 def test_set_synchronization(self, user_util):
59 59 self.log_user()
60 60 user_group_name = user_util.create_user_group().users_group_name
61 61
62 62 group = Session().query(UserGroup).filter(
63 63 UserGroup.users_group_name == user_group_name).one()
64 64
65 65 assert group.group_data.get('extern_type') is None
66 66
67 67 # enable
68 68 self.app.post(
69 69 route_path('edit_user_group_advanced_sync',
70 70 user_group_id=group.users_group_id),
71 71 params={'csrf_token': self.csrf_token}, status=302)
72 72
73 73 group = Session().query(UserGroup).filter(
74 74 UserGroup.users_group_name == user_group_name).one()
75 75 assert group.group_data.get('extern_type') == 'manual'
76 76 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
77 77
78 78 # disable
79 79 self.app.post(
80 80 route_path('edit_user_group_advanced_sync',
81 81 user_group_id=group.users_group_id),
82 82 params={'csrf_token': self.csrf_token}, status=302)
83 83
84 84 group = Session().query(UserGroup).filter(
85 85 UserGroup.users_group_name == user_group_name).one()
86 86 assert group.group_data.get('extern_type') is None
87 87 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
88 88
89 89 def test_delete_user_group(self, user_util):
90 90 self.log_user()
91 91 user_group_id = user_util.create_user_group().users_group_id
92 92
93 93 group = Session().query(UserGroup).filter(
94 94 UserGroup.users_group_id == user_group_id).one()
95 95
96 96 self.app.post(
97 97 route_path('user_groups_delete', user_group_id=group.users_group_id),
98 98 params={'csrf_token': self.csrf_token})
99 99
100 100 group = Session().query(UserGroup).filter(
101 101 UserGroup.users_group_id == user_group_id).scalar()
102 102
103 103 assert group is None
104 104
105 105 @pytest.mark.parametrize('repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, inherit_default_permissions, expect_error, expect_form_error', [
106 106 ('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
107 107 ('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False),
108 108 ('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True),
109 109 ('', '', '', '', '', '', True, False),
110 110 ])
111 111 def test_global_permissions_on_user_group(
112 112 self, repo_create, repo_create_write, user_group_create,
113 113 repo_group_create, fork_create, expect_error, expect_form_error,
114 114 inherit_default_permissions, user_util):
115 115
116 116 self.log_user()
117 117 user_group = user_util.create_user_group()
118 118
119 119 user_group_name = user_group.users_group_name
120 120 user_group_id = user_group.users_group_id
121 121
122 122 # ENABLE REPO CREATE ON A GROUP
123 123 perm_params = {
124 124 'inherit_default_permissions': False,
125 125 'default_repo_create': repo_create,
126 126 'default_repo_create_on_write': repo_create_write,
127 127 'default_user_group_create': user_group_create,
128 128 'default_repo_group_create': repo_group_create,
129 129 'default_fork_create': fork_create,
130 130 'default_inherit_default_permissions': inherit_default_permissions,
131 131
132 132 'csrf_token': self.csrf_token,
133 133 }
134 134 response = self.app.post(
135 135 route_path('edit_user_group_global_perms_update',
136 136 user_group_id=user_group_id),
137 137 params=perm_params)
138 138
139 139 if expect_form_error:
140 140 assert response.status_int == 200
141 141 response.mustcontain('Value must be one of')
142 142 else:
143 143 if expect_error:
144 144 msg = 'An error occurred during permissions saving'
145 145 else:
146 146 msg = 'User Group global permissions updated successfully'
147 147 ug = UserGroup.get_by_group_name(user_group_name)
148 148 del perm_params['csrf_token']
149 149 del perm_params['inherit_default_permissions']
150 150 assert perm_params == ug.get_default_perms()
151 151 assert_session_flash(response, msg)
152 152
153 153 def test_edit_view(self, user_util):
154 154 self.log_user()
155 155
156 156 user_group = user_util.create_user_group()
157 157 self.app.get(
158 158 route_path('edit_user_group',
159 159 user_group_id=user_group.users_group_id),
160 160 status=200)
161 161
162 162 def test_update_user_group(self, user_util):
163 163 user = self.log_user()
164 164
165 165 user_group = user_util.create_user_group()
166 166 users_group_id = user_group.users_group_id
167 167 new_name = user_group.users_group_name + '_CHANGE'
168 168
169 169 params = [
170 170 ('users_group_active', False),
171 171 ('user_group_description', 'DESC'),
172 172 ('users_group_name', new_name),
173 173 ('user', user['username']),
174 174 ('csrf_token', self.csrf_token),
175 175 ('__start__', 'user_group_members:sequence'),
176 176 ('__start__', 'member:mapping'),
177 177 ('member_user_id', user['user_id']),
178 178 ('type', 'existing'),
179 179 ('__end__', 'member:mapping'),
180 180 ('__end__', 'user_group_members:sequence'),
181 181 ]
182 182
183 183 self.app.post(
184 184 route_path('user_groups_update',
185 185 user_group_id=users_group_id),
186 186 params=params,
187 187 status=302)
188 188
189 189 user_group = UserGroup.get(users_group_id)
190 190 assert user_group
191 191
192 192 assert user_group.users_group_name == new_name
193 193 assert user_group.user_group_description == 'DESC'
194 assert user_group.users_group_active == False
194 assert user_group.users_group_active is False
195 195
196 196 def test_update_user_group_name_conflicts(self, user_util):
197 197 self.log_user()
198 198 user_group_old = user_util.create_user_group()
199 199 new_name = user_group_old.users_group_name
200 200
201 201 user_group = user_util.create_user_group()
202 202
203 203 params = dict(
204 204 users_group_active=False,
205 205 user_group_description='DESC',
206 206 users_group_name=new_name,
207 207 csrf_token=self.csrf_token)
208 208
209 209 response = self.app.post(
210 210 route_path('user_groups_update',
211 211 user_group_id=user_group.users_group_id),
212 212 params=params,
213 213 status=200)
214 214
215 215 response.mustcontain('User group `{}` already exists'.format(
216 216 new_name))
217 217
218 218 def test_update_members_from_user_ids(self, user_regular):
219 219 uid = user_regular.user_id
220 220 username = user_regular.username
221 221 self.log_user()
222 222
223 223 user_group = fixture.create_user_group('test_gr_ids')
224 224 assert user_group.members == []
225 225 assert user_group.user != user_regular
226 226 expected_active_state = not user_group.users_group_active
227 227
228 228 form_data = [
229 229 ('csrf_token', self.csrf_token),
230 230 ('user', username),
231 231 ('users_group_name', 'changed_name'),
232 232 ('users_group_active', expected_active_state),
233 233 ('user_group_description', 'changed_description'),
234 234
235 235 ('__start__', 'user_group_members:sequence'),
236 236 ('__start__', 'member:mapping'),
237 237 ('member_user_id', uid),
238 238 ('type', 'existing'),
239 239 ('__end__', 'member:mapping'),
240 240 ('__end__', 'user_group_members:sequence'),
241 241 ]
242 242 ugid = user_group.users_group_id
243 243 self.app.post(
244 244 route_path('user_groups_update', user_group_id=ugid), form_data)
245 245
246 246 user_group = UserGroup.get(ugid)
247 247 assert user_group
248 248
249 249 assert user_group.members[0].user_id == uid
250 250 assert user_group.user_id == uid
251 251 assert 'changed_name' in user_group.users_group_name
252 252 assert 'changed_description' in user_group.user_group_description
253 253 assert user_group.users_group_active == expected_active_state
254 254
255 255 fixture.destroy_user_group(user_group)
@@ -1,297 +1,297 b''
1 1
2 2
3 3 # Copyright (C) 2012-2023 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode authentication plugin for Atlassian CROWD
23 23 """
24 24
25 25
26 26 import colander
27 27 import base64
28 28 import logging
29 29 import urllib.request
30 30 import urllib.error
31 31 import urllib.parse
32 32
33 33 from rhodecode.translation import _
34 34 from rhodecode.authentication.base import (
35 35 RhodeCodeExternalAuthPlugin, hybrid_property)
36 36 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
37 37 from rhodecode.authentication.routes import AuthnPluginResourceBase
38 38 from rhodecode.lib.colander_utils import strip_whitespace
39 39 from rhodecode.lib.ext_json import json, formatted_json
40 40 from rhodecode.model.db import User
41 41
42 42 log = logging.getLogger(__name__)
43 43
44 44
45 45 def plugin_factory(plugin_id, *args, **kwargs):
46 46 """
47 47 Factory function that is called during plugin discovery.
48 48 It returns the plugin instance.
49 49 """
50 50 plugin = RhodeCodeAuthPlugin(plugin_id)
51 51 return plugin
52 52
53 53
54 54 class CrowdAuthnResource(AuthnPluginResourceBase):
55 55 pass
56 56
57 57
58 58 class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase):
59 59 host = colander.SchemaNode(
60 60 colander.String(),
61 61 default='127.0.0.1',
62 62 description=_('The FQDN or IP of the Atlassian CROWD Server'),
63 63 preparer=strip_whitespace,
64 64 title=_('Host'),
65 65 widget='string')
66 66 port = colander.SchemaNode(
67 67 colander.Int(),
68 68 default=8095,
69 69 description=_('The Port in use by the Atlassian CROWD Server'),
70 70 preparer=strip_whitespace,
71 71 title=_('Port'),
72 72 validator=colander.Range(min=0, max=65536),
73 73 widget='int')
74 74 app_name = colander.SchemaNode(
75 75 colander.String(),
76 76 default='',
77 77 description=_('The Application Name to authenticate to CROWD'),
78 78 preparer=strip_whitespace,
79 79 title=_('Application Name'),
80 80 widget='string')
81 81 app_password = colander.SchemaNode(
82 82 colander.String(),
83 83 default='',
84 84 description=_('The password to authenticate to CROWD'),
85 85 preparer=strip_whitespace,
86 86 title=_('Application Password'),
87 87 widget='password')
88 88 admin_groups = colander.SchemaNode(
89 89 colander.String(),
90 90 default='',
91 91 description=_('A comma separated list of group names that identify '
92 92 'users as RhodeCode Administrators'),
93 93 missing='',
94 94 preparer=strip_whitespace,
95 95 title=_('Admin Groups'),
96 96 widget='string')
97 97
98 98
99 99 class CrowdServer(object):
100 100 def __init__(self, *args, **kwargs):
101 101 """
102 102 Create a new CrowdServer object that points to IP/Address 'host',
103 103 on the given port, and using the given method (https/http). user and
104 104 passwd can be set here or with set_credentials. If unspecified,
105 105 "version" defaults to "latest".
106 106
107 107 example::
108 108
109 109 cserver = CrowdServer(host="127.0.0.1",
110 110 port="8095",
111 111 user="some_app",
112 112 passwd="some_passwd",
113 113 version="1")
114 114 """
115 if not "port" in kwargs:
115 if 'port' not in kwargs:
116 116 kwargs["port"] = "8095"
117 117 self._logger = kwargs.get("logger", logging.getLogger(__name__))
118 118 self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
119 119 kwargs.get("host", "127.0.0.1"),
120 120 kwargs.get("port", "8095"))
121 121 self.set_credentials(kwargs.get("user", ""),
122 122 kwargs.get("passwd", ""))
123 123 self._version = kwargs.get("version", "latest")
124 124 self._url_list = None
125 125 self._appname = "crowd"
126 126
127 127 def set_credentials(self, user, passwd):
128 128 self.user = user
129 129 self.passwd = passwd
130 130 self._make_opener()
131 131
132 132 def _make_opener(self):
133 133 mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
134 134 mgr.add_password(None, self._uri, self.user, self.passwd)
135 135 handler = urllib.request.HTTPBasicAuthHandler(mgr)
136 136 self.opener = urllib.request.build_opener(handler)
137 137
138 138 def _request(self, url, body=None, headers=None,
139 139 method=None, noformat=False,
140 140 empty_response_ok=False):
141 141 _headers = {"Content-type": "application/json",
142 142 "Accept": "application/json"}
143 143 if self.user and self.passwd:
144 144 authstring = base64.b64encode("%s:%s" % (self.user, self.passwd))
145 145 _headers["Authorization"] = "Basic %s" % authstring
146 146 if headers:
147 147 _headers.update(headers)
148 148 log.debug("Sent crowd: \n%s"
149 149 % (formatted_json({"url": url, "body": body,
150 150 "headers": _headers})))
151 151 request = urllib.request.Request(url, body, _headers)
152 152 if method:
153 153 request.get_method = lambda: method
154 154
155 155 global msg
156 156 msg = ""
157 157 try:
158 158 ret_doc = self.opener.open(request)
159 159 msg = ret_doc.read()
160 160 if not msg and empty_response_ok:
161 161 ret_val = {}
162 162 ret_val["status"] = True
163 163 ret_val["error"] = "Response body was empty"
164 164 elif not noformat:
165 165 ret_val = json.loads(msg)
166 166 ret_val["status"] = True
167 167 else:
168 168 ret_val = msg
169 169 except Exception as e:
170 170 if not noformat:
171 171 ret_val = {"status": False,
172 172 "body": body,
173 173 "error": "{}\n{}".format(e, msg)}
174 174 else:
175 175 ret_val = None
176 176 return ret_val
177 177
178 178 def user_auth(self, username, password):
179 179 """Authenticate a user against crowd. Returns brief information about
180 180 the user."""
181 181 url = ("%s/rest/usermanagement/%s/authentication?username=%s"
182 182 % (self._uri, self._version, username))
183 183 body = json.dumps({"value": password})
184 184 return self._request(url, body)
185 185
186 186 def user_groups(self, username):
187 187 """Retrieve a list of groups to which this user belongs."""
188 188 url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
189 189 % (self._uri, self._version, username))
190 190 return self._request(url)
191 191
192 192
193 193 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
194 194 uid = 'crowd'
195 195 _settings_unsafe_keys = ['app_password']
196 196
197 197 def includeme(self, config):
198 198 config.add_authn_plugin(self)
199 199 config.add_authn_resource(self.get_id(), CrowdAuthnResource(self))
200 200 config.add_view(
201 201 'rhodecode.authentication.views.AuthnPluginViewBase',
202 202 attr='settings_get',
203 203 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
204 204 request_method='GET',
205 205 route_name='auth_home',
206 206 context=CrowdAuthnResource)
207 207 config.add_view(
208 208 'rhodecode.authentication.views.AuthnPluginViewBase',
209 209 attr='settings_post',
210 210 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
211 211 request_method='POST',
212 212 route_name='auth_home',
213 213 context=CrowdAuthnResource)
214 214
215 215 def get_settings_schema(self):
216 216 return CrowdSettingsSchema()
217 217
218 218 def get_display_name(self, load_from_settings=False):
219 219 return _('CROWD')
220 220
221 221 @classmethod
222 222 def docs(cls):
223 223 return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html"
224 224
225 225 @hybrid_property
226 226 def name(self):
227 227 return u"crowd"
228 228
229 229 def use_fake_password(self):
230 230 return True
231 231
232 232 def user_activation_state(self):
233 233 def_user_perms = User.get_default_user().AuthUser().permissions['global']
234 234 return 'hg.extern_activate.auto' in def_user_perms
235 235
236 236 def auth(self, userobj, username, password, settings, **kwargs):
237 237 """
238 238 Given a user object (which may be null), username, a plaintext password,
239 239 and a settings object (containing all the keys needed as listed in settings()),
240 240 authenticate this user's login attempt.
241 241
242 242 Return None on failure. On success, return a dictionary of the form:
243 243
244 244 see: RhodeCodeAuthPluginBase.auth_func_attrs
245 245 This is later validated for correctness
246 246 """
247 247 if not username or not password:
248 248 log.debug('Empty username or password skipping...')
249 249 return None
250 250
251 251 log.debug("Crowd settings: \n%s", formatted_json(settings))
252 252 server = CrowdServer(**settings)
253 253 server.set_credentials(settings["app_name"], settings["app_password"])
254 254 crowd_user = server.user_auth(username, password)
255 255 log.debug("Crowd returned: \n%s", formatted_json(crowd_user))
256 256 if not crowd_user["status"]:
257 257 return None
258 258
259 259 res = server.user_groups(crowd_user["name"])
260 260 log.debug("Crowd groups: \n%s", formatted_json(res))
261 261 crowd_user["groups"] = [x["name"] for x in res["groups"]]
262 262
263 263 # old attrs fetched from RhodeCode database
264 264 admin = getattr(userobj, 'admin', False)
265 265 active = getattr(userobj, 'active', True)
266 266 email = getattr(userobj, 'email', '')
267 267 username = getattr(userobj, 'username', username)
268 268 firstname = getattr(userobj, 'firstname', '')
269 269 lastname = getattr(userobj, 'lastname', '')
270 270 extern_type = getattr(userobj, 'extern_type', '')
271 271
272 272 user_attrs = {
273 273 'username': username,
274 274 'firstname': crowd_user["first-name"] or firstname,
275 275 'lastname': crowd_user["last-name"] or lastname,
276 276 'groups': crowd_user["groups"],
277 277 'user_group_sync': True,
278 278 'email': crowd_user["email"] or email,
279 279 'admin': admin,
280 280 'active': active,
281 281 'active_from_extern': crowd_user.get('active'),
282 282 'extern_name': crowd_user["name"],
283 283 'extern_type': extern_type,
284 284 }
285 285
286 286 # set an admin if we're in admin_groups of crowd
287 287 for group in settings["admin_groups"]:
288 288 if group in user_attrs["groups"]:
289 289 user_attrs["admin"] = True
290 290 log.debug("Final crowd user object: \n%s", formatted_json(user_attrs))
291 291 log.info('user `%s` authenticated correctly', user_attrs['username'])
292 292 return user_attrs
293 293
294 294
295 295 def includeme(config):
296 296 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
297 297 plugin_factory(plugin_id).includeme(config)
@@ -1,263 +1,263 b''
1 1 #!/usr/bin/env python
2 2
3 3
4 4 import os
5 5 import re
6 6 import shutil
7 7 import logging
8 8
9 9 from rhodecode.lib.dbmigrate.migrate import exceptions
10 10 from rhodecode.lib.dbmigrate.migrate.versioning import pathed, script
11 11 from datetime import datetime
12 12
13 13
14 14 log = logging.getLogger(__name__)
15 15
16 16 class VerNum(object):
17 17 """A version number that behaves like a string and int at the same time"""
18 18
19 19 _instances = {}
20 20
21 21 def __new__(cls, value):
22 22 val = str(value)
23 23 if val not in cls._instances:
24 24 cls._instances[val] = super(VerNum, cls).__new__(cls)
25 25 ret = cls._instances[val]
26 26 return ret
27 27
28 28 def __init__(self,value):
29 29 self.value = str(int(value))
30 30 if self < 0:
31 31 raise ValueError("Version number cannot be negative")
32 32
33 33 def __add__(self, value):
34 34 ret = int(self) + int(value)
35 35 return VerNum(ret)
36 36
37 37 def __sub__(self, value):
38 38 return self + (int(value) * -1)
39 39
40 40 def __eq__(self, value):
41 41 return int(self) == int(value)
42 42
43 43 def __ne__(self, value):
44 44 return int(self) != int(value)
45 45
46 46 def __lt__(self, value):
47 47 return int(self) < int(value)
48 48
49 49 def __gt__(self, value):
50 50 return int(self) > int(value)
51 51
52 52 def __ge__(self, value):
53 53 return int(self) >= int(value)
54 54
55 55 def __le__(self, value):
56 56 return int(self) <= int(value)
57 57
58 58 def __repr__(self):
59 59 return "<VerNum(%s)>" % self.value
60 60
61 61 def __str__(self):
62 62 return str(self.value)
63 63
64 64 def __int__(self):
65 65 return int(self.value)
66 66
67 67
68 68 class Collection(pathed.Pathed):
69 69 """A collection of versioning scripts in a repository"""
70 70
71 71 FILENAME_WITH_VERSION = re.compile(r'^(\d{3,}).*')
72 72
73 73 def __init__(self, path):
74 74 """Collect current version scripts in repository
75 75 and store them in self.versions
76 76 """
77 77 super(Collection, self).__init__(path)
78 78
79 79 # Create temporary list of files, allowing skipped version numbers.
80 80 files = os.listdir(path)
81 81 if '1' in files:
82 82 # deprecation
83 83 raise Exception('It looks like you have a repository in the old '
84 84 'format (with directories for each version). '
85 85 'Please convert repository before proceeding.')
86 86
87 87 tempVersions = {}
88 88 for filename in files:
89 89 match = self.FILENAME_WITH_VERSION.match(filename)
90 90 if match:
91 91 num = int(match.group(1))
92 92 tempVersions.setdefault(num, []).append(filename)
93 93 else:
94 94 pass # Must be a helper file or something, let's ignore it.
95 95
96 96 # Create the versions member where the keys
97 97 # are VerNum's and the values are Version's.
98 98 self.versions = {}
99 99 for num, files in list(tempVersions.items()):
100 100 self.versions[VerNum(num)] = Version(num, path, files)
101 101
102 102 @property
103 103 def latest(self):
104 104 """:returns: Latest version in Collection"""
105 105 return max([VerNum(0)] + list(self.versions.keys()))
106 106
107 107 def _next_ver_num(self, use_timestamp_numbering):
108 if use_timestamp_numbering == True:
108 if use_timestamp_numbering is True:
109 109 return VerNum(int(datetime.utcnow().strftime('%Y%m%d%H%M%S')))
110 110 else:
111 111 return self.latest + 1
112 112
113 113 def create_new_python_version(self, description, **k):
114 114 """Create Python files for new version"""
115 115 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
116 116 extra = str_to_filename(description)
117 117
118 118 if extra:
119 119 if extra == '_':
120 120 extra = ''
121 121 elif not extra.startswith('_'):
122 122 extra = '_%s' % extra
123 123
124 124 filename = '%03d%s.py' % (ver, extra)
125 125 filepath = self._version_path(filename)
126 126
127 127 script.PythonScript.create(filepath, **k)
128 128 self.versions[ver] = Version(ver, self.path, [filename])
129 129
130 130 def create_new_sql_version(self, database, description, **k):
131 131 """Create SQL files for new version"""
132 132 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
133 133 self.versions[ver] = Version(ver, self.path, [])
134 134
135 135 extra = str_to_filename(description)
136 136
137 137 if extra:
138 138 if extra == '_':
139 139 extra = ''
140 140 elif not extra.startswith('_'):
141 141 extra = '_%s' % extra
142 142
143 143 # Create new files.
144 144 for op in ('upgrade', 'downgrade'):
145 145 filename = '%03d%s_%s_%s.sql' % (ver, extra, database, op)
146 146 filepath = self._version_path(filename)
147 147 script.SqlScript.create(filepath, **k)
148 148 self.versions[ver].add_script(filepath)
149 149
150 150 def version(self, vernum=None):
151 151 """Returns latest Version if vernum is not given.
152 152 Otherwise, returns wanted version"""
153 153 if vernum is None:
154 154 vernum = self.latest
155 155 return self.versions[VerNum(vernum)]
156 156
157 157 @classmethod
158 158 def clear(cls):
159 159 super(Collection, cls).clear()
160 160
161 161 def _version_path(self, ver):
162 162 """Returns path of file in versions repository"""
163 163 return os.path.join(self.path, str(ver))
164 164
165 165
166 166 class Version(object):
167 167 """A single version in a collection
168 168 :param vernum: Version Number
169 169 :param path: Path to script files
170 170 :param filelist: List of scripts
171 171 :type vernum: int, VerNum
172 172 :type path: string
173 173 :type filelist: list
174 174 """
175 175
176 176 def __init__(self, vernum, path, filelist):
177 177 self.version = VerNum(vernum)
178 178
179 179 # Collect scripts in this folder
180 180 self.sql = {}
181 181 self.python = None
182 182
183 183 for script in filelist:
184 184 self.add_script(os.path.join(path, script))
185 185
186 186 def script(self, database=None, operation=None):
187 187 """Returns SQL or Python Script"""
188 188 for db in (database, 'default'):
189 189 # Try to return a .sql script first
190 190 try:
191 191 return self.sql[db][operation]
192 192 except KeyError:
193 193 continue # No .sql script exists
194 194
195 195 # TODO: maybe add force Python parameter?
196 196 ret = self.python
197 197
198 198 assert ret is not None, \
199 199 "There is no script for %d version" % self.version
200 200 return ret
201 201
202 202 def add_script(self, path):
203 203 """Add script to Collection/Version"""
204 204 if path.endswith(Extensions.py):
205 205 self._add_script_py(path)
206 206 elif path.endswith(Extensions.sql):
207 207 self._add_script_sql(path)
208 208
209 209 SQL_FILENAME = re.compile(r'^.*\.sql')
210 210
211 211 def _add_script_sql(self, path):
212 212 basename = os.path.basename(path)
213 213 match = self.SQL_FILENAME.match(basename)
214 214
215 215 if match:
216 216 basename = basename.replace('.sql', '')
217 217 parts = basename.split('_')
218 218 if len(parts) < 3:
219 219 raise exceptions.ScriptError(
220 220 "Invalid SQL script name %s " % basename + \
221 221 "(needs to be ###_description_database_operation.sql)")
222 222 version = parts[0]
223 223 op = parts[-1]
224 224 # NOTE(mriedem): check for ibm_db_sa as the database in the name
225 225 if 'ibm_db_sa' in basename:
226 226 if len(parts) == 6:
227 227 dbms = '_'.join(parts[-4: -1])
228 228 else:
229 229 raise exceptions.ScriptError(
230 230 "Invalid ibm_db_sa SQL script name '%s'; "
231 231 "(needs to be "
232 232 "###_description_ibm_db_sa_operation.sql)" % basename)
233 233 else:
234 234 dbms = parts[-2]
235 235 else:
236 236 raise exceptions.ScriptError(
237 237 "Invalid SQL script name %s " % basename + \
238 238 "(needs to be ###_description_database_operation.sql)")
239 239
240 240 # File the script into a dictionary
241 241 self.sql.setdefault(dbms, {})[op] = script.SqlScript(path)
242 242
243 243 def _add_script_py(self, path):
244 244 if self.python is not None:
245 245 raise exceptions.ScriptError('You can only have one Python script '
246 246 'per version, but you have: %s and %s' % (self.python, path))
247 247 self.python = script.PythonScript(path)
248 248
249 249
250 250 class Extensions:
251 251 """A namespace for file extensions"""
252 252 py = 'py'
253 253 sql = 'sql'
254 254
255 255 def str_to_filename(s):
256 256 """Replaces spaces, (double and single) quotes
257 257 and double underscores to underscores
258 258 """
259 259
260 260 s = s.replace(' ', '_').replace('"', '_').replace("'", '_').replace(".", "_")
261 261 while '__' in s:
262 262 s = s.replace('__', '_')
263 263 return s
@@ -1,186 +1,185 b''
1
2 1 # Copyright (C) 2010-2023 RhodeCode GmbH
3 2 #
4 3 # This program is free software: you can redistribute it and/or modify
5 4 # it under the terms of the GNU Affero General Public License, version 3
6 5 # (only), as published by the Free Software Foundation.
7 6 #
8 7 # This program is distributed in the hope that it will be useful,
9 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 10 # GNU General Public License for more details.
12 11 #
13 12 # You should have received a copy of the GNU Affero General Public License
14 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 14 #
16 15 # This program is dual-licensed. If you wish to learn more about the
17 16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 18
20 19 import sys
21 20 import logging
22 21
23 22
24 23 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(30, 38))
25 24
26 25 # Sequences
27 26 RESET_SEQ = "\033[0m"
28 27 COLOR_SEQ = "\033[0;%dm"
29 28 BOLD_SEQ = "\033[1m"
30 29
31 30 COLORS = {
32 31 'CRITICAL': MAGENTA,
33 32 'ERROR': RED,
34 33 'WARNING': CYAN,
35 34 'INFO': GREEN,
36 35 'DEBUG': BLUE,
37 36 'SQL': YELLOW
38 37 }
39 38
40 39
41 40 def _inject_req_id(record, with_prefix=True):
42 41 from pyramid.threadlocal import get_current_request
43 42 dummy = '00000000-0000-0000-0000-000000000000'
44 43 req_id = None
45 44
46 45 req = get_current_request()
47 46 if req:
48 47 req_id = getattr(req, 'req_id', None)
49 48 if with_prefix:
50 49 req_id = 'req_id:%-36s' % (req_id or dummy)
51 50 else:
52 51 req_id = (req_id or dummy)
53 52 record.req_id = req_id
54 53
55 54
56 55 def _add_log_to_debug_bucket(formatted_record):
57 56 from pyramid.threadlocal import get_current_request
58 57 req = get_current_request()
59 58 if req:
60 59 req.req_id_bucket.append(formatted_record)
61 60
62 61
63 62 def one_space_trim(s):
64 63 if s.find(" ") == -1:
65 64 return s
66 65 else:
67 66 s = s.replace(' ', ' ')
68 67 return one_space_trim(s)
69 68
70 69
71 70 def format_sql(sql):
72 71 sql = sql.replace('\n', '')
73 72 sql = one_space_trim(sql)
74 73 sql = sql\
75 74 .replace(',', ',\n\t')\
76 75 .replace('SELECT', '\n\tSELECT \n\t')\
77 76 .replace('UPDATE', '\n\tUPDATE \n\t')\
78 77 .replace('DELETE', '\n\tDELETE \n\t')\
79 78 .replace('FROM', '\n\tFROM')\
80 79 .replace('ORDER BY', '\n\tORDER BY')\
81 80 .replace('LIMIT', '\n\tLIMIT')\
82 81 .replace('WHERE', '\n\tWHERE')\
83 82 .replace('AND', '\n\tAND')\
84 83 .replace('LEFT', '\n\tLEFT')\
85 84 .replace('INNER', '\n\tINNER')\
86 85 .replace('INSERT', '\n\tINSERT')\
87 86 .replace('DELETE', '\n\tDELETE')
88 87 return sql
89 88
90 89
91 90 class ExceptionAwareFormatter(logging.Formatter):
92 91 """
93 92 Extended logging formatter which prints out remote tracebacks.
94 93 """
95 94
96 95 def formatException(self, ei):
97 96 ex_type, ex_value, ex_tb = ei
98 97
99 98 local_tb = logging.Formatter.formatException(self, ei)
100 99 if hasattr(ex_value, '_vcs_server_traceback'):
101 100
102 101 def formatRemoteTraceback(remote_tb_lines):
103 102 result = ["\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n"]
104 103 result.append(remote_tb_lines)
105 104 result.append("\n +--- End of remote traceback\n")
106 105 return result
107 106
108 107 try:
109 108 if ex_type is not None and ex_value is None and ex_tb is None:
110 109 # possible old (3.x) call syntax where caller is only
111 110 # providing exception object
112 111 if type(ex_type) is not type:
113 112 raise TypeError(
114 113 "invalid argument: ex_type should be an exception "
115 114 "type, or just supply no arguments at all")
116 115 if ex_type is None and ex_tb is None:
117 116 ex_type, ex_value, ex_tb = sys.exc_info()
118 117
119 118 remote_tb = getattr(ex_value, "_vcs_server_traceback", None)
120 119
121 120 if remote_tb:
122 121 remote_tb = formatRemoteTraceback(remote_tb)
123 122 return local_tb + ''.join(remote_tb)
124 123 finally:
125 124 # clean up cycle to traceback, to allow proper GC
126 125 del ex_type, ex_value, ex_tb
127 126
128 127 return local_tb
129 128
130 129
131 130 class RequestTrackingFormatter(ExceptionAwareFormatter):
132 131 def format(self, record):
133 132 _inject_req_id(record)
134 133 def_record = logging.Formatter.format(self, record)
135 134 _add_log_to_debug_bucket(def_record)
136 135 return def_record
137 136
138 137
139 138 class ColorFormatter(ExceptionAwareFormatter):
140 139
141 140 def format(self, record):
142 141 """
143 142 Changes record's levelname to use with COLORS enum
144 143 """
145 144 def_record = super(ColorFormatter, self).format(record)
146 145
147 146 levelname = record.levelname
148 147 start = COLOR_SEQ % (COLORS[levelname])
149 148 end = RESET_SEQ
150 149
151 150 colored_record = ''.join([start, def_record, end])
152 151 return colored_record
153 152
154 153
155 154 class ColorRequestTrackingFormatter(RequestTrackingFormatter):
156 155
157 156 def format(self, record):
158 157 """
159 158 Changes record's levelname to use with COLORS enum
160 159 """
161 160 def_record = super(ColorRequestTrackingFormatter, self).format(record)
162 161
163 162 levelname = record.levelname
164 163 start = COLOR_SEQ % (COLORS[levelname])
165 164 end = RESET_SEQ
166 165
167 166 colored_record = ''.join([start, def_record, end])
168 167 return colored_record
169 168
170 169
171 170 class ColorFormatterSql(logging.Formatter):
172 171
173 172 def format(self, record):
174 173 """
175 174 Changes record's levelname to use with COLORS enum
176 175 """
177 176
178 177 start = COLOR_SEQ % (COLORS['SQL'])
179 178 def_record = format_sql(logging.Formatter.format(self, record))
180 179 end = RESET_SEQ
181 180
182 181 colored_record = ''.join([start, def_record, end])
183 182 return colored_record
184 183
185 # marcink: needs to stay with this name for backward .ini compatability
186 Pyro4AwareFormatter = ExceptionAwareFormatter
184 # NOTE (marcink): needs to stay with this name for backward .ini compatability
185 Pyro4AwareFormatter = ExceptionAwareFormatter # noqa
@@ -1,132 +1,132 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import re
21 21 import markdown
22 22 import xml.etree.ElementTree as etree
23 23
24 24 from markdown.extensions import Extension
25 25 from markdown.extensions.fenced_code import FencedCodeExtension
26 26 from markdown.extensions.tables import TableExtension
27 27 from markdown.extensions.nl2br import Nl2BrExtension as _Nl2BrExtension
28 28 from markdown.extensions.wikilinks import WikiLinkExtension
29 29 from markdown.inlinepatterns import Pattern
30 30
31 31 import gfm
32 32
33 33
34 34 class InlineProcessor(Pattern):
35 35 """
36 36 Base class that inline patterns subclass.
37 37 This is the newer style inline processor that uses a more
38 38 efficient and flexible search approach.
39 39 """
40 40
41 41 def __init__(self, pattern, md=None):
42 42 """
43 43 Create an instant of an inline pattern.
44 44 Keyword arguments:
45 45 * pattern: A regular expression that matches a pattern
46 46 """
47 47 self.pattern = pattern
48 48 self.compiled_re = re.compile(pattern, re.DOTALL | re.UNICODE)
49 49
50 50 # Api for Markdown to pass safe_mode into instance
51 51 self.safe_mode = False
52 52 self.md = md
53 53
54 54 def handleMatch(self, m, data):
55 55 """Return a ElementTree element from the given match and the
56 56 start and end index of the matched text.
57 57 If `start` and/or `end` are returned as `None`, it will be
58 58 assumed that the processor did not find a valid region of text.
59 59 Subclasses should override this method.
60 60 Keyword arguments:
61 61 * m: A re match object containing a match of the pattern.
62 62 * data: The buffer current under analysis
63 63 Returns:
64 64 * el: The ElementTree element, text or None.
65 65 * start: The start of the region that has been matched or None.
66 66 * end: The end of the region that has been matched or None.
67 67 """
68 68 pass # pragma: no cover
69 69
70 70
71 71 class SimpleTagInlineProcessor(InlineProcessor):
72 72 """
73 73 Return element of type `tag` with a text attribute of group(2)
74 74 of a Pattern.
75 75 """
76 76 def __init__(self, pattern, tag):
77 77 InlineProcessor.__init__(self, pattern)
78 78 self.tag = tag
79 79
80 80 def handleMatch(self, m, data): # pragma: no cover
81 81 el = etree.Element(self.tag)
82 82 el.text = m.group(2)
83 83 return el, m.start(0), m.end(0)
84 84
85 85
86 86 class SubstituteTagInlineProcessor(SimpleTagInlineProcessor):
87 87 """ Return an element of type `tag` with no children. """
88 88 def handleMatch(self, m, data):
89 89 return etree.Element(self.tag), m.start(0), m.end(0)
90 90
91 91
92 92 class Nl2BrExtension(_Nl2BrExtension):
93 93 pass
94 94
95 95
96 96 # Global Vars
97 97 URLIZE_RE = '(%s)' % '|'.join([
98 98 r'<(?:f|ht)tps?://[^>]*>',
99 99 r'\b(?:f|ht)tps?://[^)<>\s]+[^.,)<>\s]',
100 100 r'\bwww\.[^)<>\s]+[^.,)<>\s]',
101 101 r'[^(<\s]+\.(?:com|net|org)\b',
102 102 ])
103 103
104 104
105 105 class UrlizePattern(markdown.inlinepatterns.Pattern):
106 106 """ Return a link Element given an autolink (`http://example/com`). """
107 107 def handleMatch(self, m):
108 108 url = m.group(2)
109 109
110 110 if url.startswith('<'):
111 111 url = url[1:-1]
112 112
113 113 text = url
114 114
115 if not url.split('://')[0] in ('http','https','ftp'):
116 if '@' in url and not '/' in url:
115 if url.split('://')[0] not in ('http', 'https', 'ftp'):
116 if '@' in url and '/' not in url:
117 117 url = 'mailto:' + url
118 118 else:
119 119 url = 'http://' + url
120 120
121 121 el = markdown.util.etree.Element("a")
122 122 el.set('href', url)
123 123 el.text = markdown.util.AtomicString(text)
124 124 return el
125 125
126 126
127 127 class UrlizeExtension(Extension):
128 128 """ Urlize Extension for Python-Markdown. """
129 129
130 130 def extendMarkdown(self, md):
131 131 """ Replace autolink with UrlizePattern """
132 132 md.inlinePatterns['autolink'] = UrlizePattern(URLIZE_RE, md)
@@ -1,218 +1,218 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.model.db import User
24 24 from rhodecode.tests import TEST_USER_REGULAR_LOGIN
25 25 from rhodecode.tests.fixture import Fixture
26 26 from rhodecode.model.user_group import UserGroupModel
27 27 from rhodecode.model.meta import Session
28 28
29 29
30 30 fixture = Fixture()
31 31
32 32
33 33 def teardown_module(self):
34 34 _delete_all_user_groups()
35 35
36 36
37 37 class TestGetUserGroups(object):
38 38 def test_returns_filtered_list(self, backend, user_util):
39 39 created_groups = []
40 40 for i in range(4):
41 41 created_groups.append(
42 42 user_util.create_user_group(users_group_active=True))
43 43
44 44 group_filter = created_groups[-1].users_group_name[-2:]
45 45 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
46 46 with self._patch_user_group_list():
47 47 groups = UserGroupModel().get_user_groups(group_filter)
48 48
49 49 fake_groups = [
50 50 u for u in groups if u['value'].startswith('test_returns')]
51 51 assert len(fake_groups) == 1
52 52 assert fake_groups[0]['value'] == created_groups[-1].users_group_name
53 53 assert fake_groups[0]['value_display'].startswith(
54 54 'Group: test_returns')
55 55
56 56 def test_returns_limited_list(self, backend, user_util):
57 57 created_groups = []
58 58 for i in range(3):
59 59 created_groups.append(
60 60 user_util.create_user_group(users_group_active=True))
61 61 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
62 62 with self._patch_user_group_list():
63 63 groups = UserGroupModel().get_user_groups('test_returns')
64 64
65 65 fake_groups = [
66 66 u for u in groups if u['value'].startswith('test_returns')]
67 67 assert len(fake_groups) == 3
68 68
69 69 def test_returns_active_user_groups(self, backend, user_util):
70 70 for i in range(4):
71 71 is_active = i % 2 == 0
72 72 user_util.create_user_group(users_group_active=is_active)
73 73 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
74 74 with self._patch_user_group_list():
75 75 groups = UserGroupModel().get_user_groups()
76 76 expected = ('id', 'icon_link', 'value_display', 'value', 'value_type')
77 77 for group in groups:
78 assert group['value_type'] is 'user_group'
78 assert group['value_type'] == 'user_group'
79 79 for key in expected:
80 80 assert key in group
81 81
82 82 fake_groups = [
83 83 u for u in groups if u['value'].startswith('test_returns')]
84 84 assert len(fake_groups) == 2
85 85 for user in fake_groups:
86 86 assert user['value_display'].startswith('Group: test_returns')
87 87
88 88 def _patch_user_group_list(self):
89 89 def side_effect(group_list, perm_set):
90 90 return group_list
91 91 return mock.patch(
92 92 'rhodecode.model.user_group.UserGroupList', side_effect=side_effect)
93 93
94 94
95 95 @pytest.mark.parametrize(
96 96 "pre_existing, regular_should_be, external_should_be, groups, "
97 97 "expected", [
98 98 ([], [], [], [], []),
99 99 # no changes of regular
100 100 ([], ['regular'], [], [], ['regular']),
101 101 # not added to regular group
102 102 (['some_other'], [], [], ['some_other'], []),
103 103 (
104 104 [], ['regular'], ['container'], ['container'],
105 105 ['regular', 'container']
106 106 ),
107 107 (
108 108 [], ['regular'], [], ['container', 'container2'],
109 109 ['regular', 'container', 'container2']
110 110 ),
111 111 # remove not used
112 112 ([], ['regular'], ['other'], [], ['regular']),
113 113 (
114 114 ['some_other'], ['regular'], ['other', 'container'],
115 115 ['container', 'container2'],
116 116 ['regular', 'container', 'container2']
117 117 ),
118 118 ])
119 119 def test_enforce_groups(pre_existing, regular_should_be,
120 120 external_should_be, groups, expected, backend_hg):
121 121 # TODO: anderson: adding backend_hg fixture so it sets up the database
122 122 # for when running this file alone
123 123 _delete_all_user_groups()
124 124
125 125 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
126 126 for gr in pre_existing:
127 127 gr = fixture.create_user_group(gr)
128 128 Session().commit()
129 129
130 130 # make sure use is just in those groups
131 131 for gr in regular_should_be:
132 132 gr = fixture.create_user_group(gr)
133 133 Session().commit()
134 134 UserGroupModel().add_user_to_group(gr, user)
135 135 Session().commit()
136 136
137 137 # now special external groups created by auth plugins
138 138 for gr in external_should_be:
139 139 gr = fixture.create_user_group(
140 140 gr, user_group_data={'extern_type': 'container'})
141 141 Session().commit()
142 142 UserGroupModel().add_user_to_group(gr, user)
143 143 Session().commit()
144 144
145 145 UserGroupModel().enforce_groups(user, groups, 'container')
146 146 Session().commit()
147 147
148 148 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
149 149 in_groups = user.group_member
150 150
151 151 expected.sort()
152 152 assert (
153 153 expected == sorted(x.users_group.users_group_name for x in in_groups))
154 154
155 155
156 156 def _delete_all_user_groups():
157 157 for gr in UserGroupModel.get_all():
158 158 fixture.destroy_user_group(gr)
159 159 Session().commit()
160 160
161 161
162 162 def test_add_and_remove_user_from_group(user_regular, user_util):
163 163 user_group = user_util.create_user_group()
164 164 assert user_group.members == []
165 165 UserGroupModel().add_user_to_group(user_group, user_regular)
166 166 Session().commit()
167 167 assert user_group.members[0].user == user_regular
168 168 UserGroupModel().remove_user_from_group(user_group, user_regular)
169 169 Session().commit()
170 170 assert user_group.members == []
171 171
172 172
173 173 @pytest.mark.parametrize('data, expected', [
174 174 ([], []),
175 175 ([{"member_user_id": 1, "type": "new"}], [1]),
176 176 ([{"member_user_id": 1, "type": "new"},
177 177 {"member_user_id": 1, "type": "existing"}], [1]),
178 178 ([{"member_user_id": 1, "type": "new"},
179 179 {"member_user_id": 2, "type": "new"},
180 180 {"member_user_id": 3, "type": "remove"}], [1, 2])
181 181 ])
182 182 def test_clean_members_data(data, expected):
183 183 cleaned = UserGroupModel()._clean_members_data(data)
184 184 assert cleaned == expected
185 185
186 186
187 187 def _create_test_members():
188 188 members = []
189 189 for member_number in range(3):
190 190 member = mock.Mock()
191 191 member.user_id = member_number + 1
192 192 member.user.user_id = member_number + 1
193 193 members.append(member)
194 194 return members
195 195
196 196
197 197 def test_get_added_and_removed_users():
198 198 members = _create_test_members()
199 199 mock_user_group = mock.Mock()
200 200 mock_user_group.members = [members[0], members[1]]
201 201 new_users_list = [members[1].user.user_id, members[2].user.user_id]
202 202 model = UserGroupModel()
203 203
204 204 added, removed = model._get_added_and_removed_user_ids(
205 205 mock_user_group, new_users_list)
206 206
207 207 assert added == [members[2].user.user_id]
208 208 assert removed == [members[0].user.user_id]
209 209
210 210
211 211 def test_set_users_as_members_and_find_user_in_group(
212 212 user_util, user_regular, user_admin):
213 213 user_group = user_util.create_user_group()
214 214 assert len(user_group.members) == 0
215 215 user_list = [user_regular.user_id, user_admin.user_id]
216 216 UserGroupModel()._set_users_as_members(user_group, user_list)
217 217 assert len(user_group.members) == 2
218 218 assert UserGroupModel()._find_user_in_group(user_regular, user_group)
@@ -1,328 +1,328 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import pytest
21 21 import mock
22 22
23 23 from rhodecode.model.db import (
24 24 true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap)
25 25 from rhodecode.model.meta import Session
26 26 from rhodecode.model.user import UserModel
27 27 from rhodecode.model.user_group import UserGroupModel
28 28 from rhodecode.model.repo import RepoModel
29 29 from rhodecode.model.repo_group import RepoGroupModel
30 30 from rhodecode.tests.fixture import Fixture
31 31 from rhodecode.lib.str_utils import safe_str
32 32
33 33
34 34 fixture = Fixture()
35 35
36 36
37 37 class TestGetUsers(object):
38 38 def test_returns_active_users(self, backend, user_util):
39 39 for i in range(4):
40 40 is_active = i % 2 == 0
41 41 user_util.create_user(active=is_active, lastname='Fake user')
42 42
43 43 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
44 44 with mock.patch('rhodecode.lib.helpers.link_to_user'):
45 45 users = UserModel().get_users()
46 46 fake_users = [u for u in users if u['last_name'] == 'Fake user']
47 47 assert len(fake_users) == 2
48 48
49 49 expected_keys = (
50 50 'id', 'first_name', 'last_name', 'username', 'icon_link',
51 51 'value_display', 'value', 'value_type')
52 52 for user in users:
53 assert user['value_type'] is 'user'
53 assert user['value_type'] == 'user'
54 54 for key in expected_keys:
55 55 assert key in user
56 56
57 57 def test_returns_user_filtered_by_last_name(self, backend, user_util):
58 58 keywords = ('aBc', u'ΓΌnicode')
59 59 for keyword in keywords:
60 60 for i in range(2):
61 61 user_util.create_user(
62 62 active=True, lastname=u'Fake {} user'.format(keyword))
63 63
64 64 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
65 65 with mock.patch('rhodecode.lib.helpers.link_to_user'):
66 66 keyword = keywords[1].lower()
67 67 users = UserModel().get_users(name_contains=keyword)
68 68
69 69 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
70 70 assert len(fake_users) == 2
71 71 for user in fake_users:
72 72 assert user['last_name'] == safe_str('Fake ΓΌnicode user')
73 73
74 74 def test_returns_user_filtered_by_first_name(self, backend, user_util):
75 75 created_users = []
76 76 keywords = ('aBc', u'ΓΌnicode')
77 77 for keyword in keywords:
78 78 for i in range(2):
79 79 created_users.append(user_util.create_user(
80 80 active=True, lastname='Fake user',
81 81 firstname=u'Fake {} user'.format(keyword)))
82 82
83 83 keyword = keywords[1].lower()
84 84 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
85 85 with mock.patch('rhodecode.lib.helpers.link_to_user'):
86 86 users = UserModel().get_users(name_contains=keyword)
87 87
88 88 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
89 89 assert len(fake_users) == 2
90 90 for user in fake_users:
91 91 assert user['first_name'] == safe_str('Fake ΓΌnicode user')
92 92
93 93 def test_returns_user_filtered_by_username(self, backend, user_util):
94 94 created_users = []
95 95 for i in range(5):
96 96 created_users.append(user_util.create_user(
97 97 active=True, lastname='Fake user'))
98 98
99 99 user_filter = created_users[-1].username[-2:]
100 100 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
101 101 with mock.patch('rhodecode.lib.helpers.link_to_user'):
102 102 users = UserModel().get_users(name_contains=user_filter)
103 103
104 104 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
105 105 assert len(fake_users) == 1
106 106 assert fake_users[0]['username'] == created_users[-1].username
107 107
108 108 def test_returns_limited_user_list(self, backend, user_util):
109 109 created_users = []
110 110 for i in range(5):
111 111 created_users.append(user_util.create_user(
112 112 active=True, lastname='Fake user'))
113 113
114 114 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
115 115 with mock.patch('rhodecode.lib.helpers.link_to_user'):
116 116 users = UserModel().get_users(name_contains='Fake', limit=3)
117 117
118 118 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
119 119 assert len(fake_users) == 3
120 120
121 121
122 122 @pytest.fixture()
123 123 def test_user(request, baseapp):
124 124 usr = UserModel().create_or_update(
125 125 username='test_user',
126 126 password='qweqwe',
127 127 email='main_email@rhodecode.org',
128 128 firstname='u1', lastname=u'u1')
129 129 Session().commit()
130 130 assert User.get_by_username(u'test_user') == usr
131 131
132 132 @request.addfinalizer
133 133 def cleanup():
134 134 if UserModel().get_user(usr.user_id) is None:
135 135 return
136 136
137 137 perm = Permission.query().all()
138 138 for p in perm:
139 139 UserModel().revoke_perm(usr, p)
140 140
141 141 UserModel().delete(usr.user_id)
142 142 Session().commit()
143 143
144 144 return usr
145 145
146 146
147 147 def test_create_and_remove(test_user):
148 148 usr = test_user
149 149
150 150 # make user group
151 151 user_group = fixture.create_user_group('some_example_group')
152 152 Session().commit()
153 153
154 154 UserGroupModel().add_user_to_group(user_group, usr)
155 155 Session().commit()
156 156
157 157 assert UserGroup.get(user_group.users_group_id) == user_group
158 158 assert UserGroupMember.query().count() == 1
159 159 UserModel().delete(usr.user_id)
160 160 Session().commit()
161 161
162 162 assert UserGroupMember.query().all() == []
163 163
164 164
165 165 def test_additonal_email_as_main(test_user):
166 166 with pytest.raises(AttributeError):
167 167 m = UserEmailMap()
168 168 m.email = test_user.email
169 169 m.user = test_user
170 170 Session().add(m)
171 171 Session().commit()
172 172
173 173
174 174 def test_extra_email_map(test_user):
175 175
176 176 m = UserEmailMap()
177 177 m.email = u'main_email2@rhodecode.org'
178 178 m.user = test_user
179 179 Session().add(m)
180 180 Session().commit()
181 181
182 182 u = User.get_by_email(email='main_email@rhodecode.org')
183 183 assert test_user.user_id == u.user_id
184 184 assert test_user.username == u.username
185 185
186 186 u = User.get_by_email(email='main_email2@rhodecode.org')
187 187 assert test_user.user_id == u.user_id
188 188 assert test_user.username == u.username
189 189 u = User.get_by_email(email='main_email3@rhodecode.org')
190 190 assert u is None
191 191
192 192
193 193 def test_get_api_data_replaces_secret_data_by_default(test_user):
194 194 api_data = test_user.get_api_data()
195 195 api_key_length = 40
196 196 expected_replacement = '*' * api_key_length
197 197
198 198 for key in api_data['auth_tokens']:
199 199 assert key == expected_replacement
200 200
201 201
202 202 def test_get_api_data_includes_secret_data_if_activated(test_user):
203 203 api_data = test_user.get_api_data(include_secrets=True)
204 204 assert api_data['auth_tokens'] == test_user.auth_tokens
205 205
206 206
207 207 def test_add_perm(test_user):
208 208 perm = Permission.query().all()[0]
209 209 UserModel().grant_perm(test_user, perm)
210 210 Session().commit()
211 211 assert UserModel().has_perm(test_user, perm)
212 212
213 213
214 214 def test_has_perm(test_user):
215 215 perm = Permission.query().all()
216 216 for p in perm:
217 217 assert not UserModel().has_perm(test_user, p)
218 218
219 219
220 220 def test_revoke_perm(test_user):
221 221 perm = Permission.query().all()[0]
222 222 UserModel().grant_perm(test_user, perm)
223 223 Session().commit()
224 224 assert UserModel().has_perm(test_user, perm)
225 225
226 226 # revoke
227 227 UserModel().revoke_perm(test_user, perm)
228 228 Session().commit()
229 229 assert not UserModel().has_perm(test_user, perm)
230 230
231 231
232 232 @pytest.mark.parametrize("ip_range, expected, expect_errors", [
233 233 ('', [], False),
234 234 ('127.0.0.1', ['127.0.0.1'], False),
235 235 ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
236 236 ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
237 237 (
238 238 '127.0.0.1,172.172.172.0,127.0.0.2',
239 239 ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False),
240 240 (
241 241 '127.0.0.1-127.0.0.5',
242 242 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
243 243 False),
244 244 (
245 245 '127.0.0.1 - 127.0.0.5',
246 246 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
247 247 False
248 248 ),
249 249 ('-', [], True),
250 250 ('127.0.0.1-32', [], True),
251 251 (
252 252 '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2',
253 253 ['127.0.0.1', '127.0.0.2'], False),
254 254 (
255 255 '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,',
256 256 ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'],
257 257 False
258 258 ),
259 259 (
260 260 '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,',
261 261 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5',
262 262 '127.0.0.6'],
263 263 False
264 264 ),
265 265 ])
266 266 def test_ip_range_generator(ip_range, expected, expect_errors):
267 267 func = UserModel().parse_ip_range
268 268 if expect_errors:
269 269 pytest.raises(ValueError, func, ip_range)
270 270 else:
271 271 parsed_list = func(ip_range)
272 272 assert parsed_list == expected
273 273
274 274
275 275 def test_user_delete_cascades_ip_whitelist(test_user):
276 276 sample_ip = '1.1.1.1'
277 277 uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip)
278 278 Session().add(uid_map)
279 279 Session().delete(test_user)
280 280 try:
281 281 Session().flush()
282 282 finally:
283 283 Session().rollback()
284 284
285 285
286 286 def test_account_for_deactivation_generation(test_user):
287 287 accounts = UserModel().get_accounts_in_creation_order(
288 288 current_user=test_user)
289 289 # current user should be #1 in the list
290 290 assert accounts[0] == test_user.user_id
291 291 active_users = User.query().filter(User.active == true()).count()
292 292 assert active_users == len(accounts)
293 293
294 294
295 295 def test_user_delete_cascades_permissions_on_repo(backend, test_user):
296 296 test_repo = backend.create_repo()
297 297 RepoModel().grant_user_permission(
298 298 test_repo, test_user, 'repository.write')
299 299 Session().commit()
300 300
301 301 assert test_user.repo_to_perm
302 302
303 303 UserModel().delete(test_user)
304 304 Session().commit()
305 305
306 306
307 307 def test_user_delete_cascades_permissions_on_repo_group(
308 308 test_repo_group, test_user):
309 309 RepoGroupModel().grant_user_permission(
310 310 test_repo_group, test_user, 'group.write')
311 311 Session().commit()
312 312
313 313 assert test_user.repo_group_to_perm
314 314
315 315 Session().delete(test_user)
316 316 Session().commit()
317 317
318 318
319 319 def test_user_delete_cascades_permissions_on_user_group(
320 320 test_user_group, test_user):
321 321 UserGroupModel().grant_user_permission(
322 322 test_user_group, test_user, 'usergroup.write')
323 323 Session().commit()
324 324
325 325 assert test_user.user_group_to_perm
326 326
327 327 Session().delete(test_user)
328 328 Session().commit()
@@ -1,295 +1,295 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20
21 21 import formencode
22 22 import pytest
23 23
24 24 from rhodecode.tests import (
25 25 HG_REPO, TEST_USER_REGULAR2_EMAIL, TEST_USER_REGULAR2_LOGIN,
26 26 TEST_USER_REGULAR2_PASS, TEST_USER_ADMIN_LOGIN, TESTS_TMP_PATH)
27 27
28 28 from rhodecode.model import validators as v
29 29 from rhodecode.model.user_group import UserGroupModel
30 30
31 31 from rhodecode.model.meta import Session
32 32 from rhodecode.model.repo_group import RepoGroupModel
33 33 from rhodecode.model.db import ChangesetStatus, Repository
34 34 from rhodecode.model.changeset_status import ChangesetStatusModel
35 35 from rhodecode.tests.fixture import Fixture
36 36
37 37 fixture = Fixture()
38 38
39 39 pytestmark = pytest.mark.usefixtures('baseapp')
40 40
41 41
42 42 @pytest.fixture()
43 43 def localizer():
44 44 def func(msg):
45 45 return msg
46 46 return func
47 47
48 48
49 49 def test_Message_extractor(localizer):
50 50 validator = v.ValidUsername(localizer)
51 51 pytest.raises(formencode.Invalid, validator.to_python, 'default')
52 52
53 53 class StateObj(object):
54 54 pass
55 55
56 56 pytest.raises(
57 57 formencode.Invalid, validator.to_python, 'default', StateObj)
58 58
59 59
60 60 def test_ValidUsername(localizer):
61 61 validator = v.ValidUsername(localizer)
62 62
63 63 pytest.raises(formencode.Invalid, validator.to_python, 'default')
64 64 pytest.raises(formencode.Invalid, validator.to_python, 'new_user')
65 65 pytest.raises(formencode.Invalid, validator.to_python, '.,')
66 66 pytest.raises(
67 67 formencode.Invalid, validator.to_python, TEST_USER_ADMIN_LOGIN)
68 68 assert 'test' == validator.to_python('test')
69 69
70 70 validator = v.ValidUsername(localizer, edit=True, old_data={'user_id': 1})
71 71
72 72
73 73 def test_ValidRepoUser(localizer):
74 74 validator = v.ValidRepoUser(localizer)
75 75 pytest.raises(formencode.Invalid, validator.to_python, 'nouser')
76 76 assert TEST_USER_ADMIN_LOGIN == \
77 77 validator.to_python(TEST_USER_ADMIN_LOGIN)
78 78
79 79
80 80 def test_ValidUserGroup(localizer):
81 81 validator = v.ValidUserGroup(localizer)
82 82 pytest.raises(formencode.Invalid, validator.to_python, 'default')
83 83 pytest.raises(formencode.Invalid, validator.to_python, '.,')
84 84
85 85 gr = fixture.create_user_group('test')
86 86 gr2 = fixture.create_user_group('tes2')
87 87 Session().commit()
88 88 pytest.raises(formencode.Invalid, validator.to_python, 'test')
89 89 assert gr.users_group_id is not None
90 90 validator = v.ValidUserGroup(localizer,
91 91 edit=True,
92 92 old_data={'users_group_id': gr2.users_group_id})
93 93
94 94 pytest.raises(formencode.Invalid, validator.to_python, 'test')
95 95 pytest.raises(formencode.Invalid, validator.to_python, 'TesT')
96 96 pytest.raises(formencode.Invalid, validator.to_python, 'TEST')
97 97 UserGroupModel().delete(gr)
98 98 UserGroupModel().delete(gr2)
99 99 Session().commit()
100 100
101 101
102 102 @pytest.fixture(scope='function')
103 103 def repo_group(request):
104 104 model = RepoGroupModel()
105 105 gr = model.create(
106 106 group_name='test_gr', group_description='desc', just_db=True,
107 107 owner=TEST_USER_ADMIN_LOGIN)
108 108
109 109 def cleanup():
110 110 model.delete(gr)
111 111
112 112 request.addfinalizer(cleanup)
113 113
114 114 return gr
115 115
116 116
117 117 def test_ValidRepoGroup_same_name_as_repo(localizer):
118 118 validator = v.ValidRepoGroup(localizer)
119 119 with pytest.raises(formencode.Invalid) as excinfo:
120 120 validator.to_python({'group_name': HG_REPO})
121 121 expected_msg = 'Repository with name "vcs_test_hg" already exists'
122 122 assert expected_msg in str(excinfo.value)
123 123
124 124
125 125 def test_ValidRepoGroup_group_exists(localizer, repo_group):
126 126 validator = v.ValidRepoGroup(localizer)
127 127 with pytest.raises(formencode.Invalid) as excinfo:
128 128 validator.to_python({'group_name': repo_group.group_name})
129 129 expected_msg = 'Group "test_gr" already exists'
130 130 assert expected_msg in str(excinfo.value)
131 131
132 132
133 133 def test_ValidRepoGroup_invalid_parent(localizer, repo_group):
134 134 validator = v.ValidRepoGroup(localizer, edit=True,
135 135 old_data={'group_id': repo_group.group_id})
136 136 with pytest.raises(formencode.Invalid) as excinfo:
137 137 validator.to_python({
138 138 'group_name': repo_group.group_name + 'n',
139 139 'group_parent_id': repo_group.group_id,
140 140 })
141 141 expected_msg = 'Cannot assign this group as parent'
142 142 assert expected_msg in str(excinfo.value)
143 143
144 144
145 145 def test_ValidRepoGroup_edit_group_no_root_permission(localizer, repo_group):
146 146 validator = v.ValidRepoGroup(localizer,
147 147 edit=True, old_data={'group_id': repo_group.group_id},
148 148 can_create_in_root=False)
149 149
150 150 # Cannot change parent
151 151 with pytest.raises(formencode.Invalid) as excinfo:
152 152 validator.to_python({'group_parent_id': '25'})
153 153 expected_msg = 'no permission to store repository group in root location'
154 154 assert expected_msg in str(excinfo.value)
155 155
156 156 # Chaning all the other fields is allowed
157 157 validator.to_python({'group_name': 'foo', 'group_parent_id': '-1'})
158 158 validator.to_python(
159 159 {'user': TEST_USER_REGULAR2_LOGIN, 'group_parent_id': '-1'})
160 160 validator.to_python({'group_description': 'bar', 'group_parent_id': '-1'})
161 161 validator.to_python({'enable_locking': 'true', 'group_parent_id': '-1'})
162 162
163 163
164 164 def test_ValidPassword(localizer):
165 165 validator = v.ValidPassword(localizer)
166 166 assert 'lol' == validator.to_python('lol')
167 assert None == validator.to_python(None)
167 assert None is validator.to_python(None)
168 168 pytest.raises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
169 169
170 170
171 171 def test_ValidPasswordsMatch(localizer):
172 172 validator = v.ValidPasswordsMatch(localizer)
173 173 pytest.raises(
174 174 formencode.Invalid,
175 175 validator.to_python, {'password': 'pass',
176 176 'password_confirmation': 'pass2'})
177 177
178 178 pytest.raises(
179 179 formencode.Invalid,
180 180 validator.to_python, {'new_password': 'pass',
181 181 'password_confirmation': 'pass2'})
182 182
183 183 assert {'new_password': 'pass', 'password_confirmation': 'pass'} == \
184 184 validator.to_python({'new_password': 'pass',
185 185 'password_confirmation': 'pass'})
186 186
187 187 assert {'password': 'pass', 'password_confirmation': 'pass'} == \
188 188 validator.to_python({'password': 'pass',
189 189 'password_confirmation': 'pass'})
190 190
191 191
192 192 def test_ValidAuth(localizer, config_stub):
193 193 config_stub.testing_securitypolicy()
194 194 config_stub.include('rhodecode.authentication')
195 195 config_stub.include('rhodecode.authentication.plugins.auth_rhodecode')
196 196 config_stub.include('rhodecode.authentication.plugins.auth_token')
197 197
198 198 validator = v.ValidAuth(localizer)
199 199 valid_creds = {
200 200 'username': TEST_USER_REGULAR2_LOGIN,
201 201 'password': TEST_USER_REGULAR2_PASS,
202 202 }
203 203 invalid_creds = {
204 204 'username': 'err',
205 205 'password': 'err',
206 206 }
207 207 assert valid_creds == validator.to_python(valid_creds)
208 208 pytest.raises(
209 209 formencode.Invalid, validator.to_python, invalid_creds)
210 210
211 211
212 212 def test_ValidRepoName(localizer):
213 213 validator = v.ValidRepoName(localizer)
214 214
215 215 pytest.raises(
216 216 formencode.Invalid, validator.to_python, {'repo_name': ''})
217 217
218 218 pytest.raises(
219 219 formencode.Invalid, validator.to_python, {'repo_name': HG_REPO})
220 220
221 221 gr = RepoGroupModel().create(group_name='group_test',
222 222 group_description='desc',
223 223 owner=TEST_USER_ADMIN_LOGIN)
224 224 pytest.raises(
225 225 formencode.Invalid, validator.to_python, {'repo_name': gr.group_name})
226 226
227 227 #TODO: write an error case for that ie. create a repo withinh a group
228 228 # pytest.raises(formencode.Invalid,
229 229 # validator.to_python, {'repo_name': 'some',
230 230 # 'repo_group': gr.group_id})
231 231
232 232
233 233 def test_ValidForkName(localizer):
234 234 # this uses ValidRepoName validator
235 235 assert True
236 236
237 237 @pytest.mark.parametrize("name, expected", [
238 238 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
239 239 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
240 240 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
241 241 ('/]re po', 're-po')])
242 242 def test_SlugifyName(name, expected, localizer):
243 243 validator = v.SlugifyName(localizer)
244 244 assert expected == validator.to_python(name)
245 245
246 246
247 247 def test_ValidForkType(localizer):
248 248 validator = v.ValidForkType(localizer, old_data={'repo_type': 'hg'})
249 249 assert 'hg' == validator.to_python('hg')
250 250 pytest.raises(formencode.Invalid, validator.to_python, 'git')
251 251
252 252
253 253 def test_ValidPath(localizer):
254 254 validator = v.ValidPath(localizer)
255 255 assert TESTS_TMP_PATH == validator.to_python(TESTS_TMP_PATH)
256 256 pytest.raises(
257 257 formencode.Invalid, validator.to_python, '/no_such_dir')
258 258
259 259
260 260 def test_UniqSystemEmail(localizer):
261 261 validator = v.UniqSystemEmail(localizer, old_data={})
262 262
263 263 assert 'mail@python.org' == validator.to_python('MaiL@Python.org')
264 264
265 265 email = TEST_USER_REGULAR2_EMAIL
266 266 pytest.raises(formencode.Invalid, validator.to_python, email)
267 267
268 268
269 269 def test_ValidSystemEmail(localizer):
270 270 validator = v.ValidSystemEmail(localizer)
271 271 email = TEST_USER_REGULAR2_EMAIL
272 272
273 273 assert email == validator.to_python(email)
274 274 pytest.raises(formencode.Invalid, validator.to_python, 'err')
275 275
276 276
277 277 def test_NotReviewedRevisions(localizer):
278 278 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
279 279 validator = v.NotReviewedRevisions(localizer, repo_id)
280 280 rev = '0' * 40
281 281 # add status for a rev, that should throw an error because it is already
282 282 # reviewed
283 283 new_status = ChangesetStatus()
284 284 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
285 285 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
286 286 new_status.status = ChangesetStatus.STATUS_APPROVED
287 287 new_status.comment = None
288 288 new_status.revision = rev
289 289 Session().add(new_status)
290 290 Session().commit()
291 291 try:
292 292 pytest.raises(formencode.Invalid, validator.to_python, [rev])
293 293 finally:
294 294 Session().delete(new_status)
295 295 Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now