Show More
@@ -1,206 +1,206 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | import mock |
|
21 | 21 | import pytest |
|
22 | 22 | |
|
23 | 23 | from rhodecode.lib.auth import check_password |
|
24 | 24 | from rhodecode.model.user import UserModel |
|
25 | 25 | from rhodecode.tests import ( |
|
26 | 26 | TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL) |
|
27 | 27 | from rhodecode.api.tests.utils import ( |
|
28 | 28 | build_data, api_call, assert_ok, assert_error, jsonify, crash) |
|
29 | 29 | from rhodecode.tests.fixture import Fixture |
|
30 | 30 | from rhodecode.model.db import RepoGroup |
|
31 | 31 | |
|
32 | 32 | |
|
33 | 33 | # TODO: mikhail: remove fixture from here |
|
34 | 34 | fixture = Fixture() |
|
35 | 35 | |
|
36 | 36 | |
|
37 | 37 | @pytest.mark.usefixtures("testuser_api", "app") |
|
38 | 38 | class TestCreateUser(object): |
|
39 | 39 | def test_api_create_existing_user(self): |
|
40 | 40 | id_, params = build_data( |
|
41 | 41 | self.apikey, 'create_user', |
|
42 | 42 | username=TEST_USER_ADMIN_LOGIN, |
|
43 | 43 | email='test@foo.com', |
|
44 | 44 | password='trololo') |
|
45 | 45 | response = api_call(self.app, params) |
|
46 | 46 | |
|
47 | 47 | expected = "user `%s` already exist" % (TEST_USER_ADMIN_LOGIN,) |
|
48 | 48 | assert_error(id_, expected, given=response.body) |
|
49 | 49 | |
|
50 | 50 | def test_api_create_user_with_existing_email(self): |
|
51 | 51 | id_, params = build_data( |
|
52 | 52 | self.apikey, 'create_user', |
|
53 | 53 | username=TEST_USER_ADMIN_LOGIN + 'new', |
|
54 | 54 | email=TEST_USER_REGULAR_EMAIL, |
|
55 | 55 | password='trololo') |
|
56 | 56 | response = api_call(self.app, params) |
|
57 | 57 | |
|
58 | 58 | expected = "email `%s` already exist" % (TEST_USER_REGULAR_EMAIL,) |
|
59 | 59 | assert_error(id_, expected, given=response.body) |
|
60 | 60 | |
|
61 | 61 | def test_api_create_user_with_wrong_username(self): |
|
62 | 62 | bad_username = '<> HELLO WORLD <>' |
|
63 | 63 | id_, params = build_data( |
|
64 | 64 | self.apikey, 'create_user', |
|
65 | 65 | username=bad_username, |
|
66 | 66 | email='new@email.com', |
|
67 | 67 | password='trololo') |
|
68 | 68 | response = api_call(self.app, params) |
|
69 | 69 | |
|
70 | 70 | expected = {'username': |
|
71 | 71 | "Username may only contain alphanumeric characters " |
|
72 | 72 | "underscores, periods or dashes and must begin with " |
|
73 | 73 | "alphanumeric character or underscore"} |
|
74 | 74 | assert_error(id_, expected, given=response.body) |
|
75 | 75 | |
|
76 | 76 | def test_api_create_user(self): |
|
77 | 77 | username = 'test_new_api_user' |
|
78 | 78 | email = username + "@foo.com" |
|
79 | 79 | |
|
80 | 80 | id_, params = build_data( |
|
81 | 81 | self.apikey, 'create_user', |
|
82 | 82 | username=username, |
|
83 | 83 | email=email, |
|
84 | 84 | description='CTO of Things', |
|
85 | 85 | password='example') |
|
86 | 86 | response = api_call(self.app, params) |
|
87 | 87 | |
|
88 | 88 | usr = UserModel().get_by_username(username) |
|
89 | 89 | ret = { |
|
90 | 90 | 'msg': 'created new user `%s`' % (username,), |
|
91 | 91 | 'user': jsonify(usr.get_api_data(include_secrets=True)), |
|
92 | 92 | } |
|
93 | 93 | try: |
|
94 | 94 | expected = ret |
|
95 | 95 | assert check_password('example', usr.password) |
|
96 | 96 | assert_ok(id_, expected, given=response.body) |
|
97 | 97 | finally: |
|
98 | 98 | fixture.destroy_user(usr.user_id) |
|
99 | 99 | |
|
100 | 100 | def test_api_create_user_without_password(self): |
|
101 | 101 | username = 'test_new_api_user_passwordless' |
|
102 | 102 | email = username + "@foo.com" |
|
103 | 103 | |
|
104 | 104 | id_, params = build_data( |
|
105 | 105 | self.apikey, 'create_user', |
|
106 | 106 | username=username, |
|
107 | 107 | email=email) |
|
108 | 108 | response = api_call(self.app, params) |
|
109 | 109 | |
|
110 | 110 | usr = UserModel().get_by_username(username) |
|
111 | 111 | ret = { |
|
112 | 112 | 'msg': 'created new user `%s`' % (username,), |
|
113 | 113 | 'user': jsonify(usr.get_api_data(include_secrets=True)), |
|
114 | 114 | } |
|
115 | 115 | try: |
|
116 | 116 | expected = ret |
|
117 | 117 | assert_ok(id_, expected, given=response.body) |
|
118 | 118 | finally: |
|
119 | 119 | fixture.destroy_user(usr.user_id) |
|
120 | 120 | |
|
121 | 121 | def test_api_create_user_with_extern_name(self): |
|
122 | 122 | username = 'test_new_api_user_passwordless' |
|
123 | 123 | email = username + "@foo.com" |
|
124 | 124 | |
|
125 | 125 | id_, params = build_data( |
|
126 | 126 | self.apikey, 'create_user', |
|
127 | 127 | username=username, |
|
128 | 128 | email=email, extern_name='rhodecode') |
|
129 | 129 | response = api_call(self.app, params) |
|
130 | 130 | |
|
131 | 131 | usr = UserModel().get_by_username(username) |
|
132 | 132 | ret = { |
|
133 | 133 | 'msg': 'created new user `%s`' % (username,), |
|
134 | 134 | 'user': jsonify(usr.get_api_data(include_secrets=True)), |
|
135 | 135 | } |
|
136 | 136 | try: |
|
137 | 137 | expected = ret |
|
138 | 138 | assert_ok(id_, expected, given=response.body) |
|
139 | 139 | finally: |
|
140 | 140 | fixture.destroy_user(usr.user_id) |
|
141 | 141 | |
|
142 | 142 | def test_api_create_user_with_password_change(self): |
|
143 | 143 | username = 'test_new_api_user_password_change' |
|
144 | 144 | email = username + "@foo.com" |
|
145 | 145 | |
|
146 | 146 | id_, params = build_data( |
|
147 | 147 | self.apikey, 'create_user', |
|
148 | 148 | username=username, |
|
149 | 149 | email=email, extern_name='rhodecode', |
|
150 | 150 | force_password_change=True) |
|
151 | 151 | response = api_call(self.app, params) |
|
152 | 152 | |
|
153 | 153 | usr = UserModel().get_by_username(username) |
|
154 | 154 | ret = { |
|
155 | 155 | 'msg': 'created new user `%s`' % (username,), |
|
156 | 156 | 'user': jsonify(usr.get_api_data(include_secrets=True)), |
|
157 | 157 | } |
|
158 | 158 | try: |
|
159 | 159 | expected = ret |
|
160 | 160 | assert_ok(id_, expected, given=response.body) |
|
161 | 161 | finally: |
|
162 | 162 | fixture.destroy_user(usr.user_id) |
|
163 | 163 | |
|
164 | 164 | def test_api_create_user_with_personal_repo_group(self): |
|
165 | 165 | username = 'test_new_api_user_personal_group' |
|
166 | 166 | email = username + "@foo.com" |
|
167 | 167 | |
|
168 | 168 | id_, params = build_data( |
|
169 | 169 | self.apikey, 'create_user', |
|
170 | 170 | username=username, |
|
171 | 171 | email=email, extern_name='rhodecode', |
|
172 | 172 | create_personal_repo_group=True) |
|
173 | 173 | response = api_call(self.app, params) |
|
174 | 174 | |
|
175 | 175 | usr = UserModel().get_by_username(username) |
|
176 | 176 | ret = { |
|
177 | 177 | 'msg': 'created new user `%s`' % (username,), |
|
178 | 178 | 'user': jsonify(usr.get_api_data(include_secrets=True)), |
|
179 | 179 | } |
|
180 | 180 | |
|
181 | 181 | personal_group = RepoGroup.get_by_group_name(username) |
|
182 | 182 | assert personal_group |
|
183 |
assert personal_group.personal |
|
|
183 | assert personal_group.personal is True | |
|
184 | 184 | assert personal_group.user.username == username |
|
185 | 185 | |
|
186 | 186 | try: |
|
187 | 187 | expected = ret |
|
188 | 188 | assert_ok(id_, expected, given=response.body) |
|
189 | 189 | finally: |
|
190 | 190 | fixture.destroy_repo_group(username) |
|
191 | 191 | fixture.destroy_user(usr.user_id) |
|
192 | 192 | |
|
193 | 193 | @mock.patch.object(UserModel, 'create_or_update', crash) |
|
194 | 194 | def test_api_create_user_when_exception_happened(self): |
|
195 | 195 | |
|
196 | 196 | username = 'test_new_api_user' |
|
197 | 197 | email = username + "@foo.com" |
|
198 | 198 | |
|
199 | 199 | id_, params = build_data( |
|
200 | 200 | self.apikey, 'create_user', |
|
201 | 201 | username=username, |
|
202 | 202 | email=email, |
|
203 | 203 | password='trololo') |
|
204 | 204 | response = api_call(self.app, params) |
|
205 | 205 | expected = 'failed to create user `%s`' % (username,) |
|
206 | 206 | assert_error(id_, expected, given=response.body) |
@@ -1,255 +1,255 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | import pytest |
|
21 | 21 | |
|
22 | 22 | from rhodecode.tests import ( |
|
23 | 23 | TestController, assert_session_flash, TEST_USER_ADMIN_LOGIN) |
|
24 | 24 | from rhodecode.model.db import UserGroup |
|
25 | 25 | from rhodecode.model.meta import Session |
|
26 | 26 | from rhodecode.tests.fixture import Fixture |
|
27 | 27 | |
|
28 | 28 | fixture = Fixture() |
|
29 | 29 | |
|
30 | 30 | |
|
31 | 31 | def route_path(name, params=None, **kwargs): |
|
32 | 32 | import urllib.request |
|
33 | 33 | import urllib.parse |
|
34 | 34 | import urllib.error |
|
35 | 35 | from rhodecode.apps._base import ADMIN_PREFIX |
|
36 | 36 | |
|
37 | 37 | base_url = { |
|
38 | 38 | 'user_groups': ADMIN_PREFIX + '/user_groups', |
|
39 | 39 | 'user_groups_data': ADMIN_PREFIX + '/user_groups_data', |
|
40 | 40 | 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members', |
|
41 | 41 | 'user_groups_new': ADMIN_PREFIX + '/user_groups/new', |
|
42 | 42 | 'user_groups_create': ADMIN_PREFIX + '/user_groups/create', |
|
43 | 43 | 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit', |
|
44 | 44 | 'edit_user_group_advanced_sync': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/advanced/sync', |
|
45 | 45 | 'edit_user_group_global_perms_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit/global_permissions/update', |
|
46 | 46 | 'user_groups_update': ADMIN_PREFIX + '/user_groups/{user_group_id}/update', |
|
47 | 47 | 'user_groups_delete': ADMIN_PREFIX + '/user_groups/{user_group_id}/delete', |
|
48 | 48 | |
|
49 | 49 | }[name].format(**kwargs) |
|
50 | 50 | |
|
51 | 51 | if params: |
|
52 | 52 | base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params)) |
|
53 | 53 | return base_url |
|
54 | 54 | |
|
55 | 55 | |
|
56 | 56 | class TestUserGroupsView(TestController): |
|
57 | 57 | |
|
58 | 58 | def test_set_synchronization(self, user_util): |
|
59 | 59 | self.log_user() |
|
60 | 60 | user_group_name = user_util.create_user_group().users_group_name |
|
61 | 61 | |
|
62 | 62 | group = Session().query(UserGroup).filter( |
|
63 | 63 | UserGroup.users_group_name == user_group_name).one() |
|
64 | 64 | |
|
65 | 65 | assert group.group_data.get('extern_type') is None |
|
66 | 66 | |
|
67 | 67 | # enable |
|
68 | 68 | self.app.post( |
|
69 | 69 | route_path('edit_user_group_advanced_sync', |
|
70 | 70 | user_group_id=group.users_group_id), |
|
71 | 71 | params={'csrf_token': self.csrf_token}, status=302) |
|
72 | 72 | |
|
73 | 73 | group = Session().query(UserGroup).filter( |
|
74 | 74 | UserGroup.users_group_name == user_group_name).one() |
|
75 | 75 | assert group.group_data.get('extern_type') == 'manual' |
|
76 | 76 | assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN |
|
77 | 77 | |
|
78 | 78 | # disable |
|
79 | 79 | self.app.post( |
|
80 | 80 | route_path('edit_user_group_advanced_sync', |
|
81 | 81 | user_group_id=group.users_group_id), |
|
82 | 82 | params={'csrf_token': self.csrf_token}, status=302) |
|
83 | 83 | |
|
84 | 84 | group = Session().query(UserGroup).filter( |
|
85 | 85 | UserGroup.users_group_name == user_group_name).one() |
|
86 | 86 | assert group.group_data.get('extern_type') is None |
|
87 | 87 | assert group.group_data.get('extern_type_set_by') == TEST_USER_ADMIN_LOGIN |
|
88 | 88 | |
|
89 | 89 | def test_delete_user_group(self, user_util): |
|
90 | 90 | self.log_user() |
|
91 | 91 | user_group_id = user_util.create_user_group().users_group_id |
|
92 | 92 | |
|
93 | 93 | group = Session().query(UserGroup).filter( |
|
94 | 94 | UserGroup.users_group_id == user_group_id).one() |
|
95 | 95 | |
|
96 | 96 | self.app.post( |
|
97 | 97 | route_path('user_groups_delete', user_group_id=group.users_group_id), |
|
98 | 98 | params={'csrf_token': self.csrf_token}) |
|
99 | 99 | |
|
100 | 100 | group = Session().query(UserGroup).filter( |
|
101 | 101 | UserGroup.users_group_id == user_group_id).scalar() |
|
102 | 102 | |
|
103 | 103 | assert group is None |
|
104 | 104 | |
|
105 | 105 | @pytest.mark.parametrize('repo_create, repo_create_write, user_group_create, repo_group_create, fork_create, inherit_default_permissions, expect_error, expect_form_error', [ |
|
106 | 106 | ('hg.create.none', 'hg.create.write_on_repogroup.false', 'hg.usergroup.create.false', 'hg.repogroup.create.false', 'hg.fork.none', 'hg.inherit_default_perms.false', False, False), |
|
107 | 107 | ('hg.create.repository', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, False), |
|
108 | 108 | ('hg.create.XXX', 'hg.create.write_on_repogroup.true', 'hg.usergroup.create.true', 'hg.repogroup.create.true', 'hg.fork.repository', 'hg.inherit_default_perms.false', False, True), |
|
109 | 109 | ('', '', '', '', '', '', True, False), |
|
110 | 110 | ]) |
|
111 | 111 | def test_global_permissions_on_user_group( |
|
112 | 112 | self, repo_create, repo_create_write, user_group_create, |
|
113 | 113 | repo_group_create, fork_create, expect_error, expect_form_error, |
|
114 | 114 | inherit_default_permissions, user_util): |
|
115 | 115 | |
|
116 | 116 | self.log_user() |
|
117 | 117 | user_group = user_util.create_user_group() |
|
118 | 118 | |
|
119 | 119 | user_group_name = user_group.users_group_name |
|
120 | 120 | user_group_id = user_group.users_group_id |
|
121 | 121 | |
|
122 | 122 | # ENABLE REPO CREATE ON A GROUP |
|
123 | 123 | perm_params = { |
|
124 | 124 | 'inherit_default_permissions': False, |
|
125 | 125 | 'default_repo_create': repo_create, |
|
126 | 126 | 'default_repo_create_on_write': repo_create_write, |
|
127 | 127 | 'default_user_group_create': user_group_create, |
|
128 | 128 | 'default_repo_group_create': repo_group_create, |
|
129 | 129 | 'default_fork_create': fork_create, |
|
130 | 130 | 'default_inherit_default_permissions': inherit_default_permissions, |
|
131 | 131 | |
|
132 | 132 | 'csrf_token': self.csrf_token, |
|
133 | 133 | } |
|
134 | 134 | response = self.app.post( |
|
135 | 135 | route_path('edit_user_group_global_perms_update', |
|
136 | 136 | user_group_id=user_group_id), |
|
137 | 137 | params=perm_params) |
|
138 | 138 | |
|
139 | 139 | if expect_form_error: |
|
140 | 140 | assert response.status_int == 200 |
|
141 | 141 | response.mustcontain('Value must be one of') |
|
142 | 142 | else: |
|
143 | 143 | if expect_error: |
|
144 | 144 | msg = 'An error occurred during permissions saving' |
|
145 | 145 | else: |
|
146 | 146 | msg = 'User Group global permissions updated successfully' |
|
147 | 147 | ug = UserGroup.get_by_group_name(user_group_name) |
|
148 | 148 | del perm_params['csrf_token'] |
|
149 | 149 | del perm_params['inherit_default_permissions'] |
|
150 | 150 | assert perm_params == ug.get_default_perms() |
|
151 | 151 | assert_session_flash(response, msg) |
|
152 | 152 | |
|
153 | 153 | def test_edit_view(self, user_util): |
|
154 | 154 | self.log_user() |
|
155 | 155 | |
|
156 | 156 | user_group = user_util.create_user_group() |
|
157 | 157 | self.app.get( |
|
158 | 158 | route_path('edit_user_group', |
|
159 | 159 | user_group_id=user_group.users_group_id), |
|
160 | 160 | status=200) |
|
161 | 161 | |
|
162 | 162 | def test_update_user_group(self, user_util): |
|
163 | 163 | user = self.log_user() |
|
164 | 164 | |
|
165 | 165 | user_group = user_util.create_user_group() |
|
166 | 166 | users_group_id = user_group.users_group_id |
|
167 | 167 | new_name = user_group.users_group_name + '_CHANGE' |
|
168 | 168 | |
|
169 | 169 | params = [ |
|
170 | 170 | ('users_group_active', False), |
|
171 | 171 | ('user_group_description', 'DESC'), |
|
172 | 172 | ('users_group_name', new_name), |
|
173 | 173 | ('user', user['username']), |
|
174 | 174 | ('csrf_token', self.csrf_token), |
|
175 | 175 | ('__start__', 'user_group_members:sequence'), |
|
176 | 176 | ('__start__', 'member:mapping'), |
|
177 | 177 | ('member_user_id', user['user_id']), |
|
178 | 178 | ('type', 'existing'), |
|
179 | 179 | ('__end__', 'member:mapping'), |
|
180 | 180 | ('__end__', 'user_group_members:sequence'), |
|
181 | 181 | ] |
|
182 | 182 | |
|
183 | 183 | self.app.post( |
|
184 | 184 | route_path('user_groups_update', |
|
185 | 185 | user_group_id=users_group_id), |
|
186 | 186 | params=params, |
|
187 | 187 | status=302) |
|
188 | 188 | |
|
189 | 189 | user_group = UserGroup.get(users_group_id) |
|
190 | 190 | assert user_group |
|
191 | 191 | |
|
192 | 192 | assert user_group.users_group_name == new_name |
|
193 | 193 | assert user_group.user_group_description == 'DESC' |
|
194 |
assert user_group.users_group_active |
|
|
194 | assert user_group.users_group_active is False | |
|
195 | 195 | |
|
196 | 196 | def test_update_user_group_name_conflicts(self, user_util): |
|
197 | 197 | self.log_user() |
|
198 | 198 | user_group_old = user_util.create_user_group() |
|
199 | 199 | new_name = user_group_old.users_group_name |
|
200 | 200 | |
|
201 | 201 | user_group = user_util.create_user_group() |
|
202 | 202 | |
|
203 | 203 | params = dict( |
|
204 | 204 | users_group_active=False, |
|
205 | 205 | user_group_description='DESC', |
|
206 | 206 | users_group_name=new_name, |
|
207 | 207 | csrf_token=self.csrf_token) |
|
208 | 208 | |
|
209 | 209 | response = self.app.post( |
|
210 | 210 | route_path('user_groups_update', |
|
211 | 211 | user_group_id=user_group.users_group_id), |
|
212 | 212 | params=params, |
|
213 | 213 | status=200) |
|
214 | 214 | |
|
215 | 215 | response.mustcontain('User group `{}` already exists'.format( |
|
216 | 216 | new_name)) |
|
217 | 217 | |
|
218 | 218 | def test_update_members_from_user_ids(self, user_regular): |
|
219 | 219 | uid = user_regular.user_id |
|
220 | 220 | username = user_regular.username |
|
221 | 221 | self.log_user() |
|
222 | 222 | |
|
223 | 223 | user_group = fixture.create_user_group('test_gr_ids') |
|
224 | 224 | assert user_group.members == [] |
|
225 | 225 | assert user_group.user != user_regular |
|
226 | 226 | expected_active_state = not user_group.users_group_active |
|
227 | 227 | |
|
228 | 228 | form_data = [ |
|
229 | 229 | ('csrf_token', self.csrf_token), |
|
230 | 230 | ('user', username), |
|
231 | 231 | ('users_group_name', 'changed_name'), |
|
232 | 232 | ('users_group_active', expected_active_state), |
|
233 | 233 | ('user_group_description', 'changed_description'), |
|
234 | 234 | |
|
235 | 235 | ('__start__', 'user_group_members:sequence'), |
|
236 | 236 | ('__start__', 'member:mapping'), |
|
237 | 237 | ('member_user_id', uid), |
|
238 | 238 | ('type', 'existing'), |
|
239 | 239 | ('__end__', 'member:mapping'), |
|
240 | 240 | ('__end__', 'user_group_members:sequence'), |
|
241 | 241 | ] |
|
242 | 242 | ugid = user_group.users_group_id |
|
243 | 243 | self.app.post( |
|
244 | 244 | route_path('user_groups_update', user_group_id=ugid), form_data) |
|
245 | 245 | |
|
246 | 246 | user_group = UserGroup.get(ugid) |
|
247 | 247 | assert user_group |
|
248 | 248 | |
|
249 | 249 | assert user_group.members[0].user_id == uid |
|
250 | 250 | assert user_group.user_id == uid |
|
251 | 251 | assert 'changed_name' in user_group.users_group_name |
|
252 | 252 | assert 'changed_description' in user_group.user_group_description |
|
253 | 253 | assert user_group.users_group_active == expected_active_state |
|
254 | 254 | |
|
255 | 255 | fixture.destroy_user_group(user_group) |
@@ -1,297 +1,297 b'' | |||
|
1 | 1 | |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2012-2023 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | """ |
|
22 | 22 | RhodeCode authentication plugin for Atlassian CROWD |
|
23 | 23 | """ |
|
24 | 24 | |
|
25 | 25 | |
|
26 | 26 | import colander |
|
27 | 27 | import base64 |
|
28 | 28 | import logging |
|
29 | 29 | import urllib.request |
|
30 | 30 | import urllib.error |
|
31 | 31 | import urllib.parse |
|
32 | 32 | |
|
33 | 33 | from rhodecode.translation import _ |
|
34 | 34 | from rhodecode.authentication.base import ( |
|
35 | 35 | RhodeCodeExternalAuthPlugin, hybrid_property) |
|
36 | 36 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase |
|
37 | 37 | from rhodecode.authentication.routes import AuthnPluginResourceBase |
|
38 | 38 | from rhodecode.lib.colander_utils import strip_whitespace |
|
39 | 39 | from rhodecode.lib.ext_json import json, formatted_json |
|
40 | 40 | from rhodecode.model.db import User |
|
41 | 41 | |
|
42 | 42 | log = logging.getLogger(__name__) |
|
43 | 43 | |
|
44 | 44 | |
|
45 | 45 | def plugin_factory(plugin_id, *args, **kwargs): |
|
46 | 46 | """ |
|
47 | 47 | Factory function that is called during plugin discovery. |
|
48 | 48 | It returns the plugin instance. |
|
49 | 49 | """ |
|
50 | 50 | plugin = RhodeCodeAuthPlugin(plugin_id) |
|
51 | 51 | return plugin |
|
52 | 52 | |
|
53 | 53 | |
|
54 | 54 | class CrowdAuthnResource(AuthnPluginResourceBase): |
|
55 | 55 | pass |
|
56 | 56 | |
|
57 | 57 | |
|
58 | 58 | class CrowdSettingsSchema(AuthnPluginSettingsSchemaBase): |
|
59 | 59 | host = colander.SchemaNode( |
|
60 | 60 | colander.String(), |
|
61 | 61 | default='127.0.0.1', |
|
62 | 62 | description=_('The FQDN or IP of the Atlassian CROWD Server'), |
|
63 | 63 | preparer=strip_whitespace, |
|
64 | 64 | title=_('Host'), |
|
65 | 65 | widget='string') |
|
66 | 66 | port = colander.SchemaNode( |
|
67 | 67 | colander.Int(), |
|
68 | 68 | default=8095, |
|
69 | 69 | description=_('The Port in use by the Atlassian CROWD Server'), |
|
70 | 70 | preparer=strip_whitespace, |
|
71 | 71 | title=_('Port'), |
|
72 | 72 | validator=colander.Range(min=0, max=65536), |
|
73 | 73 | widget='int') |
|
74 | 74 | app_name = colander.SchemaNode( |
|
75 | 75 | colander.String(), |
|
76 | 76 | default='', |
|
77 | 77 | description=_('The Application Name to authenticate to CROWD'), |
|
78 | 78 | preparer=strip_whitespace, |
|
79 | 79 | title=_('Application Name'), |
|
80 | 80 | widget='string') |
|
81 | 81 | app_password = colander.SchemaNode( |
|
82 | 82 | colander.String(), |
|
83 | 83 | default='', |
|
84 | 84 | description=_('The password to authenticate to CROWD'), |
|
85 | 85 | preparer=strip_whitespace, |
|
86 | 86 | title=_('Application Password'), |
|
87 | 87 | widget='password') |
|
88 | 88 | admin_groups = colander.SchemaNode( |
|
89 | 89 | colander.String(), |
|
90 | 90 | default='', |
|
91 | 91 | description=_('A comma separated list of group names that identify ' |
|
92 | 92 | 'users as RhodeCode Administrators'), |
|
93 | 93 | missing='', |
|
94 | 94 | preparer=strip_whitespace, |
|
95 | 95 | title=_('Admin Groups'), |
|
96 | 96 | widget='string') |
|
97 | 97 | |
|
98 | 98 | |
|
99 | 99 | class CrowdServer(object): |
|
100 | 100 | def __init__(self, *args, **kwargs): |
|
101 | 101 | """ |
|
102 | 102 | Create a new CrowdServer object that points to IP/Address 'host', |
|
103 | 103 | on the given port, and using the given method (https/http). user and |
|
104 | 104 | passwd can be set here or with set_credentials. If unspecified, |
|
105 | 105 | "version" defaults to "latest". |
|
106 | 106 | |
|
107 | 107 | example:: |
|
108 | 108 | |
|
109 | 109 | cserver = CrowdServer(host="127.0.0.1", |
|
110 | 110 | port="8095", |
|
111 | 111 | user="some_app", |
|
112 | 112 | passwd="some_passwd", |
|
113 | 113 | version="1") |
|
114 | 114 | """ |
|
115 |
if not |
|
|
115 | if 'port' not in kwargs: | |
|
116 | 116 | kwargs["port"] = "8095" |
|
117 | 117 | self._logger = kwargs.get("logger", logging.getLogger(__name__)) |
|
118 | 118 | self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"), |
|
119 | 119 | kwargs.get("host", "127.0.0.1"), |
|
120 | 120 | kwargs.get("port", "8095")) |
|
121 | 121 | self.set_credentials(kwargs.get("user", ""), |
|
122 | 122 | kwargs.get("passwd", "")) |
|
123 | 123 | self._version = kwargs.get("version", "latest") |
|
124 | 124 | self._url_list = None |
|
125 | 125 | self._appname = "crowd" |
|
126 | 126 | |
|
127 | 127 | def set_credentials(self, user, passwd): |
|
128 | 128 | self.user = user |
|
129 | 129 | self.passwd = passwd |
|
130 | 130 | self._make_opener() |
|
131 | 131 | |
|
132 | 132 | def _make_opener(self): |
|
133 | 133 | mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm() |
|
134 | 134 | mgr.add_password(None, self._uri, self.user, self.passwd) |
|
135 | 135 | handler = urllib.request.HTTPBasicAuthHandler(mgr) |
|
136 | 136 | self.opener = urllib.request.build_opener(handler) |
|
137 | 137 | |
|
138 | 138 | def _request(self, url, body=None, headers=None, |
|
139 | 139 | method=None, noformat=False, |
|
140 | 140 | empty_response_ok=False): |
|
141 | 141 | _headers = {"Content-type": "application/json", |
|
142 | 142 | "Accept": "application/json"} |
|
143 | 143 | if self.user and self.passwd: |
|
144 | 144 | authstring = base64.b64encode("%s:%s" % (self.user, self.passwd)) |
|
145 | 145 | _headers["Authorization"] = "Basic %s" % authstring |
|
146 | 146 | if headers: |
|
147 | 147 | _headers.update(headers) |
|
148 | 148 | log.debug("Sent crowd: \n%s" |
|
149 | 149 | % (formatted_json({"url": url, "body": body, |
|
150 | 150 | "headers": _headers}))) |
|
151 | 151 | request = urllib.request.Request(url, body, _headers) |
|
152 | 152 | if method: |
|
153 | 153 | request.get_method = lambda: method |
|
154 | 154 | |
|
155 | 155 | global msg |
|
156 | 156 | msg = "" |
|
157 | 157 | try: |
|
158 | 158 | ret_doc = self.opener.open(request) |
|
159 | 159 | msg = ret_doc.read() |
|
160 | 160 | if not msg and empty_response_ok: |
|
161 | 161 | ret_val = {} |
|
162 | 162 | ret_val["status"] = True |
|
163 | 163 | ret_val["error"] = "Response body was empty" |
|
164 | 164 | elif not noformat: |
|
165 | 165 | ret_val = json.loads(msg) |
|
166 | 166 | ret_val["status"] = True |
|
167 | 167 | else: |
|
168 | 168 | ret_val = msg |
|
169 | 169 | except Exception as e: |
|
170 | 170 | if not noformat: |
|
171 | 171 | ret_val = {"status": False, |
|
172 | 172 | "body": body, |
|
173 | 173 | "error": "{}\n{}".format(e, msg)} |
|
174 | 174 | else: |
|
175 | 175 | ret_val = None |
|
176 | 176 | return ret_val |
|
177 | 177 | |
|
178 | 178 | def user_auth(self, username, password): |
|
179 | 179 | """Authenticate a user against crowd. Returns brief information about |
|
180 | 180 | the user.""" |
|
181 | 181 | url = ("%s/rest/usermanagement/%s/authentication?username=%s" |
|
182 | 182 | % (self._uri, self._version, username)) |
|
183 | 183 | body = json.dumps({"value": password}) |
|
184 | 184 | return self._request(url, body) |
|
185 | 185 | |
|
186 | 186 | def user_groups(self, username): |
|
187 | 187 | """Retrieve a list of groups to which this user belongs.""" |
|
188 | 188 | url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s" |
|
189 | 189 | % (self._uri, self._version, username)) |
|
190 | 190 | return self._request(url) |
|
191 | 191 | |
|
192 | 192 | |
|
193 | 193 | class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin): |
|
194 | 194 | uid = 'crowd' |
|
195 | 195 | _settings_unsafe_keys = ['app_password'] |
|
196 | 196 | |
|
197 | 197 | def includeme(self, config): |
|
198 | 198 | config.add_authn_plugin(self) |
|
199 | 199 | config.add_authn_resource(self.get_id(), CrowdAuthnResource(self)) |
|
200 | 200 | config.add_view( |
|
201 | 201 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
202 | 202 | attr='settings_get', |
|
203 | 203 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
204 | 204 | request_method='GET', |
|
205 | 205 | route_name='auth_home', |
|
206 | 206 | context=CrowdAuthnResource) |
|
207 | 207 | config.add_view( |
|
208 | 208 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
209 | 209 | attr='settings_post', |
|
210 | 210 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
211 | 211 | request_method='POST', |
|
212 | 212 | route_name='auth_home', |
|
213 | 213 | context=CrowdAuthnResource) |
|
214 | 214 | |
|
215 | 215 | def get_settings_schema(self): |
|
216 | 216 | return CrowdSettingsSchema() |
|
217 | 217 | |
|
218 | 218 | def get_display_name(self, load_from_settings=False): |
|
219 | 219 | return _('CROWD') |
|
220 | 220 | |
|
221 | 221 | @classmethod |
|
222 | 222 | def docs(cls): |
|
223 | 223 | return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-crowd.html" |
|
224 | 224 | |
|
225 | 225 | @hybrid_property |
|
226 | 226 | def name(self): |
|
227 | 227 | return u"crowd" |
|
228 | 228 | |
|
229 | 229 | def use_fake_password(self): |
|
230 | 230 | return True |
|
231 | 231 | |
|
232 | 232 | def user_activation_state(self): |
|
233 | 233 | def_user_perms = User.get_default_user().AuthUser().permissions['global'] |
|
234 | 234 | return 'hg.extern_activate.auto' in def_user_perms |
|
235 | 235 | |
|
236 | 236 | def auth(self, userobj, username, password, settings, **kwargs): |
|
237 | 237 | """ |
|
238 | 238 | Given a user object (which may be null), username, a plaintext password, |
|
239 | 239 | and a settings object (containing all the keys needed as listed in settings()), |
|
240 | 240 | authenticate this user's login attempt. |
|
241 | 241 | |
|
242 | 242 | Return None on failure. On success, return a dictionary of the form: |
|
243 | 243 | |
|
244 | 244 | see: RhodeCodeAuthPluginBase.auth_func_attrs |
|
245 | 245 | This is later validated for correctness |
|
246 | 246 | """ |
|
247 | 247 | if not username or not password: |
|
248 | 248 | log.debug('Empty username or password skipping...') |
|
249 | 249 | return None |
|
250 | 250 | |
|
251 | 251 | log.debug("Crowd settings: \n%s", formatted_json(settings)) |
|
252 | 252 | server = CrowdServer(**settings) |
|
253 | 253 | server.set_credentials(settings["app_name"], settings["app_password"]) |
|
254 | 254 | crowd_user = server.user_auth(username, password) |
|
255 | 255 | log.debug("Crowd returned: \n%s", formatted_json(crowd_user)) |
|
256 | 256 | if not crowd_user["status"]: |
|
257 | 257 | return None |
|
258 | 258 | |
|
259 | 259 | res = server.user_groups(crowd_user["name"]) |
|
260 | 260 | log.debug("Crowd groups: \n%s", formatted_json(res)) |
|
261 | 261 | crowd_user["groups"] = [x["name"] for x in res["groups"]] |
|
262 | 262 | |
|
263 | 263 | # old attrs fetched from RhodeCode database |
|
264 | 264 | admin = getattr(userobj, 'admin', False) |
|
265 | 265 | active = getattr(userobj, 'active', True) |
|
266 | 266 | email = getattr(userobj, 'email', '') |
|
267 | 267 | username = getattr(userobj, 'username', username) |
|
268 | 268 | firstname = getattr(userobj, 'firstname', '') |
|
269 | 269 | lastname = getattr(userobj, 'lastname', '') |
|
270 | 270 | extern_type = getattr(userobj, 'extern_type', '') |
|
271 | 271 | |
|
272 | 272 | user_attrs = { |
|
273 | 273 | 'username': username, |
|
274 | 274 | 'firstname': crowd_user["first-name"] or firstname, |
|
275 | 275 | 'lastname': crowd_user["last-name"] or lastname, |
|
276 | 276 | 'groups': crowd_user["groups"], |
|
277 | 277 | 'user_group_sync': True, |
|
278 | 278 | 'email': crowd_user["email"] or email, |
|
279 | 279 | 'admin': admin, |
|
280 | 280 | 'active': active, |
|
281 | 281 | 'active_from_extern': crowd_user.get('active'), |
|
282 | 282 | 'extern_name': crowd_user["name"], |
|
283 | 283 | 'extern_type': extern_type, |
|
284 | 284 | } |
|
285 | 285 | |
|
286 | 286 | # set an admin if we're in admin_groups of crowd |
|
287 | 287 | for group in settings["admin_groups"]: |
|
288 | 288 | if group in user_attrs["groups"]: |
|
289 | 289 | user_attrs["admin"] = True |
|
290 | 290 | log.debug("Final crowd user object: \n%s", formatted_json(user_attrs)) |
|
291 | 291 | log.info('user `%s` authenticated correctly', user_attrs['username']) |
|
292 | 292 | return user_attrs |
|
293 | 293 | |
|
294 | 294 | |
|
295 | 295 | def includeme(config): |
|
296 | 296 | plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid) |
|
297 | 297 | plugin_factory(plugin_id).includeme(config) |
@@ -1,263 +1,263 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | |
|
3 | 3 | |
|
4 | 4 | import os |
|
5 | 5 | import re |
|
6 | 6 | import shutil |
|
7 | 7 | import logging |
|
8 | 8 | |
|
9 | 9 | from rhodecode.lib.dbmigrate.migrate import exceptions |
|
10 | 10 | from rhodecode.lib.dbmigrate.migrate.versioning import pathed, script |
|
11 | 11 | from datetime import datetime |
|
12 | 12 | |
|
13 | 13 | |
|
14 | 14 | log = logging.getLogger(__name__) |
|
15 | 15 | |
|
16 | 16 | class VerNum(object): |
|
17 | 17 | """A version number that behaves like a string and int at the same time""" |
|
18 | 18 | |
|
19 | 19 | _instances = {} |
|
20 | 20 | |
|
21 | 21 | def __new__(cls, value): |
|
22 | 22 | val = str(value) |
|
23 | 23 | if val not in cls._instances: |
|
24 | 24 | cls._instances[val] = super(VerNum, cls).__new__(cls) |
|
25 | 25 | ret = cls._instances[val] |
|
26 | 26 | return ret |
|
27 | 27 | |
|
28 | 28 | def __init__(self,value): |
|
29 | 29 | self.value = str(int(value)) |
|
30 | 30 | if self < 0: |
|
31 | 31 | raise ValueError("Version number cannot be negative") |
|
32 | 32 | |
|
33 | 33 | def __add__(self, value): |
|
34 | 34 | ret = int(self) + int(value) |
|
35 | 35 | return VerNum(ret) |
|
36 | 36 | |
|
37 | 37 | def __sub__(self, value): |
|
38 | 38 | return self + (int(value) * -1) |
|
39 | 39 | |
|
40 | 40 | def __eq__(self, value): |
|
41 | 41 | return int(self) == int(value) |
|
42 | 42 | |
|
43 | 43 | def __ne__(self, value): |
|
44 | 44 | return int(self) != int(value) |
|
45 | 45 | |
|
46 | 46 | def __lt__(self, value): |
|
47 | 47 | return int(self) < int(value) |
|
48 | 48 | |
|
49 | 49 | def __gt__(self, value): |
|
50 | 50 | return int(self) > int(value) |
|
51 | 51 | |
|
52 | 52 | def __ge__(self, value): |
|
53 | 53 | return int(self) >= int(value) |
|
54 | 54 | |
|
55 | 55 | def __le__(self, value): |
|
56 | 56 | return int(self) <= int(value) |
|
57 | 57 | |
|
58 | 58 | def __repr__(self): |
|
59 | 59 | return "<VerNum(%s)>" % self.value |
|
60 | 60 | |
|
61 | 61 | def __str__(self): |
|
62 | 62 | return str(self.value) |
|
63 | 63 | |
|
64 | 64 | def __int__(self): |
|
65 | 65 | return int(self.value) |
|
66 | 66 | |
|
67 | 67 | |
|
68 | 68 | class Collection(pathed.Pathed): |
|
69 | 69 | """A collection of versioning scripts in a repository""" |
|
70 | 70 | |
|
71 | 71 | FILENAME_WITH_VERSION = re.compile(r'^(\d{3,}).*') |
|
72 | 72 | |
|
73 | 73 | def __init__(self, path): |
|
74 | 74 | """Collect current version scripts in repository |
|
75 | 75 | and store them in self.versions |
|
76 | 76 | """ |
|
77 | 77 | super(Collection, self).__init__(path) |
|
78 | 78 | |
|
79 | 79 | # Create temporary list of files, allowing skipped version numbers. |
|
80 | 80 | files = os.listdir(path) |
|
81 | 81 | if '1' in files: |
|
82 | 82 | # deprecation |
|
83 | 83 | raise Exception('It looks like you have a repository in the old ' |
|
84 | 84 | 'format (with directories for each version). ' |
|
85 | 85 | 'Please convert repository before proceeding.') |
|
86 | 86 | |
|
87 | 87 | tempVersions = {} |
|
88 | 88 | for filename in files: |
|
89 | 89 | match = self.FILENAME_WITH_VERSION.match(filename) |
|
90 | 90 | if match: |
|
91 | 91 | num = int(match.group(1)) |
|
92 | 92 | tempVersions.setdefault(num, []).append(filename) |
|
93 | 93 | else: |
|
94 | 94 | pass # Must be a helper file or something, let's ignore it. |
|
95 | 95 | |
|
96 | 96 | # Create the versions member where the keys |
|
97 | 97 | # are VerNum's and the values are Version's. |
|
98 | 98 | self.versions = {} |
|
99 | 99 | for num, files in list(tempVersions.items()): |
|
100 | 100 | self.versions[VerNum(num)] = Version(num, path, files) |
|
101 | 101 | |
|
102 | 102 | @property |
|
103 | 103 | def latest(self): |
|
104 | 104 | """:returns: Latest version in Collection""" |
|
105 | 105 | return max([VerNum(0)] + list(self.versions.keys())) |
|
106 | 106 | |
|
107 | 107 | def _next_ver_num(self, use_timestamp_numbering): |
|
108 |
if use_timestamp_numbering |
|
|
108 | if use_timestamp_numbering is True: | |
|
109 | 109 | return VerNum(int(datetime.utcnow().strftime('%Y%m%d%H%M%S'))) |
|
110 | 110 | else: |
|
111 | 111 | return self.latest + 1 |
|
112 | 112 | |
|
113 | 113 | def create_new_python_version(self, description, **k): |
|
114 | 114 | """Create Python files for new version""" |
|
115 | 115 | ver = self._next_ver_num(k.pop('use_timestamp_numbering', False)) |
|
116 | 116 | extra = str_to_filename(description) |
|
117 | 117 | |
|
118 | 118 | if extra: |
|
119 | 119 | if extra == '_': |
|
120 | 120 | extra = '' |
|
121 | 121 | elif not extra.startswith('_'): |
|
122 | 122 | extra = '_%s' % extra |
|
123 | 123 | |
|
124 | 124 | filename = '%03d%s.py' % (ver, extra) |
|
125 | 125 | filepath = self._version_path(filename) |
|
126 | 126 | |
|
127 | 127 | script.PythonScript.create(filepath, **k) |
|
128 | 128 | self.versions[ver] = Version(ver, self.path, [filename]) |
|
129 | 129 | |
|
130 | 130 | def create_new_sql_version(self, database, description, **k): |
|
131 | 131 | """Create SQL files for new version""" |
|
132 | 132 | ver = self._next_ver_num(k.pop('use_timestamp_numbering', False)) |
|
133 | 133 | self.versions[ver] = Version(ver, self.path, []) |
|
134 | 134 | |
|
135 | 135 | extra = str_to_filename(description) |
|
136 | 136 | |
|
137 | 137 | if extra: |
|
138 | 138 | if extra == '_': |
|
139 | 139 | extra = '' |
|
140 | 140 | elif not extra.startswith('_'): |
|
141 | 141 | extra = '_%s' % extra |
|
142 | 142 | |
|
143 | 143 | # Create new files. |
|
144 | 144 | for op in ('upgrade', 'downgrade'): |
|
145 | 145 | filename = '%03d%s_%s_%s.sql' % (ver, extra, database, op) |
|
146 | 146 | filepath = self._version_path(filename) |
|
147 | 147 | script.SqlScript.create(filepath, **k) |
|
148 | 148 | self.versions[ver].add_script(filepath) |
|
149 | 149 | |
|
150 | 150 | def version(self, vernum=None): |
|
151 | 151 | """Returns latest Version if vernum is not given. |
|
152 | 152 | Otherwise, returns wanted version""" |
|
153 | 153 | if vernum is None: |
|
154 | 154 | vernum = self.latest |
|
155 | 155 | return self.versions[VerNum(vernum)] |
|
156 | 156 | |
|
157 | 157 | @classmethod |
|
158 | 158 | def clear(cls): |
|
159 | 159 | super(Collection, cls).clear() |
|
160 | 160 | |
|
161 | 161 | def _version_path(self, ver): |
|
162 | 162 | """Returns path of file in versions repository""" |
|
163 | 163 | return os.path.join(self.path, str(ver)) |
|
164 | 164 | |
|
165 | 165 | |
|
166 | 166 | class Version(object): |
|
167 | 167 | """A single version in a collection |
|
168 | 168 | :param vernum: Version Number |
|
169 | 169 | :param path: Path to script files |
|
170 | 170 | :param filelist: List of scripts |
|
171 | 171 | :type vernum: int, VerNum |
|
172 | 172 | :type path: string |
|
173 | 173 | :type filelist: list |
|
174 | 174 | """ |
|
175 | 175 | |
|
176 | 176 | def __init__(self, vernum, path, filelist): |
|
177 | 177 | self.version = VerNum(vernum) |
|
178 | 178 | |
|
179 | 179 | # Collect scripts in this folder |
|
180 | 180 | self.sql = {} |
|
181 | 181 | self.python = None |
|
182 | 182 | |
|
183 | 183 | for script in filelist: |
|
184 | 184 | self.add_script(os.path.join(path, script)) |
|
185 | 185 | |
|
186 | 186 | def script(self, database=None, operation=None): |
|
187 | 187 | """Returns SQL or Python Script""" |
|
188 | 188 | for db in (database, 'default'): |
|
189 | 189 | # Try to return a .sql script first |
|
190 | 190 | try: |
|
191 | 191 | return self.sql[db][operation] |
|
192 | 192 | except KeyError: |
|
193 | 193 | continue # No .sql script exists |
|
194 | 194 | |
|
195 | 195 | # TODO: maybe add force Python parameter? |
|
196 | 196 | ret = self.python |
|
197 | 197 | |
|
198 | 198 | assert ret is not None, \ |
|
199 | 199 | "There is no script for %d version" % self.version |
|
200 | 200 | return ret |
|
201 | 201 | |
|
202 | 202 | def add_script(self, path): |
|
203 | 203 | """Add script to Collection/Version""" |
|
204 | 204 | if path.endswith(Extensions.py): |
|
205 | 205 | self._add_script_py(path) |
|
206 | 206 | elif path.endswith(Extensions.sql): |
|
207 | 207 | self._add_script_sql(path) |
|
208 | 208 | |
|
209 | 209 | SQL_FILENAME = re.compile(r'^.*\.sql') |
|
210 | 210 | |
|
211 | 211 | def _add_script_sql(self, path): |
|
212 | 212 | basename = os.path.basename(path) |
|
213 | 213 | match = self.SQL_FILENAME.match(basename) |
|
214 | 214 | |
|
215 | 215 | if match: |
|
216 | 216 | basename = basename.replace('.sql', '') |
|
217 | 217 | parts = basename.split('_') |
|
218 | 218 | if len(parts) < 3: |
|
219 | 219 | raise exceptions.ScriptError( |
|
220 | 220 | "Invalid SQL script name %s " % basename + \ |
|
221 | 221 | "(needs to be ###_description_database_operation.sql)") |
|
222 | 222 | version = parts[0] |
|
223 | 223 | op = parts[-1] |
|
224 | 224 | # NOTE(mriedem): check for ibm_db_sa as the database in the name |
|
225 | 225 | if 'ibm_db_sa' in basename: |
|
226 | 226 | if len(parts) == 6: |
|
227 | 227 | dbms = '_'.join(parts[-4: -1]) |
|
228 | 228 | else: |
|
229 | 229 | raise exceptions.ScriptError( |
|
230 | 230 | "Invalid ibm_db_sa SQL script name '%s'; " |
|
231 | 231 | "(needs to be " |
|
232 | 232 | "###_description_ibm_db_sa_operation.sql)" % basename) |
|
233 | 233 | else: |
|
234 | 234 | dbms = parts[-2] |
|
235 | 235 | else: |
|
236 | 236 | raise exceptions.ScriptError( |
|
237 | 237 | "Invalid SQL script name %s " % basename + \ |
|
238 | 238 | "(needs to be ###_description_database_operation.sql)") |
|
239 | 239 | |
|
240 | 240 | # File the script into a dictionary |
|
241 | 241 | self.sql.setdefault(dbms, {})[op] = script.SqlScript(path) |
|
242 | 242 | |
|
243 | 243 | def _add_script_py(self, path): |
|
244 | 244 | if self.python is not None: |
|
245 | 245 | raise exceptions.ScriptError('You can only have one Python script ' |
|
246 | 246 | 'per version, but you have: %s and %s' % (self.python, path)) |
|
247 | 247 | self.python = script.PythonScript(path) |
|
248 | 248 | |
|
249 | 249 | |
|
250 | 250 | class Extensions: |
|
251 | 251 | """A namespace for file extensions""" |
|
252 | 252 | py = 'py' |
|
253 | 253 | sql = 'sql' |
|
254 | 254 | |
|
255 | 255 | def str_to_filename(s): |
|
256 | 256 | """Replaces spaces, (double and single) quotes |
|
257 | 257 | and double underscores to underscores |
|
258 | 258 | """ |
|
259 | 259 | |
|
260 | 260 | s = s.replace(' ', '_').replace('"', '_').replace("'", '_').replace(".", "_") |
|
261 | 261 | while '__' in s: |
|
262 | 262 | s = s.replace('__', '_') |
|
263 | 263 | return s |
@@ -1,186 +1,185 b'' | |||
|
1 | ||
|
2 | 1 |
|
|
3 | 2 | # |
|
4 | 3 | # This program is free software: you can redistribute it and/or modify |
|
5 | 4 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 5 | # (only), as published by the Free Software Foundation. |
|
7 | 6 | # |
|
8 | 7 | # This program is distributed in the hope that it will be useful, |
|
9 | 8 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 9 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 10 | # GNU General Public License for more details. |
|
12 | 11 | # |
|
13 | 12 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 13 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 14 | # |
|
16 | 15 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 16 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 17 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 18 | |
|
20 | 19 | import sys |
|
21 | 20 | import logging |
|
22 | 21 | |
|
23 | 22 | |
|
24 | 23 | BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(30, 38)) |
|
25 | 24 | |
|
26 | 25 | # Sequences |
|
27 | 26 | RESET_SEQ = "\033[0m" |
|
28 | 27 | COLOR_SEQ = "\033[0;%dm" |
|
29 | 28 | BOLD_SEQ = "\033[1m" |
|
30 | 29 | |
|
31 | 30 | COLORS = { |
|
32 | 31 | 'CRITICAL': MAGENTA, |
|
33 | 32 | 'ERROR': RED, |
|
34 | 33 | 'WARNING': CYAN, |
|
35 | 34 | 'INFO': GREEN, |
|
36 | 35 | 'DEBUG': BLUE, |
|
37 | 36 | 'SQL': YELLOW |
|
38 | 37 | } |
|
39 | 38 | |
|
40 | 39 | |
|
41 | 40 | def _inject_req_id(record, with_prefix=True): |
|
42 | 41 | from pyramid.threadlocal import get_current_request |
|
43 | 42 | dummy = '00000000-0000-0000-0000-000000000000' |
|
44 | 43 | req_id = None |
|
45 | 44 | |
|
46 | 45 | req = get_current_request() |
|
47 | 46 | if req: |
|
48 | 47 | req_id = getattr(req, 'req_id', None) |
|
49 | 48 | if with_prefix: |
|
50 | 49 | req_id = 'req_id:%-36s' % (req_id or dummy) |
|
51 | 50 | else: |
|
52 | 51 | req_id = (req_id or dummy) |
|
53 | 52 | record.req_id = req_id |
|
54 | 53 | |
|
55 | 54 | |
|
56 | 55 | def _add_log_to_debug_bucket(formatted_record): |
|
57 | 56 | from pyramid.threadlocal import get_current_request |
|
58 | 57 | req = get_current_request() |
|
59 | 58 | if req: |
|
60 | 59 | req.req_id_bucket.append(formatted_record) |
|
61 | 60 | |
|
62 | 61 | |
|
63 | 62 | def one_space_trim(s): |
|
64 | 63 | if s.find(" ") == -1: |
|
65 | 64 | return s |
|
66 | 65 | else: |
|
67 | 66 | s = s.replace(' ', ' ') |
|
68 | 67 | return one_space_trim(s) |
|
69 | 68 | |
|
70 | 69 | |
|
71 | 70 | def format_sql(sql): |
|
72 | 71 | sql = sql.replace('\n', '') |
|
73 | 72 | sql = one_space_trim(sql) |
|
74 | 73 | sql = sql\ |
|
75 | 74 | .replace(',', ',\n\t')\ |
|
76 | 75 | .replace('SELECT', '\n\tSELECT \n\t')\ |
|
77 | 76 | .replace('UPDATE', '\n\tUPDATE \n\t')\ |
|
78 | 77 | .replace('DELETE', '\n\tDELETE \n\t')\ |
|
79 | 78 | .replace('FROM', '\n\tFROM')\ |
|
80 | 79 | .replace('ORDER BY', '\n\tORDER BY')\ |
|
81 | 80 | .replace('LIMIT', '\n\tLIMIT')\ |
|
82 | 81 | .replace('WHERE', '\n\tWHERE')\ |
|
83 | 82 | .replace('AND', '\n\tAND')\ |
|
84 | 83 | .replace('LEFT', '\n\tLEFT')\ |
|
85 | 84 | .replace('INNER', '\n\tINNER')\ |
|
86 | 85 | .replace('INSERT', '\n\tINSERT')\ |
|
87 | 86 | .replace('DELETE', '\n\tDELETE') |
|
88 | 87 | return sql |
|
89 | 88 | |
|
90 | 89 | |
|
91 | 90 | class ExceptionAwareFormatter(logging.Formatter): |
|
92 | 91 | """ |
|
93 | 92 | Extended logging formatter which prints out remote tracebacks. |
|
94 | 93 | """ |
|
95 | 94 | |
|
96 | 95 | def formatException(self, ei): |
|
97 | 96 | ex_type, ex_value, ex_tb = ei |
|
98 | 97 | |
|
99 | 98 | local_tb = logging.Formatter.formatException(self, ei) |
|
100 | 99 | if hasattr(ex_value, '_vcs_server_traceback'): |
|
101 | 100 | |
|
102 | 101 | def formatRemoteTraceback(remote_tb_lines): |
|
103 | 102 | result = ["\n +--- This exception occured remotely on VCSServer - Remote traceback:\n\n"] |
|
104 | 103 | result.append(remote_tb_lines) |
|
105 | 104 | result.append("\n +--- End of remote traceback\n") |
|
106 | 105 | return result |
|
107 | 106 | |
|
108 | 107 | try: |
|
109 | 108 | if ex_type is not None and ex_value is None and ex_tb is None: |
|
110 | 109 | # possible old (3.x) call syntax where caller is only |
|
111 | 110 | # providing exception object |
|
112 | 111 | if type(ex_type) is not type: |
|
113 | 112 | raise TypeError( |
|
114 | 113 | "invalid argument: ex_type should be an exception " |
|
115 | 114 | "type, or just supply no arguments at all") |
|
116 | 115 | if ex_type is None and ex_tb is None: |
|
117 | 116 | ex_type, ex_value, ex_tb = sys.exc_info() |
|
118 | 117 | |
|
119 | 118 | remote_tb = getattr(ex_value, "_vcs_server_traceback", None) |
|
120 | 119 | |
|
121 | 120 | if remote_tb: |
|
122 | 121 | remote_tb = formatRemoteTraceback(remote_tb) |
|
123 | 122 | return local_tb + ''.join(remote_tb) |
|
124 | 123 | finally: |
|
125 | 124 | # clean up cycle to traceback, to allow proper GC |
|
126 | 125 | del ex_type, ex_value, ex_tb |
|
127 | 126 | |
|
128 | 127 | return local_tb |
|
129 | 128 | |
|
130 | 129 | |
|
131 | 130 | class RequestTrackingFormatter(ExceptionAwareFormatter): |
|
132 | 131 | def format(self, record): |
|
133 | 132 | _inject_req_id(record) |
|
134 | 133 | def_record = logging.Formatter.format(self, record) |
|
135 | 134 | _add_log_to_debug_bucket(def_record) |
|
136 | 135 | return def_record |
|
137 | 136 | |
|
138 | 137 | |
|
139 | 138 | class ColorFormatter(ExceptionAwareFormatter): |
|
140 | 139 | |
|
141 | 140 | def format(self, record): |
|
142 | 141 | """ |
|
143 | 142 | Changes record's levelname to use with COLORS enum |
|
144 | 143 | """ |
|
145 | 144 | def_record = super(ColorFormatter, self).format(record) |
|
146 | 145 | |
|
147 | 146 | levelname = record.levelname |
|
148 | 147 | start = COLOR_SEQ % (COLORS[levelname]) |
|
149 | 148 | end = RESET_SEQ |
|
150 | 149 | |
|
151 | 150 | colored_record = ''.join([start, def_record, end]) |
|
152 | 151 | return colored_record |
|
153 | 152 | |
|
154 | 153 | |
|
155 | 154 | class ColorRequestTrackingFormatter(RequestTrackingFormatter): |
|
156 | 155 | |
|
157 | 156 | def format(self, record): |
|
158 | 157 | """ |
|
159 | 158 | Changes record's levelname to use with COLORS enum |
|
160 | 159 | """ |
|
161 | 160 | def_record = super(ColorRequestTrackingFormatter, self).format(record) |
|
162 | 161 | |
|
163 | 162 | levelname = record.levelname |
|
164 | 163 | start = COLOR_SEQ % (COLORS[levelname]) |
|
165 | 164 | end = RESET_SEQ |
|
166 | 165 | |
|
167 | 166 | colored_record = ''.join([start, def_record, end]) |
|
168 | 167 | return colored_record |
|
169 | 168 | |
|
170 | 169 | |
|
171 | 170 | class ColorFormatterSql(logging.Formatter): |
|
172 | 171 | |
|
173 | 172 | def format(self, record): |
|
174 | 173 | """ |
|
175 | 174 | Changes record's levelname to use with COLORS enum |
|
176 | 175 | """ |
|
177 | 176 | |
|
178 | 177 | start = COLOR_SEQ % (COLORS['SQL']) |
|
179 | 178 | def_record = format_sql(logging.Formatter.format(self, record)) |
|
180 | 179 | end = RESET_SEQ |
|
181 | 180 | |
|
182 | 181 | colored_record = ''.join([start, def_record, end]) |
|
183 | 182 | return colored_record |
|
184 | 183 | |
|
185 | # marcink: needs to stay with this name for backward .ini compatability | |
|
186 | Pyro4AwareFormatter = ExceptionAwareFormatter | |
|
184 | # NOTE (marcink): needs to stay with this name for backward .ini compatability | |
|
185 | Pyro4AwareFormatter = ExceptionAwareFormatter # noqa |
@@ -1,132 +1,132 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | import re |
|
21 | 21 | import markdown |
|
22 | 22 | import xml.etree.ElementTree as etree |
|
23 | 23 | |
|
24 | 24 | from markdown.extensions import Extension |
|
25 | 25 | from markdown.extensions.fenced_code import FencedCodeExtension |
|
26 | 26 | from markdown.extensions.tables import TableExtension |
|
27 | 27 | from markdown.extensions.nl2br import Nl2BrExtension as _Nl2BrExtension |
|
28 | 28 | from markdown.extensions.wikilinks import WikiLinkExtension |
|
29 | 29 | from markdown.inlinepatterns import Pattern |
|
30 | 30 | |
|
31 | 31 | import gfm |
|
32 | 32 | |
|
33 | 33 | |
|
34 | 34 | class InlineProcessor(Pattern): |
|
35 | 35 | """ |
|
36 | 36 | Base class that inline patterns subclass. |
|
37 | 37 | This is the newer style inline processor that uses a more |
|
38 | 38 | efficient and flexible search approach. |
|
39 | 39 | """ |
|
40 | 40 | |
|
41 | 41 | def __init__(self, pattern, md=None): |
|
42 | 42 | """ |
|
43 | 43 | Create an instant of an inline pattern. |
|
44 | 44 | Keyword arguments: |
|
45 | 45 | * pattern: A regular expression that matches a pattern |
|
46 | 46 | """ |
|
47 | 47 | self.pattern = pattern |
|
48 | 48 | self.compiled_re = re.compile(pattern, re.DOTALL | re.UNICODE) |
|
49 | 49 | |
|
50 | 50 | # Api for Markdown to pass safe_mode into instance |
|
51 | 51 | self.safe_mode = False |
|
52 | 52 | self.md = md |
|
53 | 53 | |
|
54 | 54 | def handleMatch(self, m, data): |
|
55 | 55 | """Return a ElementTree element from the given match and the |
|
56 | 56 | start and end index of the matched text. |
|
57 | 57 | If `start` and/or `end` are returned as `None`, it will be |
|
58 | 58 | assumed that the processor did not find a valid region of text. |
|
59 | 59 | Subclasses should override this method. |
|
60 | 60 | Keyword arguments: |
|
61 | 61 | * m: A re match object containing a match of the pattern. |
|
62 | 62 | * data: The buffer current under analysis |
|
63 | 63 | Returns: |
|
64 | 64 | * el: The ElementTree element, text or None. |
|
65 | 65 | * start: The start of the region that has been matched or None. |
|
66 | 66 | * end: The end of the region that has been matched or None. |
|
67 | 67 | """ |
|
68 | 68 | pass # pragma: no cover |
|
69 | 69 | |
|
70 | 70 | |
|
71 | 71 | class SimpleTagInlineProcessor(InlineProcessor): |
|
72 | 72 | """ |
|
73 | 73 | Return element of type `tag` with a text attribute of group(2) |
|
74 | 74 | of a Pattern. |
|
75 | 75 | """ |
|
76 | 76 | def __init__(self, pattern, tag): |
|
77 | 77 | InlineProcessor.__init__(self, pattern) |
|
78 | 78 | self.tag = tag |
|
79 | 79 | |
|
80 | 80 | def handleMatch(self, m, data): # pragma: no cover |
|
81 | 81 | el = etree.Element(self.tag) |
|
82 | 82 | el.text = m.group(2) |
|
83 | 83 | return el, m.start(0), m.end(0) |
|
84 | 84 | |
|
85 | 85 | |
|
86 | 86 | class SubstituteTagInlineProcessor(SimpleTagInlineProcessor): |
|
87 | 87 | """ Return an element of type `tag` with no children. """ |
|
88 | 88 | def handleMatch(self, m, data): |
|
89 | 89 | return etree.Element(self.tag), m.start(0), m.end(0) |
|
90 | 90 | |
|
91 | 91 | |
|
92 | 92 | class Nl2BrExtension(_Nl2BrExtension): |
|
93 | 93 | pass |
|
94 | 94 | |
|
95 | 95 | |
|
96 | 96 | # Global Vars |
|
97 | 97 | URLIZE_RE = '(%s)' % '|'.join([ |
|
98 | 98 | r'<(?:f|ht)tps?://[^>]*>', |
|
99 | 99 | r'\b(?:f|ht)tps?://[^)<>\s]+[^.,)<>\s]', |
|
100 | 100 | r'\bwww\.[^)<>\s]+[^.,)<>\s]', |
|
101 | 101 | r'[^(<\s]+\.(?:com|net|org)\b', |
|
102 | 102 | ]) |
|
103 | 103 | |
|
104 | 104 | |
|
105 | 105 | class UrlizePattern(markdown.inlinepatterns.Pattern): |
|
106 | 106 | """ Return a link Element given an autolink (`http://example/com`). """ |
|
107 | 107 | def handleMatch(self, m): |
|
108 | 108 | url = m.group(2) |
|
109 | 109 | |
|
110 | 110 | if url.startswith('<'): |
|
111 | 111 | url = url[1:-1] |
|
112 | 112 | |
|
113 | 113 | text = url |
|
114 | 114 | |
|
115 |
if |
|
|
116 |
if '@' in url and |
|
|
115 | if url.split('://')[0] not in ('http', 'https', 'ftp'): | |
|
116 | if '@' in url and '/' not in url: | |
|
117 | 117 | url = 'mailto:' + url |
|
118 | 118 | else: |
|
119 | 119 | url = 'http://' + url |
|
120 | 120 | |
|
121 | 121 | el = markdown.util.etree.Element("a") |
|
122 | 122 | el.set('href', url) |
|
123 | 123 | el.text = markdown.util.AtomicString(text) |
|
124 | 124 | return el |
|
125 | 125 | |
|
126 | 126 | |
|
127 | 127 | class UrlizeExtension(Extension): |
|
128 | 128 | """ Urlize Extension for Python-Markdown. """ |
|
129 | 129 | |
|
130 | 130 | def extendMarkdown(self, md): |
|
131 | 131 | """ Replace autolink with UrlizePattern """ |
|
132 | 132 | md.inlinePatterns['autolink'] = UrlizePattern(URLIZE_RE, md) |
@@ -1,218 +1,218 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | import mock |
|
21 | 21 | import pytest |
|
22 | 22 | |
|
23 | 23 | from rhodecode.model.db import User |
|
24 | 24 | from rhodecode.tests import TEST_USER_REGULAR_LOGIN |
|
25 | 25 | from rhodecode.tests.fixture import Fixture |
|
26 | 26 | from rhodecode.model.user_group import UserGroupModel |
|
27 | 27 | from rhodecode.model.meta import Session |
|
28 | 28 | |
|
29 | 29 | |
|
30 | 30 | fixture = Fixture() |
|
31 | 31 | |
|
32 | 32 | |
|
33 | 33 | def teardown_module(self): |
|
34 | 34 | _delete_all_user_groups() |
|
35 | 35 | |
|
36 | 36 | |
|
37 | 37 | class TestGetUserGroups(object): |
|
38 | 38 | def test_returns_filtered_list(self, backend, user_util): |
|
39 | 39 | created_groups = [] |
|
40 | 40 | for i in range(4): |
|
41 | 41 | created_groups.append( |
|
42 | 42 | user_util.create_user_group(users_group_active=True)) |
|
43 | 43 | |
|
44 | 44 | group_filter = created_groups[-1].users_group_name[-2:] |
|
45 | 45 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
46 | 46 | with self._patch_user_group_list(): |
|
47 | 47 | groups = UserGroupModel().get_user_groups(group_filter) |
|
48 | 48 | |
|
49 | 49 | fake_groups = [ |
|
50 | 50 | u for u in groups if u['value'].startswith('test_returns')] |
|
51 | 51 | assert len(fake_groups) == 1 |
|
52 | 52 | assert fake_groups[0]['value'] == created_groups[-1].users_group_name |
|
53 | 53 | assert fake_groups[0]['value_display'].startswith( |
|
54 | 54 | 'Group: test_returns') |
|
55 | 55 | |
|
56 | 56 | def test_returns_limited_list(self, backend, user_util): |
|
57 | 57 | created_groups = [] |
|
58 | 58 | for i in range(3): |
|
59 | 59 | created_groups.append( |
|
60 | 60 | user_util.create_user_group(users_group_active=True)) |
|
61 | 61 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
62 | 62 | with self._patch_user_group_list(): |
|
63 | 63 | groups = UserGroupModel().get_user_groups('test_returns') |
|
64 | 64 | |
|
65 | 65 | fake_groups = [ |
|
66 | 66 | u for u in groups if u['value'].startswith('test_returns')] |
|
67 | 67 | assert len(fake_groups) == 3 |
|
68 | 68 | |
|
69 | 69 | def test_returns_active_user_groups(self, backend, user_util): |
|
70 | 70 | for i in range(4): |
|
71 | 71 | is_active = i % 2 == 0 |
|
72 | 72 | user_util.create_user_group(users_group_active=is_active) |
|
73 | 73 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
74 | 74 | with self._patch_user_group_list(): |
|
75 | 75 | groups = UserGroupModel().get_user_groups() |
|
76 | 76 | expected = ('id', 'icon_link', 'value_display', 'value', 'value_type') |
|
77 | 77 | for group in groups: |
|
78 |
assert group['value_type'] |
|
|
78 | assert group['value_type'] == 'user_group' | |
|
79 | 79 | for key in expected: |
|
80 | 80 | assert key in group |
|
81 | 81 | |
|
82 | 82 | fake_groups = [ |
|
83 | 83 | u for u in groups if u['value'].startswith('test_returns')] |
|
84 | 84 | assert len(fake_groups) == 2 |
|
85 | 85 | for user in fake_groups: |
|
86 | 86 | assert user['value_display'].startswith('Group: test_returns') |
|
87 | 87 | |
|
88 | 88 | def _patch_user_group_list(self): |
|
89 | 89 | def side_effect(group_list, perm_set): |
|
90 | 90 | return group_list |
|
91 | 91 | return mock.patch( |
|
92 | 92 | 'rhodecode.model.user_group.UserGroupList', side_effect=side_effect) |
|
93 | 93 | |
|
94 | 94 | |
|
95 | 95 | @pytest.mark.parametrize( |
|
96 | 96 | "pre_existing, regular_should_be, external_should_be, groups, " |
|
97 | 97 | "expected", [ |
|
98 | 98 | ([], [], [], [], []), |
|
99 | 99 | # no changes of regular |
|
100 | 100 | ([], ['regular'], [], [], ['regular']), |
|
101 | 101 | # not added to regular group |
|
102 | 102 | (['some_other'], [], [], ['some_other'], []), |
|
103 | 103 | ( |
|
104 | 104 | [], ['regular'], ['container'], ['container'], |
|
105 | 105 | ['regular', 'container'] |
|
106 | 106 | ), |
|
107 | 107 | ( |
|
108 | 108 | [], ['regular'], [], ['container', 'container2'], |
|
109 | 109 | ['regular', 'container', 'container2'] |
|
110 | 110 | ), |
|
111 | 111 | # remove not used |
|
112 | 112 | ([], ['regular'], ['other'], [], ['regular']), |
|
113 | 113 | ( |
|
114 | 114 | ['some_other'], ['regular'], ['other', 'container'], |
|
115 | 115 | ['container', 'container2'], |
|
116 | 116 | ['regular', 'container', 'container2'] |
|
117 | 117 | ), |
|
118 | 118 | ]) |
|
119 | 119 | def test_enforce_groups(pre_existing, regular_should_be, |
|
120 | 120 | external_should_be, groups, expected, backend_hg): |
|
121 | 121 | # TODO: anderson: adding backend_hg fixture so it sets up the database |
|
122 | 122 | # for when running this file alone |
|
123 | 123 | _delete_all_user_groups() |
|
124 | 124 | |
|
125 | 125 | user = User.get_by_username(TEST_USER_REGULAR_LOGIN) |
|
126 | 126 | for gr in pre_existing: |
|
127 | 127 | gr = fixture.create_user_group(gr) |
|
128 | 128 | Session().commit() |
|
129 | 129 | |
|
130 | 130 | # make sure use is just in those groups |
|
131 | 131 | for gr in regular_should_be: |
|
132 | 132 | gr = fixture.create_user_group(gr) |
|
133 | 133 | Session().commit() |
|
134 | 134 | UserGroupModel().add_user_to_group(gr, user) |
|
135 | 135 | Session().commit() |
|
136 | 136 | |
|
137 | 137 | # now special external groups created by auth plugins |
|
138 | 138 | for gr in external_should_be: |
|
139 | 139 | gr = fixture.create_user_group( |
|
140 | 140 | gr, user_group_data={'extern_type': 'container'}) |
|
141 | 141 | Session().commit() |
|
142 | 142 | UserGroupModel().add_user_to_group(gr, user) |
|
143 | 143 | Session().commit() |
|
144 | 144 | |
|
145 | 145 | UserGroupModel().enforce_groups(user, groups, 'container') |
|
146 | 146 | Session().commit() |
|
147 | 147 | |
|
148 | 148 | user = User.get_by_username(TEST_USER_REGULAR_LOGIN) |
|
149 | 149 | in_groups = user.group_member |
|
150 | 150 | |
|
151 | 151 | expected.sort() |
|
152 | 152 | assert ( |
|
153 | 153 | expected == sorted(x.users_group.users_group_name for x in in_groups)) |
|
154 | 154 | |
|
155 | 155 | |
|
156 | 156 | def _delete_all_user_groups(): |
|
157 | 157 | for gr in UserGroupModel.get_all(): |
|
158 | 158 | fixture.destroy_user_group(gr) |
|
159 | 159 | Session().commit() |
|
160 | 160 | |
|
161 | 161 | |
|
162 | 162 | def test_add_and_remove_user_from_group(user_regular, user_util): |
|
163 | 163 | user_group = user_util.create_user_group() |
|
164 | 164 | assert user_group.members == [] |
|
165 | 165 | UserGroupModel().add_user_to_group(user_group, user_regular) |
|
166 | 166 | Session().commit() |
|
167 | 167 | assert user_group.members[0].user == user_regular |
|
168 | 168 | UserGroupModel().remove_user_from_group(user_group, user_regular) |
|
169 | 169 | Session().commit() |
|
170 | 170 | assert user_group.members == [] |
|
171 | 171 | |
|
172 | 172 | |
|
173 | 173 | @pytest.mark.parametrize('data, expected', [ |
|
174 | 174 | ([], []), |
|
175 | 175 | ([{"member_user_id": 1, "type": "new"}], [1]), |
|
176 | 176 | ([{"member_user_id": 1, "type": "new"}, |
|
177 | 177 | {"member_user_id": 1, "type": "existing"}], [1]), |
|
178 | 178 | ([{"member_user_id": 1, "type": "new"}, |
|
179 | 179 | {"member_user_id": 2, "type": "new"}, |
|
180 | 180 | {"member_user_id": 3, "type": "remove"}], [1, 2]) |
|
181 | 181 | ]) |
|
182 | 182 | def test_clean_members_data(data, expected): |
|
183 | 183 | cleaned = UserGroupModel()._clean_members_data(data) |
|
184 | 184 | assert cleaned == expected |
|
185 | 185 | |
|
186 | 186 | |
|
187 | 187 | def _create_test_members(): |
|
188 | 188 | members = [] |
|
189 | 189 | for member_number in range(3): |
|
190 | 190 | member = mock.Mock() |
|
191 | 191 | member.user_id = member_number + 1 |
|
192 | 192 | member.user.user_id = member_number + 1 |
|
193 | 193 | members.append(member) |
|
194 | 194 | return members |
|
195 | 195 | |
|
196 | 196 | |
|
197 | 197 | def test_get_added_and_removed_users(): |
|
198 | 198 | members = _create_test_members() |
|
199 | 199 | mock_user_group = mock.Mock() |
|
200 | 200 | mock_user_group.members = [members[0], members[1]] |
|
201 | 201 | new_users_list = [members[1].user.user_id, members[2].user.user_id] |
|
202 | 202 | model = UserGroupModel() |
|
203 | 203 | |
|
204 | 204 | added, removed = model._get_added_and_removed_user_ids( |
|
205 | 205 | mock_user_group, new_users_list) |
|
206 | 206 | |
|
207 | 207 | assert added == [members[2].user.user_id] |
|
208 | 208 | assert removed == [members[0].user.user_id] |
|
209 | 209 | |
|
210 | 210 | |
|
211 | 211 | def test_set_users_as_members_and_find_user_in_group( |
|
212 | 212 | user_util, user_regular, user_admin): |
|
213 | 213 | user_group = user_util.create_user_group() |
|
214 | 214 | assert len(user_group.members) == 0 |
|
215 | 215 | user_list = [user_regular.user_id, user_admin.user_id] |
|
216 | 216 | UserGroupModel()._set_users_as_members(user_group, user_list) |
|
217 | 217 | assert len(user_group.members) == 2 |
|
218 | 218 | assert UserGroupModel()._find_user_in_group(user_regular, user_group) |
@@ -1,328 +1,328 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | import pytest |
|
21 | 21 | import mock |
|
22 | 22 | |
|
23 | 23 | from rhodecode.model.db import ( |
|
24 | 24 | true, User, UserGroup, UserGroupMember, UserEmailMap, Permission, UserIpMap) |
|
25 | 25 | from rhodecode.model.meta import Session |
|
26 | 26 | from rhodecode.model.user import UserModel |
|
27 | 27 | from rhodecode.model.user_group import UserGroupModel |
|
28 | 28 | from rhodecode.model.repo import RepoModel |
|
29 | 29 | from rhodecode.model.repo_group import RepoGroupModel |
|
30 | 30 | from rhodecode.tests.fixture import Fixture |
|
31 | 31 | from rhodecode.lib.str_utils import safe_str |
|
32 | 32 | |
|
33 | 33 | |
|
34 | 34 | fixture = Fixture() |
|
35 | 35 | |
|
36 | 36 | |
|
37 | 37 | class TestGetUsers(object): |
|
38 | 38 | def test_returns_active_users(self, backend, user_util): |
|
39 | 39 | for i in range(4): |
|
40 | 40 | is_active = i % 2 == 0 |
|
41 | 41 | user_util.create_user(active=is_active, lastname='Fake user') |
|
42 | 42 | |
|
43 | 43 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
44 | 44 | with mock.patch('rhodecode.lib.helpers.link_to_user'): |
|
45 | 45 | users = UserModel().get_users() |
|
46 | 46 | fake_users = [u for u in users if u['last_name'] == 'Fake user'] |
|
47 | 47 | assert len(fake_users) == 2 |
|
48 | 48 | |
|
49 | 49 | expected_keys = ( |
|
50 | 50 | 'id', 'first_name', 'last_name', 'username', 'icon_link', |
|
51 | 51 | 'value_display', 'value', 'value_type') |
|
52 | 52 | for user in users: |
|
53 |
assert user['value_type'] |
|
|
53 | assert user['value_type'] == 'user' | |
|
54 | 54 | for key in expected_keys: |
|
55 | 55 | assert key in user |
|
56 | 56 | |
|
57 | 57 | def test_returns_user_filtered_by_last_name(self, backend, user_util): |
|
58 | 58 | keywords = ('aBc', u'ΓΌnicode') |
|
59 | 59 | for keyword in keywords: |
|
60 | 60 | for i in range(2): |
|
61 | 61 | user_util.create_user( |
|
62 | 62 | active=True, lastname=u'Fake {} user'.format(keyword)) |
|
63 | 63 | |
|
64 | 64 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
65 | 65 | with mock.patch('rhodecode.lib.helpers.link_to_user'): |
|
66 | 66 | keyword = keywords[1].lower() |
|
67 | 67 | users = UserModel().get_users(name_contains=keyword) |
|
68 | 68 | |
|
69 | 69 | fake_users = [u for u in users if u['last_name'].startswith('Fake')] |
|
70 | 70 | assert len(fake_users) == 2 |
|
71 | 71 | for user in fake_users: |
|
72 | 72 | assert user['last_name'] == safe_str('Fake ΓΌnicode user') |
|
73 | 73 | |
|
74 | 74 | def test_returns_user_filtered_by_first_name(self, backend, user_util): |
|
75 | 75 | created_users = [] |
|
76 | 76 | keywords = ('aBc', u'ΓΌnicode') |
|
77 | 77 | for keyword in keywords: |
|
78 | 78 | for i in range(2): |
|
79 | 79 | created_users.append(user_util.create_user( |
|
80 | 80 | active=True, lastname='Fake user', |
|
81 | 81 | firstname=u'Fake {} user'.format(keyword))) |
|
82 | 82 | |
|
83 | 83 | keyword = keywords[1].lower() |
|
84 | 84 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
85 | 85 | with mock.patch('rhodecode.lib.helpers.link_to_user'): |
|
86 | 86 | users = UserModel().get_users(name_contains=keyword) |
|
87 | 87 | |
|
88 | 88 | fake_users = [u for u in users if u['last_name'].startswith('Fake')] |
|
89 | 89 | assert len(fake_users) == 2 |
|
90 | 90 | for user in fake_users: |
|
91 | 91 | assert user['first_name'] == safe_str('Fake ΓΌnicode user') |
|
92 | 92 | |
|
93 | 93 | def test_returns_user_filtered_by_username(self, backend, user_util): |
|
94 | 94 | created_users = [] |
|
95 | 95 | for i in range(5): |
|
96 | 96 | created_users.append(user_util.create_user( |
|
97 | 97 | active=True, lastname='Fake user')) |
|
98 | 98 | |
|
99 | 99 | user_filter = created_users[-1].username[-2:] |
|
100 | 100 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
101 | 101 | with mock.patch('rhodecode.lib.helpers.link_to_user'): |
|
102 | 102 | users = UserModel().get_users(name_contains=user_filter) |
|
103 | 103 | |
|
104 | 104 | fake_users = [u for u in users if u['last_name'].startswith('Fake')] |
|
105 | 105 | assert len(fake_users) == 1 |
|
106 | 106 | assert fake_users[0]['username'] == created_users[-1].username |
|
107 | 107 | |
|
108 | 108 | def test_returns_limited_user_list(self, backend, user_util): |
|
109 | 109 | created_users = [] |
|
110 | 110 | for i in range(5): |
|
111 | 111 | created_users.append(user_util.create_user( |
|
112 | 112 | active=True, lastname='Fake user')) |
|
113 | 113 | |
|
114 | 114 | with mock.patch('rhodecode.lib.helpers.gravatar_url'): |
|
115 | 115 | with mock.patch('rhodecode.lib.helpers.link_to_user'): |
|
116 | 116 | users = UserModel().get_users(name_contains='Fake', limit=3) |
|
117 | 117 | |
|
118 | 118 | fake_users = [u for u in users if u['last_name'].startswith('Fake')] |
|
119 | 119 | assert len(fake_users) == 3 |
|
120 | 120 | |
|
121 | 121 | |
|
122 | 122 | @pytest.fixture() |
|
123 | 123 | def test_user(request, baseapp): |
|
124 | 124 | usr = UserModel().create_or_update( |
|
125 | 125 | username='test_user', |
|
126 | 126 | password='qweqwe', |
|
127 | 127 | email='main_email@rhodecode.org', |
|
128 | 128 | firstname='u1', lastname=u'u1') |
|
129 | 129 | Session().commit() |
|
130 | 130 | assert User.get_by_username(u'test_user') == usr |
|
131 | 131 | |
|
132 | 132 | @request.addfinalizer |
|
133 | 133 | def cleanup(): |
|
134 | 134 | if UserModel().get_user(usr.user_id) is None: |
|
135 | 135 | return |
|
136 | 136 | |
|
137 | 137 | perm = Permission.query().all() |
|
138 | 138 | for p in perm: |
|
139 | 139 | UserModel().revoke_perm(usr, p) |
|
140 | 140 | |
|
141 | 141 | UserModel().delete(usr.user_id) |
|
142 | 142 | Session().commit() |
|
143 | 143 | |
|
144 | 144 | return usr |
|
145 | 145 | |
|
146 | 146 | |
|
147 | 147 | def test_create_and_remove(test_user): |
|
148 | 148 | usr = test_user |
|
149 | 149 | |
|
150 | 150 | # make user group |
|
151 | 151 | user_group = fixture.create_user_group('some_example_group') |
|
152 | 152 | Session().commit() |
|
153 | 153 | |
|
154 | 154 | UserGroupModel().add_user_to_group(user_group, usr) |
|
155 | 155 | Session().commit() |
|
156 | 156 | |
|
157 | 157 | assert UserGroup.get(user_group.users_group_id) == user_group |
|
158 | 158 | assert UserGroupMember.query().count() == 1 |
|
159 | 159 | UserModel().delete(usr.user_id) |
|
160 | 160 | Session().commit() |
|
161 | 161 | |
|
162 | 162 | assert UserGroupMember.query().all() == [] |
|
163 | 163 | |
|
164 | 164 | |
|
165 | 165 | def test_additonal_email_as_main(test_user): |
|
166 | 166 | with pytest.raises(AttributeError): |
|
167 | 167 | m = UserEmailMap() |
|
168 | 168 | m.email = test_user.email |
|
169 | 169 | m.user = test_user |
|
170 | 170 | Session().add(m) |
|
171 | 171 | Session().commit() |
|
172 | 172 | |
|
173 | 173 | |
|
174 | 174 | def test_extra_email_map(test_user): |
|
175 | 175 | |
|
176 | 176 | m = UserEmailMap() |
|
177 | 177 | m.email = u'main_email2@rhodecode.org' |
|
178 | 178 | m.user = test_user |
|
179 | 179 | Session().add(m) |
|
180 | 180 | Session().commit() |
|
181 | 181 | |
|
182 | 182 | u = User.get_by_email(email='main_email@rhodecode.org') |
|
183 | 183 | assert test_user.user_id == u.user_id |
|
184 | 184 | assert test_user.username == u.username |
|
185 | 185 | |
|
186 | 186 | u = User.get_by_email(email='main_email2@rhodecode.org') |
|
187 | 187 | assert test_user.user_id == u.user_id |
|
188 | 188 | assert test_user.username == u.username |
|
189 | 189 | u = User.get_by_email(email='main_email3@rhodecode.org') |
|
190 | 190 | assert u is None |
|
191 | 191 | |
|
192 | 192 | |
|
193 | 193 | def test_get_api_data_replaces_secret_data_by_default(test_user): |
|
194 | 194 | api_data = test_user.get_api_data() |
|
195 | 195 | api_key_length = 40 |
|
196 | 196 | expected_replacement = '*' * api_key_length |
|
197 | 197 | |
|
198 | 198 | for key in api_data['auth_tokens']: |
|
199 | 199 | assert key == expected_replacement |
|
200 | 200 | |
|
201 | 201 | |
|
202 | 202 | def test_get_api_data_includes_secret_data_if_activated(test_user): |
|
203 | 203 | api_data = test_user.get_api_data(include_secrets=True) |
|
204 | 204 | assert api_data['auth_tokens'] == test_user.auth_tokens |
|
205 | 205 | |
|
206 | 206 | |
|
207 | 207 | def test_add_perm(test_user): |
|
208 | 208 | perm = Permission.query().all()[0] |
|
209 | 209 | UserModel().grant_perm(test_user, perm) |
|
210 | 210 | Session().commit() |
|
211 | 211 | assert UserModel().has_perm(test_user, perm) |
|
212 | 212 | |
|
213 | 213 | |
|
214 | 214 | def test_has_perm(test_user): |
|
215 | 215 | perm = Permission.query().all() |
|
216 | 216 | for p in perm: |
|
217 | 217 | assert not UserModel().has_perm(test_user, p) |
|
218 | 218 | |
|
219 | 219 | |
|
220 | 220 | def test_revoke_perm(test_user): |
|
221 | 221 | perm = Permission.query().all()[0] |
|
222 | 222 | UserModel().grant_perm(test_user, perm) |
|
223 | 223 | Session().commit() |
|
224 | 224 | assert UserModel().has_perm(test_user, perm) |
|
225 | 225 | |
|
226 | 226 | # revoke |
|
227 | 227 | UserModel().revoke_perm(test_user, perm) |
|
228 | 228 | Session().commit() |
|
229 | 229 | assert not UserModel().has_perm(test_user, perm) |
|
230 | 230 | |
|
231 | 231 | |
|
232 | 232 | @pytest.mark.parametrize("ip_range, expected, expect_errors", [ |
|
233 | 233 | ('', [], False), |
|
234 | 234 | ('127.0.0.1', ['127.0.0.1'], False), |
|
235 | 235 | ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), |
|
236 | 236 | ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False), |
|
237 | 237 | ( |
|
238 | 238 | '127.0.0.1,172.172.172.0,127.0.0.2', |
|
239 | 239 | ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False), |
|
240 | 240 | ( |
|
241 | 241 | '127.0.0.1-127.0.0.5', |
|
242 | 242 | ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], |
|
243 | 243 | False), |
|
244 | 244 | ( |
|
245 | 245 | '127.0.0.1 - 127.0.0.5', |
|
246 | 246 | ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'], |
|
247 | 247 | False |
|
248 | 248 | ), |
|
249 | 249 | ('-', [], True), |
|
250 | 250 | ('127.0.0.1-32', [], True), |
|
251 | 251 | ( |
|
252 | 252 | '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2', |
|
253 | 253 | ['127.0.0.1', '127.0.0.2'], False), |
|
254 | 254 | ( |
|
255 | 255 | '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,', |
|
256 | 256 | ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'], |
|
257 | 257 | False |
|
258 | 258 | ), |
|
259 | 259 | ( |
|
260 | 260 | '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,', |
|
261 | 261 | ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5', |
|
262 | 262 | '127.0.0.6'], |
|
263 | 263 | False |
|
264 | 264 | ), |
|
265 | 265 | ]) |
|
266 | 266 | def test_ip_range_generator(ip_range, expected, expect_errors): |
|
267 | 267 | func = UserModel().parse_ip_range |
|
268 | 268 | if expect_errors: |
|
269 | 269 | pytest.raises(ValueError, func, ip_range) |
|
270 | 270 | else: |
|
271 | 271 | parsed_list = func(ip_range) |
|
272 | 272 | assert parsed_list == expected |
|
273 | 273 | |
|
274 | 274 | |
|
275 | 275 | def test_user_delete_cascades_ip_whitelist(test_user): |
|
276 | 276 | sample_ip = '1.1.1.1' |
|
277 | 277 | uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip) |
|
278 | 278 | Session().add(uid_map) |
|
279 | 279 | Session().delete(test_user) |
|
280 | 280 | try: |
|
281 | 281 | Session().flush() |
|
282 | 282 | finally: |
|
283 | 283 | Session().rollback() |
|
284 | 284 | |
|
285 | 285 | |
|
286 | 286 | def test_account_for_deactivation_generation(test_user): |
|
287 | 287 | accounts = UserModel().get_accounts_in_creation_order( |
|
288 | 288 | current_user=test_user) |
|
289 | 289 | # current user should be #1 in the list |
|
290 | 290 | assert accounts[0] == test_user.user_id |
|
291 | 291 | active_users = User.query().filter(User.active == true()).count() |
|
292 | 292 | assert active_users == len(accounts) |
|
293 | 293 | |
|
294 | 294 | |
|
295 | 295 | def test_user_delete_cascades_permissions_on_repo(backend, test_user): |
|
296 | 296 | test_repo = backend.create_repo() |
|
297 | 297 | RepoModel().grant_user_permission( |
|
298 | 298 | test_repo, test_user, 'repository.write') |
|
299 | 299 | Session().commit() |
|
300 | 300 | |
|
301 | 301 | assert test_user.repo_to_perm |
|
302 | 302 | |
|
303 | 303 | UserModel().delete(test_user) |
|
304 | 304 | Session().commit() |
|
305 | 305 | |
|
306 | 306 | |
|
307 | 307 | def test_user_delete_cascades_permissions_on_repo_group( |
|
308 | 308 | test_repo_group, test_user): |
|
309 | 309 | RepoGroupModel().grant_user_permission( |
|
310 | 310 | test_repo_group, test_user, 'group.write') |
|
311 | 311 | Session().commit() |
|
312 | 312 | |
|
313 | 313 | assert test_user.repo_group_to_perm |
|
314 | 314 | |
|
315 | 315 | Session().delete(test_user) |
|
316 | 316 | Session().commit() |
|
317 | 317 | |
|
318 | 318 | |
|
319 | 319 | def test_user_delete_cascades_permissions_on_user_group( |
|
320 | 320 | test_user_group, test_user): |
|
321 | 321 | UserGroupModel().grant_user_permission( |
|
322 | 322 | test_user_group, test_user, 'usergroup.write') |
|
323 | 323 | Session().commit() |
|
324 | 324 | |
|
325 | 325 | assert test_user.user_group_to_perm |
|
326 | 326 | |
|
327 | 327 | Session().delete(test_user) |
|
328 | 328 | Session().commit() |
@@ -1,295 +1,295 b'' | |||
|
1 | 1 | |
|
2 | 2 | # Copyright (C) 2010-2023 RhodeCode GmbH |
|
3 | 3 | # |
|
4 | 4 | # This program is free software: you can redistribute it and/or modify |
|
5 | 5 | # it under the terms of the GNU Affero General Public License, version 3 |
|
6 | 6 | # (only), as published by the Free Software Foundation. |
|
7 | 7 | # |
|
8 | 8 | # This program is distributed in the hope that it will be useful, |
|
9 | 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | 11 | # GNU General Public License for more details. |
|
12 | 12 | # |
|
13 | 13 | # You should have received a copy of the GNU Affero General Public License |
|
14 | 14 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
15 | 15 | # |
|
16 | 16 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | 17 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | 18 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | 19 | |
|
20 | 20 | |
|
21 | 21 | import formencode |
|
22 | 22 | import pytest |
|
23 | 23 | |
|
24 | 24 | from rhodecode.tests import ( |
|
25 | 25 | HG_REPO, TEST_USER_REGULAR2_EMAIL, TEST_USER_REGULAR2_LOGIN, |
|
26 | 26 | TEST_USER_REGULAR2_PASS, TEST_USER_ADMIN_LOGIN, TESTS_TMP_PATH) |
|
27 | 27 | |
|
28 | 28 | from rhodecode.model import validators as v |
|
29 | 29 | from rhodecode.model.user_group import UserGroupModel |
|
30 | 30 | |
|
31 | 31 | from rhodecode.model.meta import Session |
|
32 | 32 | from rhodecode.model.repo_group import RepoGroupModel |
|
33 | 33 | from rhodecode.model.db import ChangesetStatus, Repository |
|
34 | 34 | from rhodecode.model.changeset_status import ChangesetStatusModel |
|
35 | 35 | from rhodecode.tests.fixture import Fixture |
|
36 | 36 | |
|
37 | 37 | fixture = Fixture() |
|
38 | 38 | |
|
39 | 39 | pytestmark = pytest.mark.usefixtures('baseapp') |
|
40 | 40 | |
|
41 | 41 | |
|
42 | 42 | @pytest.fixture() |
|
43 | 43 | def localizer(): |
|
44 | 44 | def func(msg): |
|
45 | 45 | return msg |
|
46 | 46 | return func |
|
47 | 47 | |
|
48 | 48 | |
|
49 | 49 | def test_Message_extractor(localizer): |
|
50 | 50 | validator = v.ValidUsername(localizer) |
|
51 | 51 | pytest.raises(formencode.Invalid, validator.to_python, 'default') |
|
52 | 52 | |
|
53 | 53 | class StateObj(object): |
|
54 | 54 | pass |
|
55 | 55 | |
|
56 | 56 | pytest.raises( |
|
57 | 57 | formencode.Invalid, validator.to_python, 'default', StateObj) |
|
58 | 58 | |
|
59 | 59 | |
|
60 | 60 | def test_ValidUsername(localizer): |
|
61 | 61 | validator = v.ValidUsername(localizer) |
|
62 | 62 | |
|
63 | 63 | pytest.raises(formencode.Invalid, validator.to_python, 'default') |
|
64 | 64 | pytest.raises(formencode.Invalid, validator.to_python, 'new_user') |
|
65 | 65 | pytest.raises(formencode.Invalid, validator.to_python, '.,') |
|
66 | 66 | pytest.raises( |
|
67 | 67 | formencode.Invalid, validator.to_python, TEST_USER_ADMIN_LOGIN) |
|
68 | 68 | assert 'test' == validator.to_python('test') |
|
69 | 69 | |
|
70 | 70 | validator = v.ValidUsername(localizer, edit=True, old_data={'user_id': 1}) |
|
71 | 71 | |
|
72 | 72 | |
|
73 | 73 | def test_ValidRepoUser(localizer): |
|
74 | 74 | validator = v.ValidRepoUser(localizer) |
|
75 | 75 | pytest.raises(formencode.Invalid, validator.to_python, 'nouser') |
|
76 | 76 | assert TEST_USER_ADMIN_LOGIN == \ |
|
77 | 77 | validator.to_python(TEST_USER_ADMIN_LOGIN) |
|
78 | 78 | |
|
79 | 79 | |
|
80 | 80 | def test_ValidUserGroup(localizer): |
|
81 | 81 | validator = v.ValidUserGroup(localizer) |
|
82 | 82 | pytest.raises(formencode.Invalid, validator.to_python, 'default') |
|
83 | 83 | pytest.raises(formencode.Invalid, validator.to_python, '.,') |
|
84 | 84 | |
|
85 | 85 | gr = fixture.create_user_group('test') |
|
86 | 86 | gr2 = fixture.create_user_group('tes2') |
|
87 | 87 | Session().commit() |
|
88 | 88 | pytest.raises(formencode.Invalid, validator.to_python, 'test') |
|
89 | 89 | assert gr.users_group_id is not None |
|
90 | 90 | validator = v.ValidUserGroup(localizer, |
|
91 | 91 | edit=True, |
|
92 | 92 | old_data={'users_group_id': gr2.users_group_id}) |
|
93 | 93 | |
|
94 | 94 | pytest.raises(formencode.Invalid, validator.to_python, 'test') |
|
95 | 95 | pytest.raises(formencode.Invalid, validator.to_python, 'TesT') |
|
96 | 96 | pytest.raises(formencode.Invalid, validator.to_python, 'TEST') |
|
97 | 97 | UserGroupModel().delete(gr) |
|
98 | 98 | UserGroupModel().delete(gr2) |
|
99 | 99 | Session().commit() |
|
100 | 100 | |
|
101 | 101 | |
|
102 | 102 | @pytest.fixture(scope='function') |
|
103 | 103 | def repo_group(request): |
|
104 | 104 | model = RepoGroupModel() |
|
105 | 105 | gr = model.create( |
|
106 | 106 | group_name='test_gr', group_description='desc', just_db=True, |
|
107 | 107 | owner=TEST_USER_ADMIN_LOGIN) |
|
108 | 108 | |
|
109 | 109 | def cleanup(): |
|
110 | 110 | model.delete(gr) |
|
111 | 111 | |
|
112 | 112 | request.addfinalizer(cleanup) |
|
113 | 113 | |
|
114 | 114 | return gr |
|
115 | 115 | |
|
116 | 116 | |
|
117 | 117 | def test_ValidRepoGroup_same_name_as_repo(localizer): |
|
118 | 118 | validator = v.ValidRepoGroup(localizer) |
|
119 | 119 | with pytest.raises(formencode.Invalid) as excinfo: |
|
120 | 120 | validator.to_python({'group_name': HG_REPO}) |
|
121 | 121 | expected_msg = 'Repository with name "vcs_test_hg" already exists' |
|
122 | 122 | assert expected_msg in str(excinfo.value) |
|
123 | 123 | |
|
124 | 124 | |
|
125 | 125 | def test_ValidRepoGroup_group_exists(localizer, repo_group): |
|
126 | 126 | validator = v.ValidRepoGroup(localizer) |
|
127 | 127 | with pytest.raises(formencode.Invalid) as excinfo: |
|
128 | 128 | validator.to_python({'group_name': repo_group.group_name}) |
|
129 | 129 | expected_msg = 'Group "test_gr" already exists' |
|
130 | 130 | assert expected_msg in str(excinfo.value) |
|
131 | 131 | |
|
132 | 132 | |
|
133 | 133 | def test_ValidRepoGroup_invalid_parent(localizer, repo_group): |
|
134 | 134 | validator = v.ValidRepoGroup(localizer, edit=True, |
|
135 | 135 | old_data={'group_id': repo_group.group_id}) |
|
136 | 136 | with pytest.raises(formencode.Invalid) as excinfo: |
|
137 | 137 | validator.to_python({ |
|
138 | 138 | 'group_name': repo_group.group_name + 'n', |
|
139 | 139 | 'group_parent_id': repo_group.group_id, |
|
140 | 140 | }) |
|
141 | 141 | expected_msg = 'Cannot assign this group as parent' |
|
142 | 142 | assert expected_msg in str(excinfo.value) |
|
143 | 143 | |
|
144 | 144 | |
|
145 | 145 | def test_ValidRepoGroup_edit_group_no_root_permission(localizer, repo_group): |
|
146 | 146 | validator = v.ValidRepoGroup(localizer, |
|
147 | 147 | edit=True, old_data={'group_id': repo_group.group_id}, |
|
148 | 148 | can_create_in_root=False) |
|
149 | 149 | |
|
150 | 150 | # Cannot change parent |
|
151 | 151 | with pytest.raises(formencode.Invalid) as excinfo: |
|
152 | 152 | validator.to_python({'group_parent_id': '25'}) |
|
153 | 153 | expected_msg = 'no permission to store repository group in root location' |
|
154 | 154 | assert expected_msg in str(excinfo.value) |
|
155 | 155 | |
|
156 | 156 | # Chaning all the other fields is allowed |
|
157 | 157 | validator.to_python({'group_name': 'foo', 'group_parent_id': '-1'}) |
|
158 | 158 | validator.to_python( |
|
159 | 159 | {'user': TEST_USER_REGULAR2_LOGIN, 'group_parent_id': '-1'}) |
|
160 | 160 | validator.to_python({'group_description': 'bar', 'group_parent_id': '-1'}) |
|
161 | 161 | validator.to_python({'enable_locking': 'true', 'group_parent_id': '-1'}) |
|
162 | 162 | |
|
163 | 163 | |
|
164 | 164 | def test_ValidPassword(localizer): |
|
165 | 165 | validator = v.ValidPassword(localizer) |
|
166 | 166 | assert 'lol' == validator.to_python('lol') |
|
167 |
assert None |
|
|
167 | assert None is validator.to_python(None) | |
|
168 | 168 | pytest.raises(formencode.Invalid, validator.to_python, 'Δ ΔΕΌΕΊ') |
|
169 | 169 | |
|
170 | 170 | |
|
171 | 171 | def test_ValidPasswordsMatch(localizer): |
|
172 | 172 | validator = v.ValidPasswordsMatch(localizer) |
|
173 | 173 | pytest.raises( |
|
174 | 174 | formencode.Invalid, |
|
175 | 175 | validator.to_python, {'password': 'pass', |
|
176 | 176 | 'password_confirmation': 'pass2'}) |
|
177 | 177 | |
|
178 | 178 | pytest.raises( |
|
179 | 179 | formencode.Invalid, |
|
180 | 180 | validator.to_python, {'new_password': 'pass', |
|
181 | 181 | 'password_confirmation': 'pass2'}) |
|
182 | 182 | |
|
183 | 183 | assert {'new_password': 'pass', 'password_confirmation': 'pass'} == \ |
|
184 | 184 | validator.to_python({'new_password': 'pass', |
|
185 | 185 | 'password_confirmation': 'pass'}) |
|
186 | 186 | |
|
187 | 187 | assert {'password': 'pass', 'password_confirmation': 'pass'} == \ |
|
188 | 188 | validator.to_python({'password': 'pass', |
|
189 | 189 | 'password_confirmation': 'pass'}) |
|
190 | 190 | |
|
191 | 191 | |
|
192 | 192 | def test_ValidAuth(localizer, config_stub): |
|
193 | 193 | config_stub.testing_securitypolicy() |
|
194 | 194 | config_stub.include('rhodecode.authentication') |
|
195 | 195 | config_stub.include('rhodecode.authentication.plugins.auth_rhodecode') |
|
196 | 196 | config_stub.include('rhodecode.authentication.plugins.auth_token') |
|
197 | 197 | |
|
198 | 198 | validator = v.ValidAuth(localizer) |
|
199 | 199 | valid_creds = { |
|
200 | 200 | 'username': TEST_USER_REGULAR2_LOGIN, |
|
201 | 201 | 'password': TEST_USER_REGULAR2_PASS, |
|
202 | 202 | } |
|
203 | 203 | invalid_creds = { |
|
204 | 204 | 'username': 'err', |
|
205 | 205 | 'password': 'err', |
|
206 | 206 | } |
|
207 | 207 | assert valid_creds == validator.to_python(valid_creds) |
|
208 | 208 | pytest.raises( |
|
209 | 209 | formencode.Invalid, validator.to_python, invalid_creds) |
|
210 | 210 | |
|
211 | 211 | |
|
212 | 212 | def test_ValidRepoName(localizer): |
|
213 | 213 | validator = v.ValidRepoName(localizer) |
|
214 | 214 | |
|
215 | 215 | pytest.raises( |
|
216 | 216 | formencode.Invalid, validator.to_python, {'repo_name': ''}) |
|
217 | 217 | |
|
218 | 218 | pytest.raises( |
|
219 | 219 | formencode.Invalid, validator.to_python, {'repo_name': HG_REPO}) |
|
220 | 220 | |
|
221 | 221 | gr = RepoGroupModel().create(group_name='group_test', |
|
222 | 222 | group_description='desc', |
|
223 | 223 | owner=TEST_USER_ADMIN_LOGIN) |
|
224 | 224 | pytest.raises( |
|
225 | 225 | formencode.Invalid, validator.to_python, {'repo_name': gr.group_name}) |
|
226 | 226 | |
|
227 | 227 | #TODO: write an error case for that ie. create a repo withinh a group |
|
228 | 228 | # pytest.raises(formencode.Invalid, |
|
229 | 229 | # validator.to_python, {'repo_name': 'some', |
|
230 | 230 | # 'repo_group': gr.group_id}) |
|
231 | 231 | |
|
232 | 232 | |
|
233 | 233 | def test_ValidForkName(localizer): |
|
234 | 234 | # this uses ValidRepoName validator |
|
235 | 235 | assert True |
|
236 | 236 | |
|
237 | 237 | @pytest.mark.parametrize("name, expected", [ |
|
238 | 238 | ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'), |
|
239 | 239 | ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'), |
|
240 | 240 | ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'), |
|
241 | 241 | ('/]re po', 're-po')]) |
|
242 | 242 | def test_SlugifyName(name, expected, localizer): |
|
243 | 243 | validator = v.SlugifyName(localizer) |
|
244 | 244 | assert expected == validator.to_python(name) |
|
245 | 245 | |
|
246 | 246 | |
|
247 | 247 | def test_ValidForkType(localizer): |
|
248 | 248 | validator = v.ValidForkType(localizer, old_data={'repo_type': 'hg'}) |
|
249 | 249 | assert 'hg' == validator.to_python('hg') |
|
250 | 250 | pytest.raises(formencode.Invalid, validator.to_python, 'git') |
|
251 | 251 | |
|
252 | 252 | |
|
253 | 253 | def test_ValidPath(localizer): |
|
254 | 254 | validator = v.ValidPath(localizer) |
|
255 | 255 | assert TESTS_TMP_PATH == validator.to_python(TESTS_TMP_PATH) |
|
256 | 256 | pytest.raises( |
|
257 | 257 | formencode.Invalid, validator.to_python, '/no_such_dir') |
|
258 | 258 | |
|
259 | 259 | |
|
260 | 260 | def test_UniqSystemEmail(localizer): |
|
261 | 261 | validator = v.UniqSystemEmail(localizer, old_data={}) |
|
262 | 262 | |
|
263 | 263 | assert 'mail@python.org' == validator.to_python('MaiL@Python.org') |
|
264 | 264 | |
|
265 | 265 | email = TEST_USER_REGULAR2_EMAIL |
|
266 | 266 | pytest.raises(formencode.Invalid, validator.to_python, email) |
|
267 | 267 | |
|
268 | 268 | |
|
269 | 269 | def test_ValidSystemEmail(localizer): |
|
270 | 270 | validator = v.ValidSystemEmail(localizer) |
|
271 | 271 | email = TEST_USER_REGULAR2_EMAIL |
|
272 | 272 | |
|
273 | 273 | assert email == validator.to_python(email) |
|
274 | 274 | pytest.raises(formencode.Invalid, validator.to_python, 'err') |
|
275 | 275 | |
|
276 | 276 | |
|
277 | 277 | def test_NotReviewedRevisions(localizer): |
|
278 | 278 | repo_id = Repository.get_by_repo_name(HG_REPO).repo_id |
|
279 | 279 | validator = v.NotReviewedRevisions(localizer, repo_id) |
|
280 | 280 | rev = '0' * 40 |
|
281 | 281 | # add status for a rev, that should throw an error because it is already |
|
282 | 282 | # reviewed |
|
283 | 283 | new_status = ChangesetStatus() |
|
284 | 284 | new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN) |
|
285 | 285 | new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO) |
|
286 | 286 | new_status.status = ChangesetStatus.STATUS_APPROVED |
|
287 | 287 | new_status.comment = None |
|
288 | 288 | new_status.revision = rev |
|
289 | 289 | Session().add(new_status) |
|
290 | 290 | Session().commit() |
|
291 | 291 | try: |
|
292 | 292 | pytest.raises(formencode.Invalid, validator.to_python, [rev]) |
|
293 | 293 | finally: |
|
294 | 294 | Session().delete(new_status) |
|
295 | 295 | Session().commit() |
General Comments 0
You need to be logged in to leave comments.
Login now