##// END OF EJS Templates
lint: ruff run
super-admin -
r5089:fed0c169 default
parent child Browse files
Show More
@@ -1,206 +1,206 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import mock
20 import mock
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.auth import check_password
23 from rhodecode.lib.auth import check_password
24 from rhodecode.model.user import UserModel
24 from rhodecode.model.user import UserModel
25 from rhodecode.tests import (
25 from rhodecode.tests import (
26 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL)
26 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL)
27 from rhodecode.api.tests.utils import (
27 from rhodecode.api.tests.utils import (
28 build_data, api_call, assert_ok, assert_error, jsonify, crash)
28 build_data, api_call, assert_ok, assert_error, jsonify, crash)
29 from rhodecode.tests.fixture import Fixture
29 from rhodecode.tests.fixture import Fixture
30 from rhodecode.model.db import RepoGroup
30 from rhodecode.model.db import RepoGroup
31
31
32
32
33 # TODO: mikhail: remove fixture from here
33 # TODO: mikhail: remove fixture from here
34 fixture = Fixture()
34 fixture = Fixture()
35
35
36
36
37 @pytest.mark.usefixtures("testuser_api", "app")
37 @pytest.mark.usefixtures("testuser_api", "app")
38 class TestCreateUser(object):
38 class TestCreateUser(object):
39 def test_api_create_existing_user(self):
39 def test_api_create_existing_user(self):
40 id_, params = build_data(
40 id_, params = build_data(
41 self.apikey, 'create_user',
41 self.apikey, 'create_user',
42 username=TEST_USER_ADMIN_LOGIN,
42 username=TEST_USER_ADMIN_LOGIN,
43 email='test@foo.com',
43 email='test@foo.com',
44 password='trololo')
44 password='trololo')
45 response = api_call(self.app, params)
45 response = api_call(self.app, params)
46
46
47 expected = "user `%s` already exist" % (TEST_USER_ADMIN_LOGIN,)
47 expected = "user `%s` already exist" % (TEST_USER_ADMIN_LOGIN,)
48 assert_error(id_, expected, given=response.body)
48 assert_error(id_, expected, given=response.body)
49
49
50 def test_api_create_user_with_existing_email(self):
50 def test_api_create_user_with_existing_email(self):
51 id_, params = build_data(
51 id_, params = build_data(
52 self.apikey, 'create_user',
52 self.apikey, 'create_user',
53 username=TEST_USER_ADMIN_LOGIN + 'new',
53 username=TEST_USER_ADMIN_LOGIN + 'new',
54 email=TEST_USER_REGULAR_EMAIL,
54 email=TEST_USER_REGULAR_EMAIL,
55 password='trololo')
55 password='trololo')
56 response = api_call(self.app, params)
56 response = api_call(self.app, params)
57
57
58 expected = "email `%s` already exist" % (TEST_USER_REGULAR_EMAIL,)
58 expected = "email `%s` already exist" % (TEST_USER_REGULAR_EMAIL,)
59 assert_error(id_, expected, given=response.body)
59 assert_error(id_, expected, given=response.body)
60
60
61 def test_api_create_user_with_wrong_username(self):
61 def test_api_create_user_with_wrong_username(self):
62 bad_username = '<> HELLO WORLD <>'
62 bad_username = '<> HELLO WORLD <>'
63 id_, params = build_data(
63 id_, params = build_data(
64 self.apikey, 'create_user',
64 self.apikey, 'create_user',
65 username=bad_username,
65 username=bad_username,
66 email='new@email.com',
66 email='new@email.com',
67 password='trololo')
67 password='trololo')
68 response = api_call(self.app, params)
68 response = api_call(self.app, params)
69
69
70 expected = {'username':
70 expected = {'username':
71 "Username may only contain alphanumeric characters "
71 "Username may only contain alphanumeric characters "
72 "underscores, periods or dashes and must begin with "
72 "underscores, periods or dashes and must begin with "
73 "alphanumeric character or underscore"}
73 "alphanumeric character or underscore"}
74 assert_error(id_, expected, given=response.body)
74 assert_error(id_, expected, given=response.body)
75
75
76 def test_api_create_user(self):
76 def test_api_create_user(self):
77 username = 'test_new_api_user'
77 username = 'test_new_api_user'
78 email = username + "@foo.com"
78 email = username + "@foo.com"
79
79
80 id_, params = build_data(
80 id_, params = build_data(
81 self.apikey, 'create_user',
81 self.apikey, 'create_user',
82 username=username,
82 username=username,
83 email=email,
83 email=email,
84 description='CTO of Things',
84 description='CTO of Things',
85 password='example')
85 password='example')
86 response = api_call(self.app, params)
86 response = api_call(self.app, params)
87
87
88 usr = UserModel().get_by_username(username)
88 usr = UserModel().get_by_username(username)
89 ret = {
89 ret = {
90 'msg': 'created new user `%s`' % (username,),
90 'msg': 'created new user `%s`' % (username,),
91 'user': jsonify(usr.get_api_data(include_secrets=True)),
91 'user': jsonify(usr.get_api_data(include_secrets=True)),
92 }
92 }
93 try:
93 try:
94 expected = ret
94 expected = ret
95 assert check_password('example', usr.password)
95 assert check_password('example', usr.password)
96 assert_ok(id_, expected, given=response.body)
96 assert_ok(id_, expected, given=response.body)
97 finally:
97 finally:
98 fixture.destroy_user(usr.user_id)
98 fixture.destroy_user(usr.user_id)
99
99
100 def test_api_create_user_without_password(self):
100 def test_api_create_user_without_password(self):
101 username = 'test_new_api_user_passwordless'
101 username = 'test_new_api_user_passwordless'
102 email = username + "@foo.com"
102 email = username + "@foo.com"
103
103
104 id_, params = build_data(
104 id_, params = build_data(
105 self.apikey, 'create_user',
105 self.apikey, 'create_user',
106 username=username,
106 username=username,
107 email=email)
107 email=email)
108 response = api_call(self.app, params)
108 response = api_call(self.app, params)
109
109
110 usr = UserModel().get_by_username(username)
110 usr = UserModel().get_by_username(username)
111 ret = {
111 ret = {
112 'msg': 'created new user `%s`' % (username,),
112 'msg': 'created new user `%s`' % (username,),
113 'user': jsonify(usr.get_api_data(include_secrets=True)),
113 'user': jsonify(usr.get_api_data(include_secrets=True)),
114 }
114 }
115 try:
115 try:
116 expected = ret
116 expected = ret
117 assert_ok(id_, expected, given=response.body)
117 assert_ok(id_, expected, given=response.body)
118 finally:
118 finally:
119 fixture.destroy_user(usr.user_id)
119 fixture.destroy_user(usr.user_id)
120
120
121 def test_api_create_user_with_extern_name(self):
121 def test_api_create_user_with_extern_name(self):
122 username = 'test_new_api_user_passwordless'
122 username = 'test_new_api_user_passwordless'
123 email = username + "@foo.com"
123 email = username + "@foo.com"
124
124
125 id_, params = build_data(
125 id_, params = build_data(
126 self.apikey, 'create_user',
126 self.apikey, 'create_user',
127 username=username,
127 username=username,
128 email=email, extern_name='rhodecode')
128 email=email, extern_name='rhodecode')
129 response = api_call(self.app, params)
129 response = api_call(self.app, params)
130
130
131 usr = UserModel().get_by_username(username)
131 usr = UserModel().get_by_username(username)
132 ret = {
132 ret = {
133 'msg': 'created new user `%s`' % (username,),
133 'msg': 'created new user `%s`' % (username,),
134 'user': jsonify(usr.get_api_data(include_secrets=True)),
134 'user': jsonify(usr.get_api_data(include_secrets=True)),
135 }
135 }
136 try:
136 try:
137 expected = ret
137 expected = ret
138 assert_ok(id_, expected, given=response.body)
138 assert_ok(id_, expected, given=response.body)
139 finally:
139 finally:
140 fixture.destroy_user(usr.user_id)
140 fixture.destroy_user(usr.user_id)
141
141
142 def test_api_create_user_with_password_change(self):
142 def test_api_create_user_with_password_change(self):
143 username = 'test_new_api_user_password_change'
143 username = 'test_new_api_user_password_change'
144 email = username + "@foo.com"
144 email = username + "@foo.com"
145
145
146 id_, params = build_data(
146 id_, params = build_data(
147 self.apikey, 'create_user',
147 self.apikey, 'create_user',
148 username=username,
148 username=username,
149 email=email, extern_name='rhodecode',
149 email=email, extern_name='rhodecode',
150 force_password_change=True)
150 force_password_change=True)
151 response = api_call(self.app, params)
151 response = api_call(self.app, params)
152
152
153 usr = UserModel().get_by_username(username)
153 usr = UserModel().get_by_username(username)
154 ret = {
154 ret = {
155 'msg': 'created new user `%s`' % (username,),
155 'msg': 'created new user `%s`' % (username,),
156 'user': jsonify(usr.get_api_data(include_secrets=True)),
156 'user': jsonify(usr.get_api_data(include_secrets=True)),
157 }
157 }
158 try:
158 try:
159 expected = ret
159 expected = ret
160 assert_ok(id_, expected, given=response.body)
160 assert_ok(id_, expected, given=response.body)
161 finally:
161 finally:
162 fixture.destroy_user(usr.user_id)
162 fixture.destroy_user(usr.user_id)
163
163
164 def test_api_create_user_with_personal_repo_group(self):
164 def test_api_create_user_with_personal_repo_group(self):
165 username = 'test_new_api_user_personal_group'
165 username = 'test_new_api_user_personal_group'
166 email = username + "@foo.com"
166 email = username + "@foo.com"
167
167
168 id_, params = build_data(
168 id_, params = build_data(
169 self.apikey, 'create_user',
169 self.apikey, 'create_user',
170 username=username,
170 username=username,
171 email=email, extern_name='rhodecode',
171 email=email, extern_name='rhodecode',
172 create_personal_repo_group=True)
172 create_personal_repo_group=True)
173 response = api_call(self.app, params)
173 response = api_call(self.app, params)
174
174
175 usr = UserModel().get_by_username(username)
175 usr = UserModel().get_by_username(username)
176 ret = {
176 ret = {
177 'msg': 'created new user `%s`' % (username,),
177 'msg': 'created new user `%s`' % (username,),
178 'user': jsonify(usr.get_api_data(include_secrets=True)),
178 'user': jsonify(usr.get_api_data(include_secrets=True)),
179 }
179 }
180
180
181 personal_group = RepoGroup.get_by_group_name(username)
181 personal_group = RepoGroup.get_by_group_name(username)
182 assert personal_group
182 assert personal_group
183 assert personal_group.personal == True
183 assert personal_group.personal is True
184 assert personal_group.user.username == username
184 assert personal_group.user.username == username
185
185
186 try:
186 try:
187 expected = ret
187 expected = ret
188 assert_ok(id_, expected, given=response.body)
188 assert_ok(id_, expected, given=response.body)
189 finally:
189 finally:
190 fixture.destroy_repo_group(username)
190 fixture.destroy_repo_group(username)
191 fixture.destroy_user(usr.user_id)
191 fixture.destroy_user(usr.user_id)
192
192
193 @mock.patch.object(UserModel, 'create_or_update', crash)
193 @mock.patch.object(UserModel, 'create_or_update', crash)
194 def test_api_create_user_when_exception_happened(self):
194 def test_api_create_user_when_exception_happened(self):
195
195
196 username = 'test_new_api_user'
196 username = 'test_new_api_user'
197 email = username + "@foo.com"
197 email = username + "@foo.com"
198
198
199 id_, params = build_data(
199 id_, params = build_data(
200 self.apikey, 'create_user',
200 self.apikey, 'create_user',
201 username=username,
201 username=username,
202 email=email,
202 email=email,
203 password='trololo')
203 password='trololo')
204 response = api_call(self.app, params)
204 response = api_call(self.app, params)
205 expected = 'failed to create user `%s`' % (username,)
205 expected = 'failed to create user `%s`' % (username,)
206 assert_error(id_, expected, given=response.body)
206 assert_error(id_, expected, given=response.body)
@@ -1,255 +1,255 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import pytest
20 import pytest
21
21
22 from rhodecode.tests import (
22 from rhodecode.tests import (
23 TestController, assert_session_flash, TEST_USER_ADMIN_LOGIN)
23 TestController, assert_session_flash, TEST_USER_ADMIN_LOGIN)
24 from rhodecode.model.db import UserGroup
24 from rhodecode.model.db import UserGroup
25 from rhodecode.model.meta import Session
25 from rhodecode.model.meta import Session
26 from rhodecode.tests.fixture import Fixture
26 from rhodecode.tests.fixture import Fixture
27
27
28 fixture = Fixture()
28 fixture = Fixture()
29
29
30
30
31 def route_path(name, params=None, **kwargs):
31 def route_path(name, params=None, **kwargs):
32 import urllib.request
32 import urllib.request
33 import urllib.parse
33 import urllib.parse
34 import urllib.error
34 import urllib.error
35 from rhodecode.apps._base import ADMIN_PREFIX
35 from rhodecode.apps._base import ADMIN_PREFIX
36
36
37 base_url = {
37 base_url = {
38 'user_groups': ADMIN_PREFIX + '/user_groups',
38 'user_groups': ADMIN_PREFIX + '/user_groups',
39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
44 'edit_user_group_advanced_sync': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/advanced/sync',
44 'edit_user_group_advanced_sync': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/advanced/sync',
45 'edit_user_group_global_perms_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/global_permissions/update',
45 'edit_user_group_global_perms_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/global_permissions/update',
46 'user_groups_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/update',
46 'user_groups_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/update',
47 'user_groups_delete': ADMIN_PREFIX + '/user_groups/{user_group_id}/delete',
47 'user_groups_delete': ADMIN_PREFIX + '/user_groups/{user_group_id}/delete',
48
48
49 }[name].format(**kwargs)
49 }[name].format(**kwargs)
50
50
51 if params:
51 if params:
52 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
52 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
53 return base_url
53 return base_url
54
54
55
55
56 class TestUserGroupsView(TestController):
56 class TestUserGroupsView(TestController):
57
57
58 def test_set_synchronization(self, user_util):
58 def test_set_synchronization(self, user_util):
59 self.log_user()
59 self.log_user()
60 user_group_name = user_util.create_user_group().users_group_name
60 user_group_name = user_util.create_user_group().users_group_name
61
61
62 group = Session().query(UserGroup).filter(
62 group = Session().query(UserGroup).filter(
63 UserGroup.users_group_name == user_group_name).one()
63 UserGroup.users_group_name == user_group_name).one()
64
64
65 assert group.group_data.get('extern_type') is None
65 assert group.group_data.get('extern_type') is None
66
66
67 # enable
67 # enable
68 self.app.post(
68 self.app.post(
69 route_path('edit_user_group_advanced_sync',
69 route_path('edit_user_group_advanced_sync',
70 user_group_id=group.users_group_id),
70 user_group_id=group.users_group_id),
71 params={'csrf_token': self.csrf_token}, status=302)
71 params={'csrf_token': self.csrf_token}, status=302)
72
72
73 group = Session().query(UserGroup).filter(
73 group = Session().query(UserGroup).filter(
74 UserGroup.users_group_name == user_group_name).one()
74 UserGroup.users_group_name == user_group_name).one()
75 assert group.group_data.get('extern_type') == 'manual'
75 assert group.group_data.get('extern_type') == 'manual'
76 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
76 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
77
77
78 # disable
78 # disable
79 self.app.post(
79 self.app.post(
80 route_path('edit_user_group_advanced_sync',
80 route_path('edit_user_group_advanced_sync',
81 user_group_id=group.users_group_id),
81 user_group_id=group.users_group_id),
82 params={'csrf_token': self.csrf_token}, status=302)
82 params={'csrf_token': self.csrf_token}, status=302)
83
83
84 group = Session().query(UserGroup).filter(
84 group = Session().query(UserGroup).filter(
85 UserGroup.users_group_name == user_group_name).one()
85 UserGroup.users_group_name == user_group_name).one()
86 assert group.group_data.get('extern_type') is None
86 assert group.group_data.get('extern_type') is None
87 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
87 assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN
88
88
89 def test_delete_user_group(self, user_util):
89 def test_delete_user_group(self, user_util):
90 self.log_user()
90 self.log_user()
91 user_group_id = user_util.create_user_group().users_group_id
91 user_group_id = user_util.create_user_group().users_group_id
92
92
93 group = Session().query(UserGroup).filter(
93 group = Session().query(UserGroup).filter(
94 UserGroup.users_group_id == user_group_id).one()
94 UserGroup.users_group_id == user_group_id).one()
95
95
96 self.app.post(
96 self.app.post(
97 route_path('user_groups_delete', user_group_id=group.users_group_id),
97 route_path('user_groups_delete', user_group_id=group.users_group_id),
98 params={'csrf_token': self.csrf_token})
98 params={'csrf_token': self.csrf_token})
99
99
100 group = Session().query(UserGroup).filter(
100 group = Session().query(UserGroup).filter(
101 UserGroup.users_group_id == user_group_id).scalar()
101 UserGroup.users_group_id == user_group_id).scalar()
102
102
103 assert group is None
103 assert group is None
104
104
105 @pytest.mark.parametrize('repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, inherit_default_permissions, expect_error, expect_form_error', [
105 @pytest.mark.parametrize('repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, inherit_default_permissions, expect_error, expect_form_error', [
106 ('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
106 ('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
107 ('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False),
107 ('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False),
108 ('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True),
108 ('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True),
109 ('', '', '', '', '', '', True, False),
109 ('', '', '', '', '', '', True, False),
110 ])
110 ])
111 def test_global_permissions_on_user_group(
111 def test_global_permissions_on_user_group(
112 self, repo_create, repo_create_write, user_group_create,
112 self, repo_create, repo_create_write, user_group_create,
113 repo_group_create, fork_create, expect_error, expect_form_error,
113 repo_group_create, fork_create, expect_error, expect_form_error,
114 inherit_default_permissions, user_util):
114 inherit_default_permissions, user_util):
115
115
116 self.log_user()
116 self.log_user()
117 user_group = user_util.create_user_group()
117 user_group = user_util.create_user_group()
118
118
119 user_group_name = user_group.users_group_name
119 user_group_name = user_group.users_group_name
120 user_group_id = user_group.users_group_id
120 user_group_id = user_group.users_group_id
121
121
122 # ENABLE REPO CREATE ON A GROUP
122 # ENABLE REPO CREATE ON A GROUP
123 perm_params = {
123 perm_params = {
124 'inherit_default_permissions': False,
124 'inherit_default_permissions': False,
125 'default_repo_create': repo_create,
125 'default_repo_create': repo_create,
126 'default_repo_create_on_write': repo_create_write,
126 'default_repo_create_on_write': repo_create_write,
127 'default_user_group_create': user_group_create,
127 'default_user_group_create': user_group_create,
128 'default_repo_group_create': repo_group_create,
128 'default_repo_group_create': repo_group_create,
129 'default_fork_create': fork_create,
129 'default_fork_create': fork_create,
130 'default_inherit_default_permissions': inherit_default_permissions,
130 'default_inherit_default_permissions': inherit_default_permissions,
131
131
132 'csrf_token': self.csrf_token,
132 'csrf_token': self.csrf_token,
133 }
133 }
134 response = self.app.post(
134 response = self.app.post(
135 route_path('edit_user_group_global_perms_update',
135 route_path('edit_user_group_global_perms_update',
136 user_group_id=user_group_id),
136 user_group_id=user_group_id),
137 params=perm_params)
137 params=perm_params)
138
138
139 if expect_form_error:
139 if expect_form_error:
140 assert response.status_int == 200
140 assert response.status_int == 200
141 response.mustcontain('Value must be one of')
141 response.mustcontain('Value must be one of')
142 else:
142 else:
143 if expect_error:
143 if expect_error:
144 msg = 'An error occurred during permissions saving'
144 msg = 'An error occurred during permissions saving'
145 else:
145 else:
146 msg = 'User Group global permissions updated successfully'
146 msg = 'User Group global permissions updated successfully'
147 ug = UserGroup.get_by_group_name(user_group_name)
147 ug = UserGroup.get_by_group_name(user_group_name)
148 del perm_params['csrf_token']
148 del perm_params['csrf_token']
149 del perm_params['inherit_default_permissions']
149 del perm_params['inherit_default_permissions']
150 assert perm_params == ug.get_default_perms()
150 assert perm_params == ug.get_default_perms()
151 assert_session_flash(response, msg)
151 assert_session_flash(response, msg)
152
152
153 def test_edit_view(self, user_util):
153 def test_edit_view(self, user_util):
154 self.log_user()
154 self.log_user()
155
155
156 user_group = user_util.create_user_group()
156 user_group = user_util.create_user_group()
157 self.app.get(
157 self.app.get(
158 route_path('edit_user_group',
158 route_path('edit_user_group',
159 user_group_id=user_group.users_group_id),
159 user_group_id=user_group.users_group_id),
160 status=200)
160 status=200)
161
161
162 def test_update_user_group(self, user_util):
162 def test_update_user_group(self, user_util):
163 user = self.log_user()
163 user = self.log_user()
164
164
165 user_group = user_util.create_user_group()
165 user_group = user_util.create_user_group()
166 users_group_id = user_group.users_group_id
166 users_group_id = user_group.users_group_id
167 new_name = user_group.users_group_name + '_CHANGE'
167 new_name = user_group.users_group_name + '_CHANGE'
168
168
169 params = [
169 params = [
170 ('users_group_active', False),
170 ('users_group_active', False),
171 ('user_group_description', 'DESC'),
171 ('user_group_description', 'DESC'),
172 ('users_group_name', new_name),
172 ('users_group_name', new_name),
173 ('user', user['username']),
173 ('user', user['username']),
174 ('csrf_token', self.csrf_token),
174 ('csrf_token', self.csrf_token),
175 ('__start__', 'user_group_members:sequence'),
175 ('__start__', 'user_group_members:sequence'),
176 ('__start__', 'member:mapping'),
176 ('__start__', 'member:mapping'),
177 ('member_user_id', user['user_id']),
177 ('member_user_id', user['user_id']),
178 ('type', 'existing'),
178 ('type', 'existing'),
179 ('__end__', 'member:mapping'),
179 ('__end__', 'member:mapping'),
180 ('__end__', 'user_group_members:sequence'),
180 ('__end__', 'user_group_members:sequence'),
181 ]
181 ]
182
182
183 self.app.post(
183 self.app.post(
184 route_path('user_groups_update',
184 route_path('user_groups_update',
185 user_group_id=users_group_id),
185 user_group_id=users_group_id),
186 params=params,
186 params=params,
187 status=302)
187 status=302)
188
188
189 user_group = UserGroup.get(users_group_id)
189 user_group = UserGroup.get(users_group_id)
190 assert user_group
190 assert user_group
191
191
192 assert user_group.users_group_name == new_name
192 assert user_group.users_group_name == new_name
193 assert user_group.user_group_description == 'DESC'
193 assert user_group.user_group_description == 'DESC'
194 assert user_group.users_group_active == False
194 assert user_group.users_group_active is False
195
195
196 def test_update_user_group_name_conflicts(self, user_util):
196 def test_update_user_group_name_conflicts(self, user_util):
197 self.log_user()
197 self.log_user()
198 user_group_old = user_util.create_user_group()
198 user_group_old = user_util.create_user_group()
199 new_name = user_group_old.users_group_name
199 new_name = user_group_old.users_group_name
200
200
201 user_group = user_util.create_user_group()
201 user_group = user_util.create_user_group()
202
202
203 params = dict(
203 params = dict(
204 users_group_active=False,
204 users_group_active=False,
205 user_group_description='DESC',
205 user_group_description='DESC',
206 users_group_name=new_name,
206 users_group_name=new_name,
207 csrf_token=self.csrf_token)
207 csrf_token=self.csrf_token)
208
208
209 response = self.app.post(
209 response = self.app.post(
210 route_path('user_groups_update',
210 route_path('user_groups_update',
211 user_group_id=user_group.users_group_id),
211 user_group_id=user_group.users_group_id),
212 params=params,
212 params=params,
213 status=200)
213 status=200)
214
214
215 response.mustcontain('User group `{}` already exists'.format(
215 response.mustcontain('User group `{}` already exists'.format(
216 new_name))
216 new_name))
217
217
218 def test_update_members_from_user_ids(self, user_regular):
218 def test_update_members_from_user_ids(self, user_regular):
219 uid = user_regular.user_id
219 uid = user_regular.user_id
220 username = user_regular.username
220 username = user_regular.username
221 self.log_user()
221 self.log_user()
222
222
223 user_group = fixture.create_user_group('test_gr_ids')
223 user_group = fixture.create_user_group('test_gr_ids')
224 assert user_group.members == []
224 assert user_group.members == []
225 assert user_group.user != user_regular
225 assert user_group.user != user_regular
226 expected_active_state = not user_group.users_group_active
226 expected_active_state = not user_group.users_group_active
227
227
228 form_data = [
228 form_data = [
229 ('csrf_token', self.csrf_token),
229 ('csrf_token', self.csrf_token),
230 ('user', username),
230 ('user', username),
231 ('users_group_name', 'changed_name'),
231 ('users_group_name', 'changed_name'),
232 ('users_group_active', expected_active_state),
232 ('users_group_active', expected_active_state),
233 ('user_group_description', 'changed_description'),
233 ('user_group_description', 'changed_description'),
234
234
235 ('__start__', 'user_group_members:sequence'),
235 ('__start__', 'user_group_members:sequence'),
236 ('__start__', 'member:mapping'),
236 ('__start__', 'member:mapping'),
237 ('member_user_id', uid),
237 ('member_user_id', uid),
238 ('type', 'existing'),
238 ('type', 'existing'),
239 ('__end__', 'member:mapping'),
239 ('__end__', 'member:mapping'),
240 ('__end__', 'user_group_members:sequence'),
240 ('__end__', 'user_group_members:sequence'),
241 ]
241 ]
242 ugid = user_group.users_group_id
242 ugid = user_group.users_group_id
243 self.app.post(
243 self.app.post(
244 route_path('user_groups_update', user_group_id=ugid), form_data)
244 route_path('user_groups_update', user_group_id=ugid), form_data)
245
245
246 user_group = UserGroup.get(ugid)
246 user_group = UserGroup.get(ugid)
247 assert user_group
247 assert user_group
248
248
249 assert user_group.members[0].user_id == uid
249 assert user_group.members[0].user_id == uid
250 assert user_group.user_id == uid
250 assert user_group.user_id == uid
251 assert 'changed_name' in user_group.users_group_name
251 assert 'changed_name' in user_group.users_group_name
252 assert 'changed_description' in user_group.user_group_description
252 assert 'changed_description' in user_group.user_group_description
253 assert user_group.users_group_active == expected_active_state
253 assert user_group.users_group_active == expected_active_state
254
254
255 fixture.destroy_user_group(user_group)
255 fixture.destroy_user_group(user_group)
@@ -1,297 +1,297 b''
1
1
2
2
3 # Copyright (C) 2012-2023 RhodeCode GmbH
3 # Copyright (C) 2012-2023 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 RhodeCode authentication plugin for Atlassian CROWD
22 RhodeCode authentication plugin for Atlassian CROWD
23 """
23 """
24
24
25
25
26 import colander
26 import colander
27 import base64
27 import base64
28 import logging
28 import logging
29 import urllib.request
29 import urllib.request
30 import urllib.error
30 import urllib.error
31 import urllib.parse
31 import urllib.parse
32
32
33 from rhodecode.translation import _
33 from rhodecode.translation import _
34 from rhodecode.authentication.base import (
34 from rhodecode.authentication.base import (
35 RhodeCodeExternalAuthPlugin, hybrid_property)
35 RhodeCodeExternalAuthPlugin, hybrid_property)
36 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
36 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
37 from rhodecode.authentication.routes import AuthnPluginResourceBase
37 from rhodecode.authentication.routes import AuthnPluginResourceBase
38 from rhodecode.lib.colander_utils import strip_whitespace
38 from rhodecode.lib.colander_utils import strip_whitespace
39 from rhodecode.lib.ext_json import json, formatted_json
39 from rhodecode.lib.ext_json import json, formatted_json
40 from rhodecode.model.db import User
40 from rhodecode.model.db import User
41
41
42 log = logging.getLogger(__name__)
42 log = logging.getLogger(__name__)
43
43
44
44
45 def plugin_factory(plugin_id, *args, **kwargs):
45 def plugin_factory(plugin_id, *args, **kwargs):
46 """
46 """
47 Factory function that is called during plugin discovery.
47 Factory function that is called during plugin discovery.
48 It returns the plugin instance.
48 It returns the plugin instance.
49 """
49 """
50 plugin = RhodeCodeAuthPlugin(plugin_id)
50 plugin = RhodeCodeAuthPlugin(plugin_id)
51 return plugin
51 return plugin
52
52
53
53
54 class CrowdAuthnResource(AuthnPluginResourceBase):
54 class CrowdAuthnResource(AuthnPluginResourceBase):
55 pass
55 pass
56
56
57
57
58 class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase):
58 class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase):
59 host = colander.SchemaNode(
59 host = colander.SchemaNode(
60 colander.String(),
60 colander.String(),
61 default='127.0.0.1',
61 default='127.0.0.1',
62 description=_('The FQDN or IP of the Atlassian CROWD Server'),
62 description=_('The FQDN or IP of the Atlassian CROWD Server'),
63 preparer=strip_whitespace,
63 preparer=strip_whitespace,
64 title=_('Host'),
64 title=_('Host'),
65 widget='string')
65 widget='string')
66 port = colander.SchemaNode(
66 port = colander.SchemaNode(
67 colander.Int(),
67 colander.Int(),
68 default=8095,
68 default=8095,
69 description=_('The Port in use by the Atlassian CROWD Server'),
69 description=_('The Port in use by the Atlassian CROWD Server'),
70 preparer=strip_whitespace,
70 preparer=strip_whitespace,
71 title=_('Port'),
71 title=_('Port'),
72 validator=colander.Range(min=0, max=65536),
72 validator=colander.Range(min=0, max=65536),
73 widget='int')
73 widget='int')
74 app_name = colander.SchemaNode(
74 app_name = colander.SchemaNode(
75 colander.String(),
75 colander.String(),
76 default='',
76 default='',
77 description=_('The Application Name to authenticate to CROWD'),
77 description=_('The Application Name to authenticate to CROWD'),
78 preparer=strip_whitespace,
78 preparer=strip_whitespace,
79 title=_('Application Name'),
79 title=_('Application Name'),
80 widget='string')
80 widget='string')
81 app_password = colander.SchemaNode(
81 app_password = colander.SchemaNode(
82 colander.String(),
82 colander.String(),
83 default='',
83 default='',
84 description=_('The password to authenticate to CROWD'),
84 description=_('The password to authenticate to CROWD'),
85 preparer=strip_whitespace,
85 preparer=strip_whitespace,
86 title=_('Application Password'),
86 title=_('Application Password'),
87 widget='password')
87 widget='password')
88 admin_groups = colander.SchemaNode(
88 admin_groups = colander.SchemaNode(
89 colander.String(),
89 colander.String(),
90 default='',
90 default='',
91 description=_('A comma separated list of group names that identify '
91 description=_('A comma separated list of group names that identify '
92 'users as RhodeCode Administrators'),
92 'users as RhodeCode Administrators'),
93 missing='',
93 missing='',
94 preparer=strip_whitespace,
94 preparer=strip_whitespace,
95 title=_('Admin Groups'),
95 title=_('Admin Groups'),
96 widget='string')
96 widget='string')
97
97
98
98
99 class CrowdServer(object):
99 class CrowdServer(object):
100 def __init__(self, *args, **kwargs):
100 def __init__(self, *args, **kwargs):
101 """
101 """
102 Create a new CrowdServer object that points to IP/Address 'host',
102 Create a new CrowdServer object that points to IP/Address 'host',
103 on the given port, and using the given method (https/http). user and
103 on the given port, and using the given method (https/http). user and
104 passwd can be set here or with set_credentials. If unspecified,
104 passwd can be set here or with set_credentials. If unspecified,
105 "version" defaults to "latest".
105 "version" defaults to "latest".
106
106
107 example::
107 example::
108
108
109 cserver = CrowdServer(host="127.0.0.1",
109 cserver = CrowdServer(host="127.0.0.1",
110 port="8095",
110 port="8095",
111 user="some_app",
111 user="some_app",
112 passwd="some_passwd",
112 passwd="some_passwd",
113 version="1")
113 version="1")
114 """
114 """
115 if not "port" in kwargs:
115 if 'port' not in kwargs:
116 kwargs["port"] = "8095"
116 kwargs["port"] = "8095"
117 self._logger = kwargs.get("logger", logging.getLogger(__name__))
117 self._logger = kwargs.get("logger", logging.getLogger(__name__))
118 self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
118 self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
119 kwargs.get("host", "127.0.0.1"),
119 kwargs.get("host", "127.0.0.1"),
120 kwargs.get("port", "8095"))
120 kwargs.get("port", "8095"))
121 self.set_credentials(kwargs.get("user", ""),
121 self.set_credentials(kwargs.get("user", ""),
122 kwargs.get("passwd", ""))
122 kwargs.get("passwd", ""))
123 self._version = kwargs.get("version", "latest")
123 self._version = kwargs.get("version", "latest")
124 self._url_list = None
124 self._url_list = None
125 self._appname = "crowd"
125 self._appname = "crowd"
126
126
127 def set_credentials(self, user, passwd):
127 def set_credentials(self, user, passwd):
128 self.user = user
128 self.user = user
129 self.passwd = passwd
129 self.passwd = passwd
130 self._make_opener()
130 self._make_opener()
131
131
132 def _make_opener(self):
132 def _make_opener(self):
133 mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
133 mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
134 mgr.add_password(None, self._uri, self.user, self.passwd)
134 mgr.add_password(None, self._uri, self.user, self.passwd)
135 handler = urllib.request.HTTPBasicAuthHandler(mgr)
135 handler = urllib.request.HTTPBasicAuthHandler(mgr)
136 self.opener = urllib.request.build_opener(handler)
136 self.opener = urllib.request.build_opener(handler)
137
137
138 def _request(self, url, body=None, headers=None,
138 def _request(self, url, body=None, headers=None,
139 method=None, noformat=False,
139 method=None, noformat=False,
140 empty_response_ok=False):
140 empty_response_ok=False):
141 _headers = {"Content-type": "application/json",
141 _headers = {"Content-type": "application/json",
142 "Accept": "application/json"}
142 "Accept": "application/json"}
143 if self.user and self.passwd:
143 if self.user and self.passwd:
144 authstring = base64.b64encode("%s:%s" % (self.user, self.passwd))
144 authstring = base64.b64encode("%s:%s" % (self.user, self.passwd))
145 _headers["Authorization"] = "Basic %s" % authstring
145 _headers["Authorization"] = "Basic %s" % authstring
146 if headers:
146 if headers:
147 _headers.update(headers)
147 _headers.update(headers)
148 log.debug("Sent crowd: \n%s"
148 log.debug("Sent crowd: \n%s"
149 % (formatted_json({"url": url, "body": body,
149 % (formatted_json({"url": url, "body": body,
150 "headers": _headers})))
150 "headers": _headers})))
151 request = urllib.request.Request(url, body, _headers)
151 request = urllib.request.Request(url, body, _headers)
152 if method:
152 if method:
153 request.get_method = lambda: method
153 request.get_method = lambda: method
154
154
155 global msg
155 global msg
156 msg = ""
156 msg = ""
157 try:
157 try:
158 ret_doc = self.opener.open(request)
158 ret_doc = self.opener.open(request)
159 msg = ret_doc.read()
159 msg = ret_doc.read()
160 if not msg and empty_response_ok:
160 if not msg and empty_response_ok:
161 ret_val = {}
161 ret_val = {}
162 ret_val["status"] = True
162 ret_val["status"] = True
163 ret_val["error"] = "Response body was empty"
163 ret_val["error"] = "Response body was empty"
164 elif not noformat:
164 elif not noformat:
165 ret_val = json.loads(msg)
165 ret_val = json.loads(msg)
166 ret_val["status"] = True
166 ret_val["status"] = True
167 else:
167 else:
168 ret_val = msg
168 ret_val = msg
169 except Exception as e:
169 except Exception as e:
170 if not noformat:
170 if not noformat:
171 ret_val = {"status": False,
171 ret_val = {"status": False,
172 "body": body,
172 "body": body,
173 "error": "{}\n{}".format(e, msg)}
173 "error": "{}\n{}".format(e, msg)}
174 else:
174 else:
175 ret_val = None
175 ret_val = None
176 return ret_val
176 return ret_val
177
177
178 def user_auth(self, username, password):
178 def user_auth(self, username, password):
179 """Authenticate a user against crowd. Returns brief information about
179 """Authenticate a user against crowd. Returns brief information about
180 the user."""
180 the user."""
181 url = ("%s/rest/usermanagement/%s/authentication?username=%s"
181 url = ("%s/rest/usermanagement/%s/authentication?username=%s"
182 % (self._uri, self._version, username))
182 % (self._uri, self._version, username))
183 body = json.dumps({"value": password})
183 body = json.dumps({"value": password})
184 return self._request(url, body)
184 return self._request(url, body)
185
185
186 def user_groups(self, username):
186 def user_groups(self, username):
187 """Retrieve a list of groups to which this user belongs."""
187 """Retrieve a list of groups to which this user belongs."""
188 url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
188 url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
189 % (self._uri, self._version, username))
189 % (self._uri, self._version, username))
190 return self._request(url)
190 return self._request(url)
191
191
192
192
193 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
193 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
194 uid = 'crowd'
194 uid = 'crowd'
195 _settings_unsafe_keys = ['app_password']
195 _settings_unsafe_keys = ['app_password']
196
196
197 def includeme(self, config):
197 def includeme(self, config):
198 config.add_authn_plugin(self)
198 config.add_authn_plugin(self)
199 config.add_authn_resource(self.get_id(), CrowdAuthnResource(self))
199 config.add_authn_resource(self.get_id(), CrowdAuthnResource(self))
200 config.add_view(
200 config.add_view(
201 'rhodecode.authentication.views.AuthnPluginViewBase',
201 'rhodecode.authentication.views.AuthnPluginViewBase',
202 attr='settings_get',
202 attr='settings_get',
203 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
203 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
204 request_method='GET',
204 request_method='GET',
205 route_name='auth_home',
205 route_name='auth_home',
206 context=CrowdAuthnResource)
206 context=CrowdAuthnResource)
207 config.add_view(
207 config.add_view(
208 'rhodecode.authentication.views.AuthnPluginViewBase',
208 'rhodecode.authentication.views.AuthnPluginViewBase',
209 attr='settings_post',
209 attr='settings_post',
210 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
210 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
211 request_method='POST',
211 request_method='POST',
212 route_name='auth_home',
212 route_name='auth_home',
213 context=CrowdAuthnResource)
213 context=CrowdAuthnResource)
214
214
215 def get_settings_schema(self):
215 def get_settings_schema(self):
216 return CrowdSettingsSchema()
216 return CrowdSettingsSchema()
217
217
218 def get_display_name(self, load_from_settings=False):
218 def get_display_name(self, load_from_settings=False):
219 return _('CROWD')
219 return _('CROWD')
220
220
221 @classmethod
221 @classmethod
222 def docs(cls):
222 def docs(cls):
223 return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html"
223 return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html"
224
224
225 @hybrid_property
225 @hybrid_property
226 def name(self):
226 def name(self):
227 return u"crowd"
227 return u"crowd"
228
228
229 def use_fake_password(self):
229 def use_fake_password(self):
230 return True
230 return True
231
231
232 def user_activation_state(self):
232 def user_activation_state(self):
233 def_user_perms = User.get_default_user().AuthUser().permissions['global']
233 def_user_perms = User.get_default_user().AuthUser().permissions['global']
234 return 'hg.extern_activate.auto' in def_user_perms
234 return 'hg.extern_activate.auto' in def_user_perms
235
235
236 def auth(self, userobj, username, password, settings, **kwargs):
236 def auth(self, userobj, username, password, settings, **kwargs):
237 """
237 """
238 Given a user object (which may be null), username, a plaintext password,
238 Given a user object (which may be null), username, a plaintext password,
239 and a settings object (containing all the keys needed as listed in settings()),
239 and a settings object (containing all the keys needed as listed in settings()),
240 authenticate this user's login attempt.
240 authenticate this user's login attempt.
241
241
242 Return None on failure. On success, return a dictionary of the form:
242 Return None on failure. On success, return a dictionary of the form:
243
243
244 see: RhodeCodeAuthPluginBase.auth_func_attrs
244 see: RhodeCodeAuthPluginBase.auth_func_attrs
245 This is later validated for correctness
245 This is later validated for correctness
246 """
246 """
247 if not username or not password:
247 if not username or not password:
248 log.debug('Empty username or password skipping...')
248 log.debug('Empty username or password skipping...')
249 return None
249 return None
250
250
251 log.debug("Crowd settings: \n%s", formatted_json(settings))
251 log.debug("Crowd settings: \n%s", formatted_json(settings))
252 server = CrowdServer(**settings)
252 server = CrowdServer(**settings)
253 server.set_credentials(settings["app_name"], settings["app_password"])
253 server.set_credentials(settings["app_name"], settings["app_password"])
254 crowd_user = server.user_auth(username, password)
254 crowd_user = server.user_auth(username, password)
255 log.debug("Crowd returned: \n%s", formatted_json(crowd_user))
255 log.debug("Crowd returned: \n%s", formatted_json(crowd_user))
256 if not crowd_user["status"]:
256 if not crowd_user["status"]:
257 return None
257 return None
258
258
259 res = server.user_groups(crowd_user["name"])
259 res = server.user_groups(crowd_user["name"])
260 log.debug("Crowd groups: \n%s", formatted_json(res))
260 log.debug("Crowd groups: \n%s", formatted_json(res))
261 crowd_user["groups"] = [x["name"] for x in res["groups"]]
261 crowd_user["groups"] = [x["name"] for x in res["groups"]]
262
262
263 # old attrs fetched from RhodeCode database
263 # old attrs fetched from RhodeCode database
264 admin = getattr(userobj, 'admin', False)
264 admin = getattr(userobj, 'admin', False)
265 active = getattr(userobj, 'active', True)
265 active = getattr(userobj, 'active', True)
266 email = getattr(userobj, 'email', '')
266 email = getattr(userobj, 'email', '')
267 username = getattr(userobj, 'username', username)
267 username = getattr(userobj, 'username', username)
268 firstname = getattr(userobj, 'firstname', '')
268 firstname = getattr(userobj, 'firstname', '')
269 lastname = getattr(userobj, 'lastname', '')
269 lastname = getattr(userobj, 'lastname', '')
270 extern_type = getattr(userobj, 'extern_type', '')
270 extern_type = getattr(userobj, 'extern_type', '')
271
271
272 user_attrs = {
272 user_attrs = {
273 'username': username,
273 'username': username,
274 'firstname': crowd_user["first-name"] or firstname,
274 'firstname': crowd_user["first-name"] or firstname,
275 'lastname': crowd_user["last-name"] or lastname,
275 'lastname': crowd_user["last-name"] or lastname,
276 'groups': crowd_user["groups"],
276 'groups': crowd_user["groups"],
277 'user_group_sync': True,
277 'user_group_sync': True,
278 'email': crowd_user["email"] or email,
278 'email': crowd_user["email"] or email,
279 'admin': admin,
279 'admin': admin,
280 'active': active,
280 'active': active,
281 'active_from_extern': crowd_user.get('active'),
281 'active_from_extern': crowd_user.get('active'),
282 'extern_name': crowd_user["name"],
282 'extern_name': crowd_user["name"],
283 'extern_type': extern_type,
283 'extern_type': extern_type,
284 }
284 }
285
285
286 # set an admin if we're in admin_groups of crowd
286 # set an admin if we're in admin_groups of crowd
287 for group in settings["admin_groups"]:
287 for group in settings["admin_groups"]:
288 if group in user_attrs["groups"]:
288 if group in user_attrs["groups"]:
289 user_attrs["admin"] = True
289 user_attrs["admin"] = True
290 log.debug("Final crowd user object: \n%s", formatted_json(user_attrs))
290 log.debug("Final crowd user object: \n%s", formatted_json(user_attrs))
291 log.info('user `%s` authenticated correctly', user_attrs['username'])
291 log.info('user `%s` authenticated correctly', user_attrs['username'])
292 return user_attrs
292 return user_attrs
293
293
294
294
295 def includeme(config):
295 def includeme(config):
296 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
296 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
297 plugin_factory(plugin_id).includeme(config)
297 plugin_factory(plugin_id).includeme(config)
@@ -1,263 +1,263 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2
2
3
3
4 import os
4 import os
5 import re
5 import re
6 import shutil
6 import shutil
7 import logging
7 import logging
8
8
9 from rhodecode.lib.dbmigrate.migrate import exceptions
9 from rhodecode.lib.dbmigrate.migrate import exceptions
10 from rhodecode.lib.dbmigrate.migrate.versioning import pathed, script
10 from rhodecode.lib.dbmigrate.migrate.versioning import pathed, script
11 from datetime import datetime
11 from datetime import datetime
12
12
13
13
14 log = logging.getLogger(__name__)
14 log = logging.getLogger(__name__)
15
15
16 class VerNum(object):
16 class VerNum(object):
17 """A version number that behaves like a string and int at the same time"""
17 """A version number that behaves like a string and int at the same time"""
18
18
19 _instances = {}
19 _instances = {}
20
20
21 def __new__(cls, value):
21 def __new__(cls, value):
22 val = str(value)
22 val = str(value)
23 if val not in cls._instances:
23 if val not in cls._instances:
24 cls._instances[val] = super(VerNum, cls).__new__(cls)
24 cls._instances[val] = super(VerNum, cls).__new__(cls)
25 ret = cls._instances[val]
25 ret = cls._instances[val]
26 return ret
26 return ret
27
27
28 def __init__(self,value):
28 def __init__(self,value):
29 self.value = str(int(value))
29 self.value = str(int(value))
30 if self < 0:
30 if self < 0:
31 raise ValueError("Version number cannot be negative")
31 raise ValueError("Version number cannot be negative")
32
32
33 def __add__(self, value):
33 def __add__(self, value):
34 ret = int(self) + int(value)
34 ret = int(self) + int(value)
35 return VerNum(ret)
35 return VerNum(ret)
36
36
37 def __sub__(self, value):
37 def __sub__(self, value):
38 return self + (int(value) * -1)
38 return self + (int(value) * -1)
39
39
40 def __eq__(self, value):
40 def __eq__(self, value):
41 return int(self) == int(value)
41 return int(self) == int(value)
42
42
43 def __ne__(self, value):
43 def __ne__(self, value):
44 return int(self) != int(value)
44 return int(self) != int(value)
45
45
46 def __lt__(self, value):
46 def __lt__(self, value):
47 return int(self) < int(value)
47 return int(self) < int(value)
48
48
49 def __gt__(self, value):
49 def __gt__(self, value):
50 return int(self) > int(value)
50 return int(self) > int(value)
51
51
52 def __ge__(self, value):
52 def __ge__(self, value):
53 return int(self) >= int(value)
53 return int(self) >= int(value)
54
54
55 def __le__(self, value):
55 def __le__(self, value):
56 return int(self) <= int(value)
56 return int(self) <= int(value)
57
57
58 def __repr__(self):
58 def __repr__(self):
59 return "<VerNum(%s)>" % self.value
59 return "<VerNum(%s)>" % self.value
60
60
61 def __str__(self):
61 def __str__(self):
62 return str(self.value)
62 return str(self.value)
63
63
64 def __int__(self):
64 def __int__(self):
65 return int(self.value)
65 return int(self.value)
66
66
67
67
68 class Collection(pathed.Pathed):
68 class Collection(pathed.Pathed):
69 """A collection of versioning scripts in a repository"""
69 """A collection of versioning scripts in a repository"""
70
70
71 FILENAME_WITH_VERSION = re.compile(r'^(\d{3,}).*')
71 FILENAME_WITH_VERSION = re.compile(r'^(\d{3,}).*')
72
72
73 def __init__(self, path):
73 def __init__(self, path):
74 """Collect current version scripts in repository
74 """Collect current version scripts in repository
75 and store them in self.versions
75 and store them in self.versions
76 """
76 """
77 super(Collection, self).__init__(path)
77 super(Collection, self).__init__(path)
78
78
79 # Create temporary list of files, allowing skipped version numbers.
79 # Create temporary list of files, allowing skipped version numbers.
80 files = os.listdir(path)
80 files = os.listdir(path)
81 if '1' in files:
81 if '1' in files:
82 # deprecation
82 # deprecation
83 raise Exception('It looks like you have a repository in the old '
83 raise Exception('It looks like you have a repository in the old '
84 'format (with directories for each version). '
84 'format (with directories for each version). '
85 'Please convert repository before proceeding.')
85 'Please convert repository before proceeding.')
86
86
87 tempVersions = {}
87 tempVersions = {}
88 for filename in files:
88 for filename in files:
89 match = self.FILENAME_WITH_VERSION.match(filename)
89 match = self.FILENAME_WITH_VERSION.match(filename)
90 if match:
90 if match:
91 num = int(match.group(1))
91 num = int(match.group(1))
92 tempVersions.setdefault(num, []).append(filename)
92 tempVersions.setdefault(num, []).append(filename)
93 else:
93 else:
94 pass # Must be a helper file or something, let's ignore it.
94 pass # Must be a helper file or something, let's ignore it.
95
95
96 # Create the versions member where the keys
96 # Create the versions member where the keys
97 # are VerNum's and the values are Version's.
97 # are VerNum's and the values are Version's.
98 self.versions = {}
98 self.versions = {}
99 for num, files in list(tempVersions.items()):
99 for num, files in list(tempVersions.items()):
100 self.versions[VerNum(num)] = Version(num, path, files)
100 self.versions[VerNum(num)] = Version(num, path, files)
101
101
102 @property
102 @property
103 def latest(self):
103 def latest(self):
104 """:returns: Latest version in Collection"""
104 """:returns: Latest version in Collection"""
105 return max([VerNum(0)] + list(self.versions.keys()))
105 return max([VerNum(0)] + list(self.versions.keys()))
106
106
107 def _next_ver_num(self, use_timestamp_numbering):
107 def _next_ver_num(self, use_timestamp_numbering):
108 if use_timestamp_numbering == True:
108 if use_timestamp_numbering is True:
109 return VerNum(int(datetime.utcnow().strftime('%Y%m%d%H%M%S')))
109 return VerNum(int(datetime.utcnow().strftime('%Y%m%d%H%M%S')))
110 else:
110 else:
111 return self.latest + 1
111 return self.latest + 1
112
112
113 def create_new_python_version(self, description, **k):
113 def create_new_python_version(self, description, **k):
114 """Create Python files for new version"""
114 """Create Python files for new version"""
115 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
115 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
116 extra = str_to_filename(description)
116 extra = str_to_filename(description)
117
117
118 if extra:
118 if extra:
119 if extra == '_':
119 if extra == '_':
120 extra = ''
120 extra = ''
121 elif not extra.startswith('_'):
121 elif not extra.startswith('_'):
122 extra = '_%s' % extra
122 extra = '_%s' % extra
123
123
124 filename = '%03d%s.py' % (ver, extra)
124 filename = '%03d%s.py' % (ver, extra)
125 filepath = self._version_path(filename)
125 filepath = self._version_path(filename)
126
126
127 script.PythonScript.create(filepath, **k)
127 script.PythonScript.create(filepath, **k)
128 self.versions[ver] = Version(ver, self.path, [filename])
128 self.versions[ver] = Version(ver, self.path, [filename])
129
129
130 def create_new_sql_version(self, database, description, **k):
130 def create_new_sql_version(self, database, description, **k):
131 """Create SQL files for new version"""
131 """Create SQL files for new version"""
132 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
132 ver = self._next_ver_num(k.pop('use_timestamp_numbering', False))
133 self.versions[ver] = Version(ver, self.path, [])
133 self.versions[ver] = Version(ver, self.path, [])
134
134
135 extra = str_to_filename(description)
135 extra = str_to_filename(description)
136
136
137 if extra:
137 if extra:
138 if extra == '_':
138 if extra == '_':
139 extra = ''
139 extra = ''
140 elif not extra.startswith('_'):
140 elif not extra.startswith('_'):
141 extra = '_%s' % extra
141 extra = '_%s' % extra
142
142
143 # Create new files.
143 # Create new files.
144 for op in ('upgrade', 'downgrade'):
144 for op in ('upgrade', 'downgrade'):
145 filename = '%03d%s_%s_%s.sql' % (ver, extra, database, op)
145 filename = '%03d%s_%s_%s.sql' % (ver, extra, database, op)
146 filepath = self._version_path(filename)
146 filepath = self._version_path(filename)
147 script.SqlScript.create(filepath, **k)
147 script.SqlScript.create(filepath, **k)
148 self.versions[ver].add_script(filepath)
148 self.versions[ver].add_script(filepath)
149
149
150 def version(self, vernum=None):
150 def version(self, vernum=None):
151 """Returns latest Version if vernum is not given.
151 """Returns latest Version if vernum is not given.
152 Otherwise, returns wanted version"""
152 Otherwise, returns wanted version"""
153 if vernum is None:
153 if vernum is None:
154 vernum = self.latest
154 vernum = self.latest
155 return self.versions[VerNum(vernum)]
155 return self.versions[VerNum(vernum)]
156
156
157 @classmethod
157 @classmethod
158 def clear(cls):
158 def clear(cls):
159 super(Collection, cls).clear()
159 super(Collection, cls).clear()
160
160
161 def _version_path(self, ver):
161 def _version_path(self, ver):
162 """Returns path of file in versions repository"""
162 """Returns path of file in versions repository"""
163 return os.path.join(self.path, str(ver))
163 return os.path.join(self.path, str(ver))
164
164
165
165
166 class Version(object):
166 class Version(object):
167 """A single version in a collection
167 """A single version in a collection
168 :param vernum: Version Number
168 :param vernum: Version Number
169 :param path: Path to script files
169 :param path: Path to script files
170 :param filelist: List of scripts
170 :param filelist: List of scripts
171 :type vernum: int, VerNum
171 :type vernum: int, VerNum
172 :type path: string
172 :type path: string
173 :type filelist: list
173 :type filelist: list
174 """
174 """
175
175
176 def __init__(self, vernum, path, filelist):
176 def __init__(self, vernum, path, filelist):
177 self.version = VerNum(vernum)
177 self.version = VerNum(vernum)
178
178
179 # Collect scripts in this folder
179 # Collect scripts in this folder
180 self.sql = {}
180 self.sql = {}
181 self.python = None
181 self.python = None
182
182
183 for script in filelist:
183 for script in filelist:
184 self.add_script(os.path.join(path, script))
184 self.add_script(os.path.join(path, script))
185
185
186 def script(self, database=None, operation=None):
186 def script(self, database=None, operation=None):
187 """Returns SQL or Python Script"""
187 """Returns SQL or Python Script"""
188 for db in (database, 'default'):
188 for db in (database, 'default'):
189 # Try to return a .sql script first
189 # Try to return a .sql script first
190 try:
190 try:
191 return self.sql[db][operation]
191 return self.sql[db][operation]
192 except KeyError:
192 except KeyError:
193 continue # No .sql script exists
193 continue # No .sql script exists
194
194
195 # TODO: maybe add force Python parameter?
195 # TODO: maybe add force Python parameter?
196 ret = self.python
196 ret = self.python
197
197
198 assert ret is not None, \
198 assert ret is not None, \
199 "There is no script for %d version" % self.version
199 "There is no script for %d version" % self.version
200 return ret
200 return ret
201
201
202 def add_script(self, path):
202 def add_script(self, path):
203 """Add script to Collection/Version"""
203 """Add script to Collection/Version"""
204 if path.endswith(Extensions.py):
204 if path.endswith(Extensions.py):
205 self._add_script_py(path)
205 self._add_script_py(path)
206 elif path.endswith(Extensions.sql):
206 elif path.endswith(Extensions.sql):
207 self._add_script_sql(path)
207 self._add_script_sql(path)
208
208
209 SQL_FILENAME = re.compile(r'^.*\.sql')
209 SQL_FILENAME = re.compile(r'^.*\.sql')
210
210
211 def _add_script_sql(self, path):
211 def _add_script_sql(self, path):
212 basename = os.path.basename(path)
212 basename = os.path.basename(path)
213 match = self.SQL_FILENAME.match(basename)
213 match = self.SQL_FILENAME.match(basename)
214
214
215 if match:
215 if match:
216 basename = basename.replace('.sql', '')
216 basename = basename.replace('.sql', '')
217 parts = basename.split('_')
217 parts = basename.split('_')
218 if len(parts) < 3:
218 if len(parts) < 3:
219 raise exceptions.ScriptError(
219 raise exceptions.ScriptError(
220 "Invalid SQL script name %s " % basename + \
220 "Invalid SQL script name %s " % basename + \
221 "(needs to be ###_description_database_operation.sql)")
221 "(needs to be ###_description_database_operation.sql)")
222 version = parts[0]
222 version = parts[0]
223 op = parts[-1]
223 op = parts[-1]
224 # NOTE(mriedem): check for ibm_db_sa as the database in the name
224 # NOTE(mriedem): check for ibm_db_sa as the database in the name
225 if 'ibm_db_sa' in basename:
225 if 'ibm_db_sa' in basename:
226 if len(parts) == 6:
226 if len(parts) == 6:
227 dbms = '_'.join(parts[-4: -1])
227 dbms = '_'.join(parts[-4: -1])
228 else:
228 else:
229 raise exceptions.ScriptError(
229 raise exceptions.ScriptError(
230 "Invalid ibm_db_sa SQL script name '%s'; "
230 "Invalid ibm_db_sa SQL script name '%s'; "
231 "(needs to be "
231 "(needs to be "
232 "###_description_ibm_db_sa_operation.sql)" % basename)
232 "###_description_ibm_db_sa_operation.sql)" % basename)
233 else:
233 else:
234 dbms = parts[-2]
234 dbms = parts[-2]
235 else:
235 else:
236 raise exceptions.ScriptError(
236 raise exceptions.ScriptError(
237 "Invalid SQL script name %s " % basename + \
237 "Invalid SQL script name %s " % basename + \
238 "(needs to be ###_description_database_operation.sql)")
238 "(needs to be ###_description_database_operation.sql)")
239
239
240 # File the script into a dictionary
240 # File the script into a dictionary
241 self.sql.setdefault(dbms, {})[op] = script.SqlScript(path)
241 self.sql.setdefault(dbms, {})[op] = script.SqlScript(path)
242
242
243 def _add_script_py(self, path):
243 def _add_script_py(self, path):
244 if self.python is not None:
244 if self.python is not None:
245 raise exceptions.ScriptError('You can only have one Python script '
245 raise exceptions.ScriptError('You can only have one Python script '
246 'per version, but you have: %s and %s' % (self.python, path))
246 'per version, but you have: %s and %s' % (self.python, path))
247 self.python = script.PythonScript(path)
247 self.python = script.PythonScript(path)
248
248
249
249
250 class Extensions:
250 class Extensions:
251 """A namespace for file extensions"""
251 """A namespace for file extensions"""
252 py = 'py'
252 py = 'py'
253 sql = 'sql'
253 sql = 'sql'
254
254
255 def str_to_filename(s):
255 def str_to_filename(s):
256 """Replaces spaces, (double and single) quotes
256 """Replaces spaces, (double and single) quotes
257 and double underscores to underscores
257 and double underscores to underscores
258 """
258 """
259
259
260 s = s.replace(' ', '_').replace('"', '_').replace("'", '_').replace(".", "_")
260 s = s.replace(' ', '_').replace('"', '_').replace("'", '_').replace(".", "_")
261 while '__' in s:
261 while '__' in s:
262 s = s.replace('__', '_')
262 s = s.replace('__', '_')
263 return s
263 return s
@@ -1,186 +1,185 b''
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
2 #
4 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
7 #
6 #
8 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
10 # GNU General Public License for more details.
12 #
11 #
13 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
14 #
16 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
18
20 import sys
19 import sys
21 import logging
20 import logging
22
21
23
22
24 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(30, 38))
23 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(30, 38))
25
24
26 # Sequences
25 # Sequences
27 RESET_SEQ = "\033[0m"
26 RESET_SEQ = "\033[0m"
28 COLOR_SEQ = "\033[0;%dm"
27 COLOR_SEQ = "\033[0;%dm"
29 BOLD_SEQ = "\033[1m"
28 BOLD_SEQ = "\033[1m"
30
29
31 COLORS = {
30 COLORS = {
32 'CRITICAL': MAGENTA,
31 'CRITICAL': MAGENTA,
33 'ERROR': RED,
32 'ERROR': RED,
34 'WARNING': CYAN,
33 'WARNING': CYAN,
35 'INFO': GREEN,
34 'INFO': GREEN,
36 'DEBUG': BLUE,
35 'DEBUG': BLUE,
37 'SQL': YELLOW
36 'SQL': YELLOW
38 }
37 }
39
38
40
39
41 def _inject_req_id(record, with_prefix=True):
40 def _inject_req_id(record, with_prefix=True):
42 from pyramid.threadlocal import get_current_request
41 from pyramid.threadlocal import get_current_request
43 dummy = '00000000-0000-0000-0000-000000000000'
42 dummy = '00000000-0000-0000-0000-000000000000'
44 req_id = None
43 req_id = None
45
44
46 req = get_current_request()
45 req = get_current_request()
47 if req:
46 if req:
48 req_id = getattr(req, 'req_id', None)
47 req_id = getattr(req, 'req_id', None)
49 if with_prefix:
48 if with_prefix:
50 req_id = 'req_id:%-36s' % (req_id or dummy)
49 req_id = 'req_id:%-36s' % (req_id or dummy)
51 else:
50 else:
52 req_id = (req_id or dummy)
51 req_id = (req_id or dummy)
53 record.req_id = req_id
52 record.req_id = req_id
54
53
55
54
56 def _add_log_to_debug_bucket(formatted_record):
55 def _add_log_to_debug_bucket(formatted_record):
57 from pyramid.threadlocal import get_current_request
56 from pyramid.threadlocal import get_current_request
58 req = get_current_request()
57 req = get_current_request()
59 if req:
58 if req:
60 req.req_id_bucket.append(formatted_record)
59 req.req_id_bucket.append(formatted_record)
61
60
62
61
63 def one_space_trim(s):
62 def one_space_trim(s):
64 if s.find(" ") == -1:
63 if s.find(" ") == -1:
65 return s
64 return s
66 else:
65 else:
67 s = s.replace(' ', ' ')
66 s = s.replace(' ', ' ')
68 return one_space_trim(s)
67 return one_space_trim(s)
69
68
70
69
71 def format_sql(sql):
70 def format_sql(sql):
72 sql = sql.replace('\n', '')
71 sql = sql.replace('\n', '')
73 sql = one_space_trim(sql)
72 sql = one_space_trim(sql)
74 sql = sql\
73 sql = sql\
75 .replace(',', ',\n\t')\
74 .replace(',', ',\n\t')\
76 .replace('SELECT', '\n\tSELECT \n\t')\
75 .replace('SELECT', '\n\tSELECT \n\t')\
77 .replace('UPDATE', '\n\tUPDATE \n\t')\
76 .replace('UPDATE', '\n\tUPDATE \n\t')\
78 .replace('DELETE', '\n\tDELETE \n\t')\
77 .replace('DELETE', '\n\tDELETE \n\t')\
79 .replace('FROM', '\n\tFROM')\
78 .replace('FROM', '\n\tFROM')\
80 .replace('ORDER BY', '\n\tORDER BY')\
79 .replace('ORDER BY', '\n\tORDER BY')\
81 .replace('LIMIT', '\n\tLIMIT')\
80 .replace('LIMIT', '\n\tLIMIT')\
82 .replace('WHERE', '\n\tWHERE')\
81 .replace('WHERE', '\n\tWHERE')\
83 .replace('AND', '\n\tAND')\
82 .replace('AND', '\n\tAND')\
84 .replace('LEFT', '\n\tLEFT')\
83 .replace('LEFT', '\n\tLEFT')\
85 .replace('INNER', '\n\tINNER')\
84 .replace('INNER', '\n\tINNER')\
86 .replace('INSERT', '\n\tINSERT')\
85 .replace('INSERT', '\n\tINSERT')\
87 .replace('DELETE', '\n\tDELETE')
86 .replace('DELETE', '\n\tDELETE')
88 return sql
87 return sql
89
88
90
89
91 class ExceptionAwareFormatter(logging.Formatter):
90 class ExceptionAwareFormatter(logging.Formatter):
92 """
91 """
93 Extended logging formatter which prints out remote tracebacks.
92 Extended logging formatter which prints out remote tracebacks.
94 """
93 """
95
94
96 def formatException(self, ei):
95 def formatException(self, ei):
97 ex_type, ex_value, ex_tb = ei
96 ex_type, ex_value, ex_tb = ei
98
97
99 local_tb = logging.Formatter.formatException(self, ei)
98 local_tb = logging.Formatter.formatException(self, ei)
100 if hasattr(ex_value, '_vcs_server_traceback'):
99 if hasattr(ex_value, '_vcs_server_traceback'):
101
100
102 def formatRemoteTraceback(remote_tb_lines):
101 def formatRemoteTraceback(remote_tb_lines):
103 result = ["\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n"]
102 result = ["\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n"]
104 result.append(remote_tb_lines)
103 result.append(remote_tb_lines)
105 result.append("\n +--- End of remote traceback\n")
104 result.append("\n +--- End of remote traceback\n")
106 return result
105 return result
107
106
108 try:
107 try:
109 if ex_type is not None and ex_value is None and ex_tb is None:
108 if ex_type is not None and ex_value is None and ex_tb is None:
110 # possible old (3.x) call syntax where caller is only
109 # possible old (3.x) call syntax where caller is only
111 # providing exception object
110 # providing exception object
112 if type(ex_type) is not type:
111 if type(ex_type) is not type:
113 raise TypeError(
112 raise TypeError(
114 "invalid argument: ex_type should be an exception "
113 "invalid argument: ex_type should be an exception "
115 "type, or just supply no arguments at all")
114 "type, or just supply no arguments at all")
116 if ex_type is None and ex_tb is None:
115 if ex_type is None and ex_tb is None:
117 ex_type, ex_value, ex_tb = sys.exc_info()
116 ex_type, ex_value, ex_tb = sys.exc_info()
118
117
119 remote_tb = getattr(ex_value, "_vcs_server_traceback", None)
118 remote_tb = getattr(ex_value, "_vcs_server_traceback", None)
120
119
121 if remote_tb:
120 if remote_tb:
122 remote_tb = formatRemoteTraceback(remote_tb)
121 remote_tb = formatRemoteTraceback(remote_tb)
123 return local_tb + ''.join(remote_tb)
122 return local_tb + ''.join(remote_tb)
124 finally:
123 finally:
125 # clean up cycle to traceback, to allow proper GC
124 # clean up cycle to traceback, to allow proper GC
126 del ex_type, ex_value, ex_tb
125 del ex_type, ex_value, ex_tb
127
126
128 return local_tb
127 return local_tb
129
128
130
129
131 class RequestTrackingFormatter(ExceptionAwareFormatter):
130 class RequestTrackingFormatter(ExceptionAwareFormatter):
132 def format(self, record):
131 def format(self, record):
133 _inject_req_id(record)
132 _inject_req_id(record)
134 def_record = logging.Formatter.format(self, record)
133 def_record = logging.Formatter.format(self, record)
135 _add_log_to_debug_bucket(def_record)
134 _add_log_to_debug_bucket(def_record)
136 return def_record
135 return def_record
137
136
138
137
139 class ColorFormatter(ExceptionAwareFormatter):
138 class ColorFormatter(ExceptionAwareFormatter):
140
139
141 def format(self, record):
140 def format(self, record):
142 """
141 """
143 Changes record's levelname to use with COLORS enum
142 Changes record's levelname to use with COLORS enum
144 """
143 """
145 def_record = super(ColorFormatter, self).format(record)
144 def_record = super(ColorFormatter, self).format(record)
146
145
147 levelname = record.levelname
146 levelname = record.levelname
148 start = COLOR_SEQ % (COLORS[levelname])
147 start = COLOR_SEQ % (COLORS[levelname])
149 end = RESET_SEQ
148 end = RESET_SEQ
150
149
151 colored_record = ''.join([start, def_record, end])
150 colored_record = ''.join([start, def_record, end])
152 return colored_record
151 return colored_record
153
152
154
153
155 class ColorRequestTrackingFormatter(RequestTrackingFormatter):
154 class ColorRequestTrackingFormatter(RequestTrackingFormatter):
156
155
157 def format(self, record):
156 def format(self, record):
158 """
157 """
159 Changes record's levelname to use with COLORS enum
158 Changes record's levelname to use with COLORS enum
160 """
159 """
161 def_record = super(ColorRequestTrackingFormatter, self).format(record)
160 def_record = super(ColorRequestTrackingFormatter, self).format(record)
162
161
163 levelname = record.levelname
162 levelname = record.levelname
164 start = COLOR_SEQ % (COLORS[levelname])
163 start = COLOR_SEQ % (COLORS[levelname])
165 end = RESET_SEQ
164 end = RESET_SEQ
166
165
167 colored_record = ''.join([start, def_record, end])
166 colored_record = ''.join([start, def_record, end])
168 return colored_record
167 return colored_record
169
168
170
169
171 class ColorFormatterSql(logging.Formatter):
170 class ColorFormatterSql(logging.Formatter):
172
171
173 def format(self, record):
172 def format(self, record):
174 """
173 """
175 Changes record's levelname to use with COLORS enum
174 Changes record's levelname to use with COLORS enum
176 """
175 """
177
176
178 start = COLOR_SEQ % (COLORS['SQL'])
177 start = COLOR_SEQ % (COLORS['SQL'])
179 def_record = format_sql(logging.Formatter.format(self, record))
178 def_record = format_sql(logging.Formatter.format(self, record))
180 end = RESET_SEQ
179 end = RESET_SEQ
181
180
182 colored_record = ''.join([start, def_record, end])
181 colored_record = ''.join([start, def_record, end])
183 return colored_record
182 return colored_record
184
183
185 # marcink: needs to stay with this name for backward .ini compatability
184 # NOTE (marcink): needs to stay with this name for backward .ini compatability
186 Pyro4AwareFormatter = ExceptionAwareFormatter
185 Pyro4AwareFormatter = ExceptionAwareFormatter # noqa
@@ -1,132 +1,132 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import re
20 import re
21 import markdown
21 import markdown
22 import xml.etree.ElementTree as etree
22 import xml.etree.ElementTree as etree
23
23
24 from markdown.extensions import Extension
24 from markdown.extensions import Extension
25 from markdown.extensions.fenced_code import FencedCodeExtension
25 from markdown.extensions.fenced_code import FencedCodeExtension
26 from markdown.extensions.tables import TableExtension
26 from markdown.extensions.tables import TableExtension
27 from markdown.extensions.nl2br import Nl2BrExtension as _Nl2BrExtension
27 from markdown.extensions.nl2br import Nl2BrExtension as _Nl2BrExtension
28 from markdown.extensions.wikilinks import WikiLinkExtension
28 from markdown.extensions.wikilinks import WikiLinkExtension
29 from markdown.inlinepatterns import Pattern
29 from markdown.inlinepatterns import Pattern
30
30
31 import gfm
31 import gfm
32
32
33
33
34 class InlineProcessor(Pattern):
34 class InlineProcessor(Pattern):
35 """
35 """
36 Base class that inline patterns subclass.
36 Base class that inline patterns subclass.
37 This is the newer style inline processor that uses a more
37 This is the newer style inline processor that uses a more
38 efficient and flexible search approach.
38 efficient and flexible search approach.
39 """
39 """
40
40
41 def __init__(self, pattern, md=None):
41 def __init__(self, pattern, md=None):
42 """
42 """
43 Create an instant of an inline pattern.
43 Create an instant of an inline pattern.
44 Keyword arguments:
44 Keyword arguments:
45 * pattern: A regular expression that matches a pattern
45 * pattern: A regular expression that matches a pattern
46 """
46 """
47 self.pattern = pattern
47 self.pattern = pattern
48 self.compiled_re = re.compile(pattern, re.DOTALL | re.UNICODE)
48 self.compiled_re = re.compile(pattern, re.DOTALL | re.UNICODE)
49
49
50 # Api for Markdown to pass safe_mode into instance
50 # Api for Markdown to pass safe_mode into instance
51 self.safe_mode = False
51 self.safe_mode = False
52 self.md = md
52 self.md = md
53
53
54 def handleMatch(self, m, data):
54 def handleMatch(self, m, data):
55 """Return a ElementTree element from the given match and the
55 """Return a ElementTree element from the given match and the
56 start and end index of the matched text.
56 start and end index of the matched text.
57 If `start` and/or `end` are returned as `None`, it will be
57 If `start` and/or `end` are returned as `None`, it will be
58 assumed that the processor did not find a valid region of text.
58 assumed that the processor did not find a valid region of text.
59 Subclasses should override this method.
59 Subclasses should override this method.
60 Keyword arguments:
60 Keyword arguments:
61 * m: A re match object containing a match of the pattern.
61 * m: A re match object containing a match of the pattern.
62 * data: The buffer current under analysis
62 * data: The buffer current under analysis
63 Returns:
63 Returns:
64 * el: The ElementTree element, text or None.
64 * el: The ElementTree element, text or None.
65 * start: The start of the region that has been matched or None.
65 * start: The start of the region that has been matched or None.
66 * end: The end of the region that has been matched or None.
66 * end: The end of the region that has been matched or None.
67 """
67 """
68 pass # pragma: no cover
68 pass # pragma: no cover
69
69
70
70
71 class SimpleTagInlineProcessor(InlineProcessor):
71 class SimpleTagInlineProcessor(InlineProcessor):
72 """
72 """
73 Return element of type `tag` with a text attribute of group(2)
73 Return element of type `tag` with a text attribute of group(2)
74 of a Pattern.
74 of a Pattern.
75 """
75 """
76 def __init__(self, pattern, tag):
76 def __init__(self, pattern, tag):
77 InlineProcessor.__init__(self, pattern)
77 InlineProcessor.__init__(self, pattern)
78 self.tag = tag
78 self.tag = tag
79
79
80 def handleMatch(self, m, data): # pragma: no cover
80 def handleMatch(self, m, data): # pragma: no cover
81 el = etree.Element(self.tag)
81 el = etree.Element(self.tag)
82 el.text = m.group(2)
82 el.text = m.group(2)
83 return el, m.start(0), m.end(0)
83 return el, m.start(0), m.end(0)
84
84
85
85
86 class SubstituteTagInlineProcessor(SimpleTagInlineProcessor):
86 class SubstituteTagInlineProcessor(SimpleTagInlineProcessor):
87 """ Return an element of type `tag` with no children. """
87 """ Return an element of type `tag` with no children. """
88 def handleMatch(self, m, data):
88 def handleMatch(self, m, data):
89 return etree.Element(self.tag), m.start(0), m.end(0)
89 return etree.Element(self.tag), m.start(0), m.end(0)
90
90
91
91
92 class Nl2BrExtension(_Nl2BrExtension):
92 class Nl2BrExtension(_Nl2BrExtension):
93 pass
93 pass
94
94
95
95
96 # Global Vars
96 # Global Vars
97 URLIZE_RE = '(%s)' % '|'.join([
97 URLIZE_RE = '(%s)' % '|'.join([
98 r'<(?:f|ht)tps?://[^>]*>',
98 r'<(?:f|ht)tps?://[^>]*>',
99 r'\b(?:f|ht)tps?://[^)<>\s]+[^.,)<>\s]',
99 r'\b(?:f|ht)tps?://[^)<>\s]+[^.,)<>\s]',
100 r'\bwww\.[^)<>\s]+[^.,)<>\s]',
100 r'\bwww\.[^)<>\s]+[^.,)<>\s]',
101 r'[^(<\s]+\.(?:com|net|org)\b',
101 r'[^(<\s]+\.(?:com|net|org)\b',
102 ])
102 ])
103
103
104
104
105 class UrlizePattern(markdown.inlinepatterns.Pattern):
105 class UrlizePattern(markdown.inlinepatterns.Pattern):
106 """ Return a link Element given an autolink (`http://example/com`). """
106 """ Return a link Element given an autolink (`http://example/com`). """
107 def handleMatch(self, m):
107 def handleMatch(self, m):
108 url = m.group(2)
108 url = m.group(2)
109
109
110 if url.startswith('<'):
110 if url.startswith('<'):
111 url = url[1:-1]
111 url = url[1:-1]
112
112
113 text = url
113 text = url
114
114
115 if not url.split('://')[0] in ('http','https','ftp'):
115 if url.split('://')[0] not in ('http', 'https', 'ftp'):
116 if '@' in url and not '/' in url:
116 if '@' in url and '/' not in url:
117 url = 'mailto:' + url
117 url = 'mailto:' + url
118 else:
118 else:
119 url = 'http://' + url
119 url = 'http://' + url
120
120
121 el = markdown.util.etree.Element("a")
121 el = markdown.util.etree.Element("a")
122 el.set('href', url)
122 el.set('href', url)
123 el.text = markdown.util.AtomicString(text)
123 el.text = markdown.util.AtomicString(text)
124 return el
124 return el
125
125
126
126
127 class UrlizeExtension(Extension):
127 class UrlizeExtension(Extension):
128 """ Urlize Extension for Python-Markdown. """
128 """ Urlize Extension for Python-Markdown. """
129
129
130 def extendMarkdown(self, md):
130 def extendMarkdown(self, md):
131 """ Replace autolink with UrlizePattern """
131 """ Replace autolink with UrlizePattern """
132 md.inlinePatterns['autolink'] = UrlizePattern(URLIZE_RE, md)
132 md.inlinePatterns['autolink'] = UrlizePattern(URLIZE_RE, md)
@@ -1,218 +1,218 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import mock
20 import mock
21 import pytest
21 import pytest
22
22
23 from rhodecode.model.db import User
23 from rhodecode.model.db import User
24 from rhodecode.tests import TEST_USER_REGULAR_LOGIN
24 from rhodecode.tests import TEST_USER_REGULAR_LOGIN
25 from rhodecode.tests.fixture import Fixture
25 from rhodecode.tests.fixture import Fixture
26 from rhodecode.model.user_group import UserGroupModel
26 from rhodecode.model.user_group import UserGroupModel
27 from rhodecode.model.meta import Session
27 from rhodecode.model.meta import Session
28
28
29
29
30 fixture = Fixture()
30 fixture = Fixture()
31
31
32
32
33 def teardown_module(self):
33 def teardown_module(self):
34 _delete_all_user_groups()
34 _delete_all_user_groups()
35
35
36
36
37 class TestGetUserGroups(object):
37 class TestGetUserGroups(object):
38 def test_returns_filtered_list(self, backend, user_util):
38 def test_returns_filtered_list(self, backend, user_util):
39 created_groups = []
39 created_groups = []
40 for i in range(4):
40 for i in range(4):
41 created_groups.append(
41 created_groups.append(
42 user_util.create_user_group(users_group_active=True))
42 user_util.create_user_group(users_group_active=True))
43
43
44 group_filter = created_groups[-1].users_group_name[-2:]
44 group_filter = created_groups[-1].users_group_name[-2:]
45 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
45 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
46 with self._patch_user_group_list():
46 with self._patch_user_group_list():
47 groups = UserGroupModel().get_user_groups(group_filter)
47 groups = UserGroupModel().get_user_groups(group_filter)
48
48
49 fake_groups = [
49 fake_groups = [
50 u for u in groups if u['value'].startswith('test_returns')]
50 u for u in groups if u['value'].startswith('test_returns')]
51 assert len(fake_groups) == 1
51 assert len(fake_groups) == 1
52 assert fake_groups[0]['value'] == created_groups[-1].users_group_name
52 assert fake_groups[0]['value'] == created_groups[-1].users_group_name
53 assert fake_groups[0]['value_display'].startswith(
53 assert fake_groups[0]['value_display'].startswith(
54 'Group: test_returns')
54 'Group: test_returns')
55
55
56 def test_returns_limited_list(self, backend, user_util):
56 def test_returns_limited_list(self, backend, user_util):
57 created_groups = []
57 created_groups = []
58 for i in range(3):
58 for i in range(3):
59 created_groups.append(
59 created_groups.append(
60 user_util.create_user_group(users_group_active=True))
60 user_util.create_user_group(users_group_active=True))
61 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
61 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
62 with self._patch_user_group_list():
62 with self._patch_user_group_list():
63 groups = UserGroupModel().get_user_groups('test_returns')
63 groups = UserGroupModel().get_user_groups('test_returns')
64
64
65 fake_groups = [
65 fake_groups = [
66 u for u in groups if u['value'].startswith('test_returns')]
66 u for u in groups if u['value'].startswith('test_returns')]
67 assert len(fake_groups) == 3
67 assert len(fake_groups) == 3
68
68
69 def test_returns_active_user_groups(self, backend, user_util):
69 def test_returns_active_user_groups(self, backend, user_util):
70 for i in range(4):
70 for i in range(4):
71 is_active = i % 2 == 0
71 is_active = i % 2 == 0
72 user_util.create_user_group(users_group_active=is_active)
72 user_util.create_user_group(users_group_active=is_active)
73 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
73 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
74 with self._patch_user_group_list():
74 with self._patch_user_group_list():
75 groups = UserGroupModel().get_user_groups()
75 groups = UserGroupModel().get_user_groups()
76 expected = ('id', 'icon_link', 'value_display', 'value', 'value_type')
76 expected = ('id', 'icon_link', 'value_display', 'value', 'value_type')
77 for group in groups:
77 for group in groups:
78 assert group['value_type'] is 'user_group'
78 assert group['value_type'] == 'user_group'
79 for key in expected:
79 for key in expected:
80 assert key in group
80 assert key in group
81
81
82 fake_groups = [
82 fake_groups = [
83 u for u in groups if u['value'].startswith('test_returns')]
83 u for u in groups if u['value'].startswith('test_returns')]
84 assert len(fake_groups) == 2
84 assert len(fake_groups) == 2
85 for user in fake_groups:
85 for user in fake_groups:
86 assert user['value_display'].startswith('Group: test_returns')
86 assert user['value_display'].startswith('Group: test_returns')
87
87
88 def _patch_user_group_list(self):
88 def _patch_user_group_list(self):
89 def side_effect(group_list, perm_set):
89 def side_effect(group_list, perm_set):
90 return group_list
90 return group_list
91 return mock.patch(
91 return mock.patch(
92 'rhodecode.model.user_group.UserGroupList', side_effect=side_effect)
92 'rhodecode.model.user_group.UserGroupList', side_effect=side_effect)
93
93
94
94
95 @pytest.mark.parametrize(
95 @pytest.mark.parametrize(
96 "pre_existing, regular_should_be, external_should_be, groups, "
96 "pre_existing, regular_should_be, external_should_be, groups, "
97 "expected", [
97 "expected", [
98 ([], [], [], [], []),
98 ([], [], [], [], []),
99 # no changes of regular
99 # no changes of regular
100 ([], ['regular'], [], [], ['regular']),
100 ([], ['regular'], [], [], ['regular']),
101 # not added to regular group
101 # not added to regular group
102 (['some_other'], [], [], ['some_other'], []),
102 (['some_other'], [], [], ['some_other'], []),
103 (
103 (
104 [], ['regular'], ['container'], ['container'],
104 [], ['regular'], ['container'], ['container'],
105 ['regular', 'container']
105 ['regular', 'container']
106 ),
106 ),
107 (
107 (
108 [], ['regular'], [], ['container', 'container2'],
108 [], ['regular'], [], ['container', 'container2'],
109 ['regular', 'container', 'container2']
109 ['regular', 'container', 'container2']
110 ),
110 ),
111 # remove not used
111 # remove not used
112 ([], ['regular'], ['other'], [], ['regular']),
112 ([], ['regular'], ['other'], [], ['regular']),
113 (
113 (
114 ['some_other'], ['regular'], ['other', 'container'],
114 ['some_other'], ['regular'], ['other', 'container'],
115 ['container', 'container2'],
115 ['container', 'container2'],
116 ['regular', 'container', 'container2']
116 ['regular', 'container', 'container2']
117 ),
117 ),
118 ])
118 ])
119 def test_enforce_groups(pre_existing, regular_should_be,
119 def test_enforce_groups(pre_existing, regular_should_be,
120 external_should_be, groups, expected, backend_hg):
120 external_should_be, groups, expected, backend_hg):
121 # TODO: anderson: adding backend_hg fixture so it sets up the database
121 # TODO: anderson: adding backend_hg fixture so it sets up the database
122 # for when running this file alone
122 # for when running this file alone
123 _delete_all_user_groups()
123 _delete_all_user_groups()
124
124
125 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
125 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
126 for gr in pre_existing:
126 for gr in pre_existing:
127 gr = fixture.create_user_group(gr)
127 gr = fixture.create_user_group(gr)
128 Session().commit()
128 Session().commit()
129
129
130 # make sure use is just in those groups
130 # make sure use is just in those groups
131 for gr in regular_should_be:
131 for gr in regular_should_be:
132 gr = fixture.create_user_group(gr)
132 gr = fixture.create_user_group(gr)
133 Session().commit()
133 Session().commit()
134 UserGroupModel().add_user_to_group(gr, user)
134 UserGroupModel().add_user_to_group(gr, user)
135 Session().commit()
135 Session().commit()
136
136
137 # now special external groups created by auth plugins
137 # now special external groups created by auth plugins
138 for gr in external_should_be:
138 for gr in external_should_be:
139 gr = fixture.create_user_group(
139 gr = fixture.create_user_group(
140 gr, user_group_data={'extern_type': 'container'})
140 gr, user_group_data={'extern_type': 'container'})
141 Session().commit()
141 Session().commit()
142 UserGroupModel().add_user_to_group(gr, user)
142 UserGroupModel().add_user_to_group(gr, user)
143 Session().commit()
143 Session().commit()
144
144
145 UserGroupModel().enforce_groups(user, groups, 'container')
145 UserGroupModel().enforce_groups(user, groups, 'container')
146 Session().commit()
146 Session().commit()
147
147
148 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
148 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
149 in_groups = user.group_member
149 in_groups = user.group_member
150
150
151 expected.sort()
151 expected.sort()
152 assert (
152 assert (
153 expected == sorted(x.users_group.users_group_name for x in in_groups))
153 expected == sorted(x.users_group.users_group_name for x in in_groups))
154
154
155
155
156 def _delete_all_user_groups():
156 def _delete_all_user_groups():
157 for gr in UserGroupModel.get_all():
157 for gr in UserGroupModel.get_all():
158 fixture.destroy_user_group(gr)
158 fixture.destroy_user_group(gr)
159 Session().commit()
159 Session().commit()
160
160
161
161
162 def test_add_and_remove_user_from_group(user_regular, user_util):
162 def test_add_and_remove_user_from_group(user_regular, user_util):
163 user_group = user_util.create_user_group()
163 user_group = user_util.create_user_group()
164 assert user_group.members == []
164 assert user_group.members == []
165 UserGroupModel().add_user_to_group(user_group, user_regular)
165 UserGroupModel().add_user_to_group(user_group, user_regular)
166 Session().commit()
166 Session().commit()
167 assert user_group.members[0].user == user_regular
167 assert user_group.members[0].user == user_regular
168 UserGroupModel().remove_user_from_group(user_group, user_regular)
168 UserGroupModel().remove_user_from_group(user_group, user_regular)
169 Session().commit()
169 Session().commit()
170 assert user_group.members == []
170 assert user_group.members == []
171
171
172
172
173 @pytest.mark.parametrize('data, expected', [
173 @pytest.mark.parametrize('data, expected', [
174 ([], []),
174 ([], []),
175 ([{"member_user_id": 1, "type": "new"}], [1]),
175 ([{"member_user_id": 1, "type": "new"}], [1]),
176 ([{"member_user_id": 1, "type": "new"},
176 ([{"member_user_id": 1, "type": "new"},
177 {"member_user_id": 1, "type": "existing"}], [1]),
177 {"member_user_id": 1, "type": "existing"}], [1]),
178 ([{"member_user_id": 1, "type": "new"},
178 ([{"member_user_id": 1, "type": "new"},
179 {"member_user_id": 2, "type": "new"},
179 {"member_user_id": 2, "type": "new"},
180 {"member_user_id": 3, "type": "remove"}], [1, 2])
180 {"member_user_id": 3, "type": "remove"}], [1, 2])
181 ])
181 ])
182 def test_clean_members_data(data, expected):
182 def test_clean_members_data(data, expected):
183 cleaned = UserGroupModel()._clean_members_data(data)
183 cleaned = UserGroupModel()._clean_members_data(data)
184 assert cleaned == expected
184 assert cleaned == expected
185
185
186
186
187 def _create_test_members():
187 def _create_test_members():
188 members = []
188 members = []
189 for member_number in range(3):
189 for member_number in range(3):
190 member = mock.Mock()
190 member = mock.Mock()
191 member.user_id = member_number + 1
191 member.user_id = member_number + 1
192 member.user.user_id = member_number + 1
192 member.user.user_id = member_number + 1
193 members.append(member)
193 members.append(member)
194 return members
194 return members
195
195
196
196
197 def test_get_added_and_removed_users():
197 def test_get_added_and_removed_users():
198 members = _create_test_members()
198 members = _create_test_members()
199 mock_user_group = mock.Mock()
199 mock_user_group = mock.Mock()
200 mock_user_group.members = [members[0], members[1]]
200 mock_user_group.members = [members[0], members[1]]
201 new_users_list = [members[1].user.user_id, members[2].user.user_id]
201 new_users_list = [members[1].user.user_id, members[2].user.user_id]
202 model = UserGroupModel()
202 model = UserGroupModel()
203
203
204 added, removed = model._get_added_and_removed_user_ids(
204 added, removed = model._get_added_and_removed_user_ids(
205 mock_user_group, new_users_list)
205 mock_user_group, new_users_list)
206
206
207 assert added == [members[2].user.user_id]
207 assert added == [members[2].user.user_id]
208 assert removed == [members[0].user.user_id]
208 assert removed == [members[0].user.user_id]
209
209
210
210
211 def test_set_users_as_members_and_find_user_in_group(
211 def test_set_users_as_members_and_find_user_in_group(
212 user_util, user_regular, user_admin):
212 user_util, user_regular, user_admin):
213 user_group = user_util.create_user_group()
213 user_group = user_util.create_user_group()
214 assert len(user_group.members) == 0
214 assert len(user_group.members) == 0
215 user_list = [user_regular.user_id, user_admin.user_id]
215 user_list = [user_regular.user_id, user_admin.user_id]
216 UserGroupModel()._set_users_as_members(user_group, user_list)
216 UserGroupModel()._set_users_as_members(user_group, user_list)
217 assert len(user_group.members) == 2
217 assert len(user_group.members) == 2
218 assert UserGroupModel()._find_user_in_group(user_regular, user_group)
218 assert UserGroupModel()._find_user_in_group(user_regular, user_group)
@@ -1,328 +1,328 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import pytest
20 import pytest
21 import mock
21 import mock
22
22
23 from rhodecode.model.db import (
23 from rhodecode.model.db import (
24 true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap)
24 true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap)
25 from rhodecode.model.meta import Session
25 from rhodecode.model.meta import Session
26 from rhodecode.model.user import UserModel
26 from rhodecode.model.user import UserModel
27 from rhodecode.model.user_group import UserGroupModel
27 from rhodecode.model.user_group import UserGroupModel
28 from rhodecode.model.repo import RepoModel
28 from rhodecode.model.repo import RepoModel
29 from rhodecode.model.repo_group import RepoGroupModel
29 from rhodecode.model.repo_group import RepoGroupModel
30 from rhodecode.tests.fixture import Fixture
30 from rhodecode.tests.fixture import Fixture
31 from rhodecode.lib.str_utils import safe_str
31 from rhodecode.lib.str_utils import safe_str
32
32
33
33
34 fixture = Fixture()
34 fixture = Fixture()
35
35
36
36
37 class TestGetUsers(object):
37 class TestGetUsers(object):
38 def test_returns_active_users(self, backend, user_util):
38 def test_returns_active_users(self, backend, user_util):
39 for i in range(4):
39 for i in range(4):
40 is_active = i % 2 == 0
40 is_active = i % 2 == 0
41 user_util.create_user(active=is_active, lastname='Fake user')
41 user_util.create_user(active=is_active, lastname='Fake user')
42
42
43 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
43 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
44 with mock.patch('rhodecode.lib.helpers.link_to_user'):
44 with mock.patch('rhodecode.lib.helpers.link_to_user'):
45 users = UserModel().get_users()
45 users = UserModel().get_users()
46 fake_users = [u for u in users if u['last_name'] == 'Fake user']
46 fake_users = [u for u in users if u['last_name'] == 'Fake user']
47 assert len(fake_users) == 2
47 assert len(fake_users) == 2
48
48
49 expected_keys = (
49 expected_keys = (
50 'id', 'first_name', 'last_name', 'username', 'icon_link',
50 'id', 'first_name', 'last_name', 'username', 'icon_link',
51 'value_display', 'value', 'value_type')
51 'value_display', 'value', 'value_type')
52 for user in users:
52 for user in users:
53 assert user['value_type'] is 'user'
53 assert user['value_type'] == 'user'
54 for key in expected_keys:
54 for key in expected_keys:
55 assert key in user
55 assert key in user
56
56
57 def test_returns_user_filtered_by_last_name(self, backend, user_util):
57 def test_returns_user_filtered_by_last_name(self, backend, user_util):
58 keywords = ('aBc', u'ΓΌnicode')
58 keywords = ('aBc', u'ΓΌnicode')
59 for keyword in keywords:
59 for keyword in keywords:
60 for i in range(2):
60 for i in range(2):
61 user_util.create_user(
61 user_util.create_user(
62 active=True, lastname=u'Fake {} user'.format(keyword))
62 active=True, lastname=u'Fake {} user'.format(keyword))
63
63
64 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
64 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
65 with mock.patch('rhodecode.lib.helpers.link_to_user'):
65 with mock.patch('rhodecode.lib.helpers.link_to_user'):
66 keyword = keywords[1].lower()
66 keyword = keywords[1].lower()
67 users = UserModel().get_users(name_contains=keyword)
67 users = UserModel().get_users(name_contains=keyword)
68
68
69 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
69 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
70 assert len(fake_users) == 2
70 assert len(fake_users) == 2
71 for user in fake_users:
71 for user in fake_users:
72 assert user['last_name'] == safe_str('Fake ΓΌnicode user')
72 assert user['last_name'] == safe_str('Fake ΓΌnicode user')
73
73
74 def test_returns_user_filtered_by_first_name(self, backend, user_util):
74 def test_returns_user_filtered_by_first_name(self, backend, user_util):
75 created_users = []
75 created_users = []
76 keywords = ('aBc', u'ΓΌnicode')
76 keywords = ('aBc', u'ΓΌnicode')
77 for keyword in keywords:
77 for keyword in keywords:
78 for i in range(2):
78 for i in range(2):
79 created_users.append(user_util.create_user(
79 created_users.append(user_util.create_user(
80 active=True, lastname='Fake user',
80 active=True, lastname='Fake user',
81 firstname=u'Fake {} user'.format(keyword)))
81 firstname=u'Fake {} user'.format(keyword)))
82
82
83 keyword = keywords[1].lower()
83 keyword = keywords[1].lower()
84 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
84 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
85 with mock.patch('rhodecode.lib.helpers.link_to_user'):
85 with mock.patch('rhodecode.lib.helpers.link_to_user'):
86 users = UserModel().get_users(name_contains=keyword)
86 users = UserModel().get_users(name_contains=keyword)
87
87
88 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
88 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
89 assert len(fake_users) == 2
89 assert len(fake_users) == 2
90 for user in fake_users:
90 for user in fake_users:
91 assert user['first_name'] == safe_str('Fake ΓΌnicode user')
91 assert user['first_name'] == safe_str('Fake ΓΌnicode user')
92
92
93 def test_returns_user_filtered_by_username(self, backend, user_util):
93 def test_returns_user_filtered_by_username(self, backend, user_util):
94 created_users = []
94 created_users = []
95 for i in range(5):
95 for i in range(5):
96 created_users.append(user_util.create_user(
96 created_users.append(user_util.create_user(
97 active=True, lastname='Fake user'))
97 active=True, lastname='Fake user'))
98
98
99 user_filter = created_users[-1].username[-2:]
99 user_filter = created_users[-1].username[-2:]
100 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
100 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
101 with mock.patch('rhodecode.lib.helpers.link_to_user'):
101 with mock.patch('rhodecode.lib.helpers.link_to_user'):
102 users = UserModel().get_users(name_contains=user_filter)
102 users = UserModel().get_users(name_contains=user_filter)
103
103
104 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
104 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
105 assert len(fake_users) == 1
105 assert len(fake_users) == 1
106 assert fake_users[0]['username'] == created_users[-1].username
106 assert fake_users[0]['username'] == created_users[-1].username
107
107
108 def test_returns_limited_user_list(self, backend, user_util):
108 def test_returns_limited_user_list(self, backend, user_util):
109 created_users = []
109 created_users = []
110 for i in range(5):
110 for i in range(5):
111 created_users.append(user_util.create_user(
111 created_users.append(user_util.create_user(
112 active=True, lastname='Fake user'))
112 active=True, lastname='Fake user'))
113
113
114 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
114 with mock.patch('rhodecode.lib.helpers.gravatar_url'):
115 with mock.patch('rhodecode.lib.helpers.link_to_user'):
115 with mock.patch('rhodecode.lib.helpers.link_to_user'):
116 users = UserModel().get_users(name_contains='Fake', limit=3)
116 users = UserModel().get_users(name_contains='Fake', limit=3)
117
117
118 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
118 fake_users = [u for u in users if u['last_name'].startswith('Fake')]
119 assert len(fake_users) == 3
119 assert len(fake_users) == 3
120
120
121
121
122 @pytest.fixture()
122 @pytest.fixture()
123 def test_user(request, baseapp):
123 def test_user(request, baseapp):
124 usr = UserModel().create_or_update(
124 usr = UserModel().create_or_update(
125 username='test_user',
125 username='test_user',
126 password='qweqwe',
126 password='qweqwe',
127 email='main_email@rhodecode.org',
127 email='main_email@rhodecode.org',
128 firstname='u1', lastname=u'u1')
128 firstname='u1', lastname=u'u1')
129 Session().commit()
129 Session().commit()
130 assert User.get_by_username(u'test_user') == usr
130 assert User.get_by_username(u'test_user') == usr
131
131
132 @request.addfinalizer
132 @request.addfinalizer
133 def cleanup():
133 def cleanup():
134 if UserModel().get_user(usr.user_id) is None:
134 if UserModel().get_user(usr.user_id) is None:
135 return
135 return
136
136
137 perm = Permission.query().all()
137 perm = Permission.query().all()
138 for p in perm:
138 for p in perm:
139 UserModel().revoke_perm(usr, p)
139 UserModel().revoke_perm(usr, p)
140
140
141 UserModel().delete(usr.user_id)
141 UserModel().delete(usr.user_id)
142 Session().commit()
142 Session().commit()
143
143
144 return usr
144 return usr
145
145
146
146
147 def test_create_and_remove(test_user):
147 def test_create_and_remove(test_user):
148 usr = test_user
148 usr = test_user
149
149
150 # make user group
150 # make user group
151 user_group = fixture.create_user_group('some_example_group')
151 user_group = fixture.create_user_group('some_example_group')
152 Session().commit()
152 Session().commit()
153
153
154 UserGroupModel().add_user_to_group(user_group, usr)
154 UserGroupModel().add_user_to_group(user_group, usr)
155 Session().commit()
155 Session().commit()
156
156
157 assert UserGroup.get(user_group.users_group_id) == user_group
157 assert UserGroup.get(user_group.users_group_id) == user_group
158 assert UserGroupMember.query().count() == 1
158 assert UserGroupMember.query().count() == 1
159 UserModel().delete(usr.user_id)
159 UserModel().delete(usr.user_id)
160 Session().commit()
160 Session().commit()
161
161
162 assert UserGroupMember.query().all() == []
162 assert UserGroupMember.query().all() == []
163
163
164
164
165 def test_additonal_email_as_main(test_user):
165 def test_additonal_email_as_main(test_user):
166 with pytest.raises(AttributeError):
166 with pytest.raises(AttributeError):
167 m = UserEmailMap()
167 m = UserEmailMap()
168 m.email = test_user.email
168 m.email = test_user.email
169 m.user = test_user
169 m.user = test_user
170 Session().add(m)
170 Session().add(m)
171 Session().commit()
171 Session().commit()
172
172
173
173
174 def test_extra_email_map(test_user):
174 def test_extra_email_map(test_user):
175
175
176 m = UserEmailMap()
176 m = UserEmailMap()
177 m.email = u'main_email2@rhodecode.org'
177 m.email = u'main_email2@rhodecode.org'
178 m.user = test_user
178 m.user = test_user
179 Session().add(m)
179 Session().add(m)
180 Session().commit()
180 Session().commit()
181
181
182 u = User.get_by_email(email='main_email@rhodecode.org')
182 u = User.get_by_email(email='main_email@rhodecode.org')
183 assert test_user.user_id == u.user_id
183 assert test_user.user_id == u.user_id
184 assert test_user.username == u.username
184 assert test_user.username == u.username
185
185
186 u = User.get_by_email(email='main_email2@rhodecode.org')
186 u = User.get_by_email(email='main_email2@rhodecode.org')
187 assert test_user.user_id == u.user_id
187 assert test_user.user_id == u.user_id
188 assert test_user.username == u.username
188 assert test_user.username == u.username
189 u = User.get_by_email(email='main_email3@rhodecode.org')
189 u = User.get_by_email(email='main_email3@rhodecode.org')
190 assert u is None
190 assert u is None
191
191
192
192
193 def test_get_api_data_replaces_secret_data_by_default(test_user):
193 def test_get_api_data_replaces_secret_data_by_default(test_user):
194 api_data = test_user.get_api_data()
194 api_data = test_user.get_api_data()
195 api_key_length = 40
195 api_key_length = 40
196 expected_replacement = '*' * api_key_length
196 expected_replacement = '*' * api_key_length
197
197
198 for key in api_data['auth_tokens']:
198 for key in api_data['auth_tokens']:
199 assert key == expected_replacement
199 assert key == expected_replacement
200
200
201
201
202 def test_get_api_data_includes_secret_data_if_activated(test_user):
202 def test_get_api_data_includes_secret_data_if_activated(test_user):
203 api_data = test_user.get_api_data(include_secrets=True)
203 api_data = test_user.get_api_data(include_secrets=True)
204 assert api_data['auth_tokens'] == test_user.auth_tokens
204 assert api_data['auth_tokens'] == test_user.auth_tokens
205
205
206
206
207 def test_add_perm(test_user):
207 def test_add_perm(test_user):
208 perm = Permission.query().all()[0]
208 perm = Permission.query().all()[0]
209 UserModel().grant_perm(test_user, perm)
209 UserModel().grant_perm(test_user, perm)
210 Session().commit()
210 Session().commit()
211 assert UserModel().has_perm(test_user, perm)
211 assert UserModel().has_perm(test_user, perm)
212
212
213
213
214 def test_has_perm(test_user):
214 def test_has_perm(test_user):
215 perm = Permission.query().all()
215 perm = Permission.query().all()
216 for p in perm:
216 for p in perm:
217 assert not UserModel().has_perm(test_user, p)
217 assert not UserModel().has_perm(test_user, p)
218
218
219
219
220 def test_revoke_perm(test_user):
220 def test_revoke_perm(test_user):
221 perm = Permission.query().all()[0]
221 perm = Permission.query().all()[0]
222 UserModel().grant_perm(test_user, perm)
222 UserModel().grant_perm(test_user, perm)
223 Session().commit()
223 Session().commit()
224 assert UserModel().has_perm(test_user, perm)
224 assert UserModel().has_perm(test_user, perm)
225
225
226 # revoke
226 # revoke
227 UserModel().revoke_perm(test_user, perm)
227 UserModel().revoke_perm(test_user, perm)
228 Session().commit()
228 Session().commit()
229 assert not UserModel().has_perm(test_user, perm)
229 assert not UserModel().has_perm(test_user, perm)
230
230
231
231
232 @pytest.mark.parametrize("ip_range, expected, expect_errors", [
232 @pytest.mark.parametrize("ip_range, expected, expect_errors", [
233 ('', [], False),
233 ('', [], False),
234 ('127.0.0.1', ['127.0.0.1'], False),
234 ('127.0.0.1', ['127.0.0.1'], False),
235 ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
235 ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
236 ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
236 ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
237 (
237 (
238 '127.0.0.1,172.172.172.0,127.0.0.2',
238 '127.0.0.1,172.172.172.0,127.0.0.2',
239 ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False),
239 ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False),
240 (
240 (
241 '127.0.0.1-127.0.0.5',
241 '127.0.0.1-127.0.0.5',
242 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
242 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
243 False),
243 False),
244 (
244 (
245 '127.0.0.1 - 127.0.0.5',
245 '127.0.0.1 - 127.0.0.5',
246 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
246 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
247 False
247 False
248 ),
248 ),
249 ('-', [], True),
249 ('-', [], True),
250 ('127.0.0.1-32', [], True),
250 ('127.0.0.1-32', [], True),
251 (
251 (
252 '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2',
252 '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2',
253 ['127.0.0.1', '127.0.0.2'], False),
253 ['127.0.0.1', '127.0.0.2'], False),
254 (
254 (
255 '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,',
255 '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,',
256 ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'],
256 ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'],
257 False
257 False
258 ),
258 ),
259 (
259 (
260 '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,',
260 '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,',
261 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5',
261 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5',
262 '127.0.0.6'],
262 '127.0.0.6'],
263 False
263 False
264 ),
264 ),
265 ])
265 ])
266 def test_ip_range_generator(ip_range, expected, expect_errors):
266 def test_ip_range_generator(ip_range, expected, expect_errors):
267 func = UserModel().parse_ip_range
267 func = UserModel().parse_ip_range
268 if expect_errors:
268 if expect_errors:
269 pytest.raises(ValueError, func, ip_range)
269 pytest.raises(ValueError, func, ip_range)
270 else:
270 else:
271 parsed_list = func(ip_range)
271 parsed_list = func(ip_range)
272 assert parsed_list == expected
272 assert parsed_list == expected
273
273
274
274
275 def test_user_delete_cascades_ip_whitelist(test_user):
275 def test_user_delete_cascades_ip_whitelist(test_user):
276 sample_ip = '1.1.1.1'
276 sample_ip = '1.1.1.1'
277 uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip)
277 uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip)
278 Session().add(uid_map)
278 Session().add(uid_map)
279 Session().delete(test_user)
279 Session().delete(test_user)
280 try:
280 try:
281 Session().flush()
281 Session().flush()
282 finally:
282 finally:
283 Session().rollback()
283 Session().rollback()
284
284
285
285
286 def test_account_for_deactivation_generation(test_user):
286 def test_account_for_deactivation_generation(test_user):
287 accounts = UserModel().get_accounts_in_creation_order(
287 accounts = UserModel().get_accounts_in_creation_order(
288 current_user=test_user)
288 current_user=test_user)
289 # current user should be #1 in the list
289 # current user should be #1 in the list
290 assert accounts[0] == test_user.user_id
290 assert accounts[0] == test_user.user_id
291 active_users = User.query().filter(User.active == true()).count()
291 active_users = User.query().filter(User.active == true()).count()
292 assert active_users == len(accounts)
292 assert active_users == len(accounts)
293
293
294
294
295 def test_user_delete_cascades_permissions_on_repo(backend, test_user):
295 def test_user_delete_cascades_permissions_on_repo(backend, test_user):
296 test_repo = backend.create_repo()
296 test_repo = backend.create_repo()
297 RepoModel().grant_user_permission(
297 RepoModel().grant_user_permission(
298 test_repo, test_user, 'repository.write')
298 test_repo, test_user, 'repository.write')
299 Session().commit()
299 Session().commit()
300
300
301 assert test_user.repo_to_perm
301 assert test_user.repo_to_perm
302
302
303 UserModel().delete(test_user)
303 UserModel().delete(test_user)
304 Session().commit()
304 Session().commit()
305
305
306
306
307 def test_user_delete_cascades_permissions_on_repo_group(
307 def test_user_delete_cascades_permissions_on_repo_group(
308 test_repo_group, test_user):
308 test_repo_group, test_user):
309 RepoGroupModel().grant_user_permission(
309 RepoGroupModel().grant_user_permission(
310 test_repo_group, test_user, 'group.write')
310 test_repo_group, test_user, 'group.write')
311 Session().commit()
311 Session().commit()
312
312
313 assert test_user.repo_group_to_perm
313 assert test_user.repo_group_to_perm
314
314
315 Session().delete(test_user)
315 Session().delete(test_user)
316 Session().commit()
316 Session().commit()
317
317
318
318
319 def test_user_delete_cascades_permissions_on_user_group(
319 def test_user_delete_cascades_permissions_on_user_group(
320 test_user_group, test_user):
320 test_user_group, test_user):
321 UserGroupModel().grant_user_permission(
321 UserGroupModel().grant_user_permission(
322 test_user_group, test_user, 'usergroup.write')
322 test_user_group, test_user, 'usergroup.write')
323 Session().commit()
323 Session().commit()
324
324
325 assert test_user.user_group_to_perm
325 assert test_user.user_group_to_perm
326
326
327 Session().delete(test_user)
327 Session().delete(test_user)
328 Session().commit()
328 Session().commit()
@@ -1,295 +1,295 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20
20
21 import formencode
21 import formencode
22 import pytest
22 import pytest
23
23
24 from rhodecode.tests import (
24 from rhodecode.tests import (
25 HG_REPO, TEST_USER_REGULAR2_EMAIL, TEST_USER_REGULAR2_LOGIN,
25 HG_REPO, TEST_USER_REGULAR2_EMAIL, TEST_USER_REGULAR2_LOGIN,
26 TEST_USER_REGULAR2_PASS, TEST_USER_ADMIN_LOGIN, TESTS_TMP_PATH)
26 TEST_USER_REGULAR2_PASS, TEST_USER_ADMIN_LOGIN, TESTS_TMP_PATH)
27
27
28 from rhodecode.model import validators as v
28 from rhodecode.model import validators as v
29 from rhodecode.model.user_group import UserGroupModel
29 from rhodecode.model.user_group import UserGroupModel
30
30
31 from rhodecode.model.meta import Session
31 from rhodecode.model.meta import Session
32 from rhodecode.model.repo_group import RepoGroupModel
32 from rhodecode.model.repo_group import RepoGroupModel
33 from rhodecode.model.db import ChangesetStatus, Repository
33 from rhodecode.model.db import ChangesetStatus, Repository
34 from rhodecode.model.changeset_status import ChangesetStatusModel
34 from rhodecode.model.changeset_status import ChangesetStatusModel
35 from rhodecode.tests.fixture import Fixture
35 from rhodecode.tests.fixture import Fixture
36
36
37 fixture = Fixture()
37 fixture = Fixture()
38
38
39 pytestmark = pytest.mark.usefixtures('baseapp')
39 pytestmark = pytest.mark.usefixtures('baseapp')
40
40
41
41
42 @pytest.fixture()
42 @pytest.fixture()
43 def localizer():
43 def localizer():
44 def func(msg):
44 def func(msg):
45 return msg
45 return msg
46 return func
46 return func
47
47
48
48
49 def test_Message_extractor(localizer):
49 def test_Message_extractor(localizer):
50 validator = v.ValidUsername(localizer)
50 validator = v.ValidUsername(localizer)
51 pytest.raises(formencode.Invalid, validator.to_python, 'default')
51 pytest.raises(formencode.Invalid, validator.to_python, 'default')
52
52
53 class StateObj(object):
53 class StateObj(object):
54 pass
54 pass
55
55
56 pytest.raises(
56 pytest.raises(
57 formencode.Invalid, validator.to_python, 'default', StateObj)
57 formencode.Invalid, validator.to_python, 'default', StateObj)
58
58
59
59
60 def test_ValidUsername(localizer):
60 def test_ValidUsername(localizer):
61 validator = v.ValidUsername(localizer)
61 validator = v.ValidUsername(localizer)
62
62
63 pytest.raises(formencode.Invalid, validator.to_python, 'default')
63 pytest.raises(formencode.Invalid, validator.to_python, 'default')
64 pytest.raises(formencode.Invalid, validator.to_python, 'new_user')
64 pytest.raises(formencode.Invalid, validator.to_python, 'new_user')
65 pytest.raises(formencode.Invalid, validator.to_python, '.,')
65 pytest.raises(formencode.Invalid, validator.to_python, '.,')
66 pytest.raises(
66 pytest.raises(
67 formencode.Invalid, validator.to_python, TEST_USER_ADMIN_LOGIN)
67 formencode.Invalid, validator.to_python, TEST_USER_ADMIN_LOGIN)
68 assert 'test' == validator.to_python('test')
68 assert 'test' == validator.to_python('test')
69
69
70 validator = v.ValidUsername(localizer, edit=True, old_data={'user_id': 1})
70 validator = v.ValidUsername(localizer, edit=True, old_data={'user_id': 1})
71
71
72
72
73 def test_ValidRepoUser(localizer):
73 def test_ValidRepoUser(localizer):
74 validator = v.ValidRepoUser(localizer)
74 validator = v.ValidRepoUser(localizer)
75 pytest.raises(formencode.Invalid, validator.to_python, 'nouser')
75 pytest.raises(formencode.Invalid, validator.to_python, 'nouser')
76 assert TEST_USER_ADMIN_LOGIN == \
76 assert TEST_USER_ADMIN_LOGIN == \
77 validator.to_python(TEST_USER_ADMIN_LOGIN)
77 validator.to_python(TEST_USER_ADMIN_LOGIN)
78
78
79
79
80 def test_ValidUserGroup(localizer):
80 def test_ValidUserGroup(localizer):
81 validator = v.ValidUserGroup(localizer)
81 validator = v.ValidUserGroup(localizer)
82 pytest.raises(formencode.Invalid, validator.to_python, 'default')
82 pytest.raises(formencode.Invalid, validator.to_python, 'default')
83 pytest.raises(formencode.Invalid, validator.to_python, '.,')
83 pytest.raises(formencode.Invalid, validator.to_python, '.,')
84
84
85 gr = fixture.create_user_group('test')
85 gr = fixture.create_user_group('test')
86 gr2 = fixture.create_user_group('tes2')
86 gr2 = fixture.create_user_group('tes2')
87 Session().commit()
87 Session().commit()
88 pytest.raises(formencode.Invalid, validator.to_python, 'test')
88 pytest.raises(formencode.Invalid, validator.to_python, 'test')
89 assert gr.users_group_id is not None
89 assert gr.users_group_id is not None
90 validator = v.ValidUserGroup(localizer,
90 validator = v.ValidUserGroup(localizer,
91 edit=True,
91 edit=True,
92 old_data={'users_group_id': gr2.users_group_id})
92 old_data={'users_group_id': gr2.users_group_id})
93
93
94 pytest.raises(formencode.Invalid, validator.to_python, 'test')
94 pytest.raises(formencode.Invalid, validator.to_python, 'test')
95 pytest.raises(formencode.Invalid, validator.to_python, 'TesT')
95 pytest.raises(formencode.Invalid, validator.to_python, 'TesT')
96 pytest.raises(formencode.Invalid, validator.to_python, 'TEST')
96 pytest.raises(formencode.Invalid, validator.to_python, 'TEST')
97 UserGroupModel().delete(gr)
97 UserGroupModel().delete(gr)
98 UserGroupModel().delete(gr2)
98 UserGroupModel().delete(gr2)
99 Session().commit()
99 Session().commit()
100
100
101
101
102 @pytest.fixture(scope='function')
102 @pytest.fixture(scope='function')
103 def repo_group(request):
103 def repo_group(request):
104 model = RepoGroupModel()
104 model = RepoGroupModel()
105 gr = model.create(
105 gr = model.create(
106 group_name='test_gr', group_description='desc', just_db=True,
106 group_name='test_gr', group_description='desc', just_db=True,
107 owner=TEST_USER_ADMIN_LOGIN)
107 owner=TEST_USER_ADMIN_LOGIN)
108
108
109 def cleanup():
109 def cleanup():
110 model.delete(gr)
110 model.delete(gr)
111
111
112 request.addfinalizer(cleanup)
112 request.addfinalizer(cleanup)
113
113
114 return gr
114 return gr
115
115
116
116
117 def test_ValidRepoGroup_same_name_as_repo(localizer):
117 def test_ValidRepoGroup_same_name_as_repo(localizer):
118 validator = v.ValidRepoGroup(localizer)
118 validator = v.ValidRepoGroup(localizer)
119 with pytest.raises(formencode.Invalid) as excinfo:
119 with pytest.raises(formencode.Invalid) as excinfo:
120 validator.to_python({'group_name': HG_REPO})
120 validator.to_python({'group_name': HG_REPO})
121 expected_msg = 'Repository with name "vcs_test_hg" already exists'
121 expected_msg = 'Repository with name "vcs_test_hg" already exists'
122 assert expected_msg in str(excinfo.value)
122 assert expected_msg in str(excinfo.value)
123
123
124
124
125 def test_ValidRepoGroup_group_exists(localizer, repo_group):
125 def test_ValidRepoGroup_group_exists(localizer, repo_group):
126 validator = v.ValidRepoGroup(localizer)
126 validator = v.ValidRepoGroup(localizer)
127 with pytest.raises(formencode.Invalid) as excinfo:
127 with pytest.raises(formencode.Invalid) as excinfo:
128 validator.to_python({'group_name': repo_group.group_name})
128 validator.to_python({'group_name': repo_group.group_name})
129 expected_msg = 'Group "test_gr" already exists'
129 expected_msg = 'Group "test_gr" already exists'
130 assert expected_msg in str(excinfo.value)
130 assert expected_msg in str(excinfo.value)
131
131
132
132
133 def test_ValidRepoGroup_invalid_parent(localizer, repo_group):
133 def test_ValidRepoGroup_invalid_parent(localizer, repo_group):
134 validator = v.ValidRepoGroup(localizer, edit=True,
134 validator = v.ValidRepoGroup(localizer, edit=True,
135 old_data={'group_id': repo_group.group_id})
135 old_data={'group_id': repo_group.group_id})
136 with pytest.raises(formencode.Invalid) as excinfo:
136 with pytest.raises(formencode.Invalid) as excinfo:
137 validator.to_python({
137 validator.to_python({
138 'group_name': repo_group.group_name + 'n',
138 'group_name': repo_group.group_name + 'n',
139 'group_parent_id': repo_group.group_id,
139 'group_parent_id': repo_group.group_id,
140 })
140 })
141 expected_msg = 'Cannot assign this group as parent'
141 expected_msg = 'Cannot assign this group as parent'
142 assert expected_msg in str(excinfo.value)
142 assert expected_msg in str(excinfo.value)
143
143
144
144
145 def test_ValidRepoGroup_edit_group_no_root_permission(localizer, repo_group):
145 def test_ValidRepoGroup_edit_group_no_root_permission(localizer, repo_group):
146 validator = v.ValidRepoGroup(localizer,
146 validator = v.ValidRepoGroup(localizer,
147 edit=True, old_data={'group_id': repo_group.group_id},
147 edit=True, old_data={'group_id': repo_group.group_id},
148 can_create_in_root=False)
148 can_create_in_root=False)
149
149
150 # Cannot change parent
150 # Cannot change parent
151 with pytest.raises(formencode.Invalid) as excinfo:
151 with pytest.raises(formencode.Invalid) as excinfo:
152 validator.to_python({'group_parent_id': '25'})
152 validator.to_python({'group_parent_id': '25'})
153 expected_msg = 'no permission to store repository group in root location'
153 expected_msg = 'no permission to store repository group in root location'
154 assert expected_msg in str(excinfo.value)
154 assert expected_msg in str(excinfo.value)
155
155
156 # Chaning all the other fields is allowed
156 # Chaning all the other fields is allowed
157 validator.to_python({'group_name': 'foo', 'group_parent_id': '-1'})
157 validator.to_python({'group_name': 'foo', 'group_parent_id': '-1'})
158 validator.to_python(
158 validator.to_python(
159 {'user': TEST_USER_REGULAR2_LOGIN, 'group_parent_id': '-1'})
159 {'user': TEST_USER_REGULAR2_LOGIN, 'group_parent_id': '-1'})
160 validator.to_python({'group_description': 'bar', 'group_parent_id': '-1'})
160 validator.to_python({'group_description': 'bar', 'group_parent_id': '-1'})
161 validator.to_python({'enable_locking': 'true', 'group_parent_id': '-1'})
161 validator.to_python({'enable_locking': 'true', 'group_parent_id': '-1'})
162
162
163
163
164 def test_ValidPassword(localizer):
164 def test_ValidPassword(localizer):
165 validator = v.ValidPassword(localizer)
165 validator = v.ValidPassword(localizer)
166 assert 'lol' == validator.to_python('lol')
166 assert 'lol' == validator.to_python('lol')
167 assert None == validator.to_python(None)
167 assert None is validator.to_python(None)
168 pytest.raises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
168 pytest.raises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
169
169
170
170
171 def test_ValidPasswordsMatch(localizer):
171 def test_ValidPasswordsMatch(localizer):
172 validator = v.ValidPasswordsMatch(localizer)
172 validator = v.ValidPasswordsMatch(localizer)
173 pytest.raises(
173 pytest.raises(
174 formencode.Invalid,
174 formencode.Invalid,
175 validator.to_python, {'password': 'pass',
175 validator.to_python, {'password': 'pass',
176 'password_confirmation': 'pass2'})
176 'password_confirmation': 'pass2'})
177
177
178 pytest.raises(
178 pytest.raises(
179 formencode.Invalid,
179 formencode.Invalid,
180 validator.to_python, {'new_password': 'pass',
180 validator.to_python, {'new_password': 'pass',
181 'password_confirmation': 'pass2'})
181 'password_confirmation': 'pass2'})
182
182
183 assert {'new_password': 'pass', 'password_confirmation': 'pass'} == \
183 assert {'new_password': 'pass', 'password_confirmation': 'pass'} == \
184 validator.to_python({'new_password': 'pass',
184 validator.to_python({'new_password': 'pass',
185 'password_confirmation': 'pass'})
185 'password_confirmation': 'pass'})
186
186
187 assert {'password': 'pass', 'password_confirmation': 'pass'} == \
187 assert {'password': 'pass', 'password_confirmation': 'pass'} == \
188 validator.to_python({'password': 'pass',
188 validator.to_python({'password': 'pass',
189 'password_confirmation': 'pass'})
189 'password_confirmation': 'pass'})
190
190
191
191
192 def test_ValidAuth(localizer, config_stub):
192 def test_ValidAuth(localizer, config_stub):
193 config_stub.testing_securitypolicy()
193 config_stub.testing_securitypolicy()
194 config_stub.include('rhodecode.authentication')
194 config_stub.include('rhodecode.authentication')
195 config_stub.include('rhodecode.authentication.plugins.auth_rhodecode')
195 config_stub.include('rhodecode.authentication.plugins.auth_rhodecode')
196 config_stub.include('rhodecode.authentication.plugins.auth_token')
196 config_stub.include('rhodecode.authentication.plugins.auth_token')
197
197
198 validator = v.ValidAuth(localizer)
198 validator = v.ValidAuth(localizer)
199 valid_creds = {
199 valid_creds = {
200 'username': TEST_USER_REGULAR2_LOGIN,
200 'username': TEST_USER_REGULAR2_LOGIN,
201 'password': TEST_USER_REGULAR2_PASS,
201 'password': TEST_USER_REGULAR2_PASS,
202 }
202 }
203 invalid_creds = {
203 invalid_creds = {
204 'username': 'err',
204 'username': 'err',
205 'password': 'err',
205 'password': 'err',
206 }
206 }
207 assert valid_creds == validator.to_python(valid_creds)
207 assert valid_creds == validator.to_python(valid_creds)
208 pytest.raises(
208 pytest.raises(
209 formencode.Invalid, validator.to_python, invalid_creds)
209 formencode.Invalid, validator.to_python, invalid_creds)
210
210
211
211
212 def test_ValidRepoName(localizer):
212 def test_ValidRepoName(localizer):
213 validator = v.ValidRepoName(localizer)
213 validator = v.ValidRepoName(localizer)
214
214
215 pytest.raises(
215 pytest.raises(
216 formencode.Invalid, validator.to_python, {'repo_name': ''})
216 formencode.Invalid, validator.to_python, {'repo_name': ''})
217
217
218 pytest.raises(
218 pytest.raises(
219 formencode.Invalid, validator.to_python, {'repo_name': HG_REPO})
219 formencode.Invalid, validator.to_python, {'repo_name': HG_REPO})
220
220
221 gr = RepoGroupModel().create(group_name='group_test',
221 gr = RepoGroupModel().create(group_name='group_test',
222 group_description='desc',
222 group_description='desc',
223 owner=TEST_USER_ADMIN_LOGIN)
223 owner=TEST_USER_ADMIN_LOGIN)
224 pytest.raises(
224 pytest.raises(
225 formencode.Invalid, validator.to_python, {'repo_name': gr.group_name})
225 formencode.Invalid, validator.to_python, {'repo_name': gr.group_name})
226
226
227 #TODO: write an error case for that ie. create a repo withinh a group
227 #TODO: write an error case for that ie. create a repo withinh a group
228 # pytest.raises(formencode.Invalid,
228 # pytest.raises(formencode.Invalid,
229 # validator.to_python, {'repo_name': 'some',
229 # validator.to_python, {'repo_name': 'some',
230 # 'repo_group': gr.group_id})
230 # 'repo_group': gr.group_id})
231
231
232
232
233 def test_ValidForkName(localizer):
233 def test_ValidForkName(localizer):
234 # this uses ValidRepoName validator
234 # this uses ValidRepoName validator
235 assert True
235 assert True
236
236
237 @pytest.mark.parametrize("name, expected", [
237 @pytest.mark.parametrize("name, expected", [
238 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
238 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
239 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
239 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
240 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
240 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
241 ('/]re po', 're-po')])
241 ('/]re po', 're-po')])
242 def test_SlugifyName(name, expected, localizer):
242 def test_SlugifyName(name, expected, localizer):
243 validator = v.SlugifyName(localizer)
243 validator = v.SlugifyName(localizer)
244 assert expected == validator.to_python(name)
244 assert expected == validator.to_python(name)
245
245
246
246
247 def test_ValidForkType(localizer):
247 def test_ValidForkType(localizer):
248 validator = v.ValidForkType(localizer, old_data={'repo_type': 'hg'})
248 validator = v.ValidForkType(localizer, old_data={'repo_type': 'hg'})
249 assert 'hg' == validator.to_python('hg')
249 assert 'hg' == validator.to_python('hg')
250 pytest.raises(formencode.Invalid, validator.to_python, 'git')
250 pytest.raises(formencode.Invalid, validator.to_python, 'git')
251
251
252
252
253 def test_ValidPath(localizer):
253 def test_ValidPath(localizer):
254 validator = v.ValidPath(localizer)
254 validator = v.ValidPath(localizer)
255 assert TESTS_TMP_PATH == validator.to_python(TESTS_TMP_PATH)
255 assert TESTS_TMP_PATH == validator.to_python(TESTS_TMP_PATH)
256 pytest.raises(
256 pytest.raises(
257 formencode.Invalid, validator.to_python, '/no_such_dir')
257 formencode.Invalid, validator.to_python, '/no_such_dir')
258
258
259
259
260 def test_UniqSystemEmail(localizer):
260 def test_UniqSystemEmail(localizer):
261 validator = v.UniqSystemEmail(localizer, old_data={})
261 validator = v.UniqSystemEmail(localizer, old_data={})
262
262
263 assert 'mail@python.org' == validator.to_python('MaiL@Python.org')
263 assert 'mail@python.org' == validator.to_python('MaiL@Python.org')
264
264
265 email = TEST_USER_REGULAR2_EMAIL
265 email = TEST_USER_REGULAR2_EMAIL
266 pytest.raises(formencode.Invalid, validator.to_python, email)
266 pytest.raises(formencode.Invalid, validator.to_python, email)
267
267
268
268
269 def test_ValidSystemEmail(localizer):
269 def test_ValidSystemEmail(localizer):
270 validator = v.ValidSystemEmail(localizer)
270 validator = v.ValidSystemEmail(localizer)
271 email = TEST_USER_REGULAR2_EMAIL
271 email = TEST_USER_REGULAR2_EMAIL
272
272
273 assert email == validator.to_python(email)
273 assert email == validator.to_python(email)
274 pytest.raises(formencode.Invalid, validator.to_python, 'err')
274 pytest.raises(formencode.Invalid, validator.to_python, 'err')
275
275
276
276
277 def test_NotReviewedRevisions(localizer):
277 def test_NotReviewedRevisions(localizer):
278 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
278 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
279 validator = v.NotReviewedRevisions(localizer, repo_id)
279 validator = v.NotReviewedRevisions(localizer, repo_id)
280 rev = '0' * 40
280 rev = '0' * 40
281 # add status for a rev, that should throw an error because it is already
281 # add status for a rev, that should throw an error because it is already
282 # reviewed
282 # reviewed
283 new_status = ChangesetStatus()
283 new_status = ChangesetStatus()
284 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
284 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
285 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
285 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
286 new_status.status = ChangesetStatus.STATUS_APPROVED
286 new_status.status = ChangesetStatus.STATUS_APPROVED
287 new_status.comment = None
287 new_status.comment = None
288 new_status.revision = rev
288 new_status.revision = rev
289 Session().add(new_status)
289 Session().add(new_status)
290 Session().commit()
290 Session().commit()
291 try:
291 try:
292 pytest.raises(formencode.Invalid, validator.to_python, [rev])
292 pytest.raises(formencode.Invalid, validator.to_python, [rev])
293 finally:
293 finally:
294 Session().delete(new_status)
294 Session().delete(new_status)
295 Session().commit()
295 Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now