Show More
|
1 | NO CONTENT: new file 100644 |
@@ -0,0 +1,230 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | ||
|
21 | import mock | |
|
22 | import pytest | |
|
23 | ||
|
24 | from rhodecode.lib.utils2 import str2bool | |
|
25 | from rhodecode.lib.vcs.exceptions import RepositoryRequirementError | |
|
26 | from rhodecode.model.db import Repository, UserRepoToPerm, Permission, User | |
|
27 | from rhodecode.model.meta import Session | |
|
28 | from rhodecode.tests import ( | |
|
29 | url, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, | |
|
30 | assert_session_flash) | |
|
31 | from rhodecode.tests.fixture import Fixture | |
|
32 | ||
|
33 | fixture = Fixture() | |
|
34 | ||
|
35 | ||
|
36 | def route_path(name, params=None, **kwargs): | |
|
37 | import urllib | |
|
38 | ||
|
39 | base_url = { | |
|
40 | 'edit_repo': '/{repo_name}/settings', | |
|
41 | }[name].format(**kwargs) | |
|
42 | ||
|
43 | if params: | |
|
44 | base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) | |
|
45 | return base_url | |
|
46 | ||
|
47 | ||
|
48 | def _get_permission_for_user(user, repo): | |
|
49 | perm = UserRepoToPerm.query()\ | |
|
50 | .filter(UserRepoToPerm.repository == | |
|
51 | Repository.get_by_repo_name(repo))\ | |
|
52 | .filter(UserRepoToPerm.user == User.get_by_username(user))\ | |
|
53 | .all() | |
|
54 | return perm | |
|
55 | ||
|
56 | ||
|
57 | @pytest.mark.usefixtures('autologin_user', 'app') | |
|
58 | class TestAdminRepoSettings(object): | |
|
59 | @pytest.mark.parametrize('urlname', [ | |
|
60 | 'edit_repo', | |
|
61 | ]) | |
|
62 | def test_show_page(self, urlname, app, backend): | |
|
63 | app.get(route_path(urlname, repo_name=backend.repo_name), status=200) | |
|
64 | ||
|
65 | def test_edit_accessible_when_missing_requirements( | |
|
66 | self, backend_hg, autologin_user): | |
|
67 | scm_patcher = mock.patch.object( | |
|
68 | Repository, 'scm_instance', side_effect=RepositoryRequirementError) | |
|
69 | with scm_patcher: | |
|
70 | self.app.get(route_path('edit_repo', repo_name=backend_hg.repo_name)) | |
|
71 | ||
|
72 | @pytest.mark.parametrize('urlname', [ | |
|
73 | 'edit_repo_perms', | |
|
74 | 'edit_repo_advanced', | |
|
75 | 'repo_vcs_settings', | |
|
76 | 'edit_repo_fields', | |
|
77 | 'repo_settings_issuetracker', | |
|
78 | 'edit_repo_caches', | |
|
79 | 'edit_repo_remote', | |
|
80 | 'edit_repo_statistics', | |
|
81 | ]) | |
|
82 | def test_show_page_pylons(self, urlname, app): | |
|
83 | app.get(url(urlname, repo_name=HG_REPO)) | |
|
84 | ||
|
85 | @pytest.mark.parametrize('update_settings', [ | |
|
86 | {'repo_description': 'alter-desc'}, | |
|
87 | {'repo_owner': TEST_USER_REGULAR_LOGIN}, | |
|
88 | {'repo_private': 'true'}, | |
|
89 | {'repo_enable_locking': 'true'}, | |
|
90 | {'repo_enable_downloads': 'true'}, | |
|
91 | ]) | |
|
92 | def test_update_repo_settings(self, update_settings, csrf_token, backend, user_util): | |
|
93 | repo = user_util.create_repo(repo_type=backend.alias) | |
|
94 | repo_name = repo.repo_name | |
|
95 | ||
|
96 | params = fixture._get_repo_create_params( | |
|
97 | csrf_token=csrf_token, | |
|
98 | repo_name=repo_name, | |
|
99 | repo_type=backend.alias, | |
|
100 | repo_owner=TEST_USER_ADMIN_LOGIN, | |
|
101 | repo_description='DESC', | |
|
102 | ||
|
103 | repo_private='false', | |
|
104 | repo_enable_locking='false', | |
|
105 | repo_enable_downloads='false') | |
|
106 | params.update(update_settings) | |
|
107 | self.app.post( | |
|
108 | route_path('edit_repo', repo_name=repo_name), | |
|
109 | params=params, status=302) | |
|
110 | ||
|
111 | repo = Repository.get_by_repo_name(repo_name) | |
|
112 | assert repo.user.username == \ | |
|
113 | update_settings.get('repo_owner', repo.user.username) | |
|
114 | ||
|
115 | assert repo.description == \ | |
|
116 | update_settings.get('repo_description', repo.description) | |
|
117 | ||
|
118 | assert repo.private == \ | |
|
119 | str2bool(update_settings.get( | |
|
120 | 'repo_private', repo.private)) | |
|
121 | ||
|
122 | assert repo.enable_locking == \ | |
|
123 | str2bool(update_settings.get( | |
|
124 | 'repo_enable_locking', repo.enable_locking)) | |
|
125 | ||
|
126 | assert repo.enable_downloads == \ | |
|
127 | str2bool(update_settings.get( | |
|
128 | 'repo_enable_downloads', repo.enable_downloads)) | |
|
129 | ||
|
130 | def test_update_repo_name_via_settings(self, csrf_token, user_util, backend): | |
|
131 | repo = user_util.create_repo(repo_type=backend.alias) | |
|
132 | repo_name = repo.repo_name | |
|
133 | ||
|
134 | repo_group = user_util.create_repo_group() | |
|
135 | repo_group_name = repo_group.group_name | |
|
136 | new_name = repo_group_name + '_' + repo_name | |
|
137 | ||
|
138 | params = fixture._get_repo_create_params( | |
|
139 | csrf_token=csrf_token, | |
|
140 | repo_name=new_name, | |
|
141 | repo_type=backend.alias, | |
|
142 | repo_owner=TEST_USER_ADMIN_LOGIN, | |
|
143 | repo_description='DESC', | |
|
144 | repo_private='false', | |
|
145 | repo_enable_locking='false', | |
|
146 | repo_enable_downloads='false') | |
|
147 | self.app.post( | |
|
148 | route_path('edit_repo', repo_name=repo_name), | |
|
149 | params=params, status=302) | |
|
150 | repo = Repository.get_by_repo_name(new_name) | |
|
151 | assert repo.repo_name == new_name | |
|
152 | ||
|
153 | def test_update_repo_group_via_settings(self, csrf_token, user_util, backend): | |
|
154 | repo = user_util.create_repo(repo_type=backend.alias) | |
|
155 | repo_name = repo.repo_name | |
|
156 | ||
|
157 | repo_group = user_util.create_repo_group() | |
|
158 | repo_group_name = repo_group.group_name | |
|
159 | repo_group_id = repo_group.group_id | |
|
160 | ||
|
161 | new_name = repo_group_name + '/' + repo_name | |
|
162 | params = fixture._get_repo_create_params( | |
|
163 | csrf_token=csrf_token, | |
|
164 | repo_name=repo_name, | |
|
165 | repo_type=backend.alias, | |
|
166 | repo_owner=TEST_USER_ADMIN_LOGIN, | |
|
167 | repo_description='DESC', | |
|
168 | repo_group=repo_group_id, | |
|
169 | repo_private='false', | |
|
170 | repo_enable_locking='false', | |
|
171 | repo_enable_downloads='false') | |
|
172 | self.app.post( | |
|
173 | route_path('edit_repo', repo_name=repo_name), | |
|
174 | params=params, status=302) | |
|
175 | repo = Repository.get_by_repo_name(new_name) | |
|
176 | assert repo.repo_name == new_name | |
|
177 | ||
|
178 | def test_set_private_flag_sets_default_user_permissions_to_none( | |
|
179 | self, autologin_user, backend, csrf_token): | |
|
180 | ||
|
181 | # initially repository perm should be read | |
|
182 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
183 | assert len(perm) == 1 | |
|
184 | assert perm[0].permission.permission_name == 'repository.read' | |
|
185 | assert not backend.repo.private | |
|
186 | ||
|
187 | response = self.app.post( | |
|
188 | route_path('edit_repo', repo_name=backend.repo_name), | |
|
189 | params=fixture._get_repo_create_params( | |
|
190 | repo_private='true', | |
|
191 | repo_name=backend.repo_name, | |
|
192 | repo_type=backend.alias, | |
|
193 | repo_owner=TEST_USER_ADMIN_LOGIN, | |
|
194 | csrf_token=csrf_token), status=302) | |
|
195 | ||
|
196 | assert_session_flash( | |
|
197 | response, | |
|
198 | msg='Repository %s updated successfully' % (backend.repo_name)) | |
|
199 | ||
|
200 | repo = Repository.get_by_repo_name(backend.repo_name) | |
|
201 | assert repo.private is True | |
|
202 | ||
|
203 | # now the repo default permission should be None | |
|
204 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
205 | assert len(perm) == 1 | |
|
206 | assert perm[0].permission.permission_name == 'repository.none' | |
|
207 | ||
|
208 | response = self.app.post( | |
|
209 | route_path('edit_repo', repo_name=backend.repo_name), | |
|
210 | params=fixture._get_repo_create_params( | |
|
211 | repo_private='false', | |
|
212 | repo_name=backend.repo_name, | |
|
213 | repo_type=backend.alias, | |
|
214 | repo_owner=TEST_USER_ADMIN_LOGIN, | |
|
215 | csrf_token=csrf_token), status=302) | |
|
216 | ||
|
217 | assert_session_flash( | |
|
218 | response, | |
|
219 | msg='Repository %s updated successfully' % (backend.repo_name)) | |
|
220 | assert backend.repo.private is False | |
|
221 | ||
|
222 | # we turn off private now the repo default permission should stay None | |
|
223 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
224 | assert len(perm) == 1 | |
|
225 | assert perm[0].permission.permission_name == 'repository.none' | |
|
226 | ||
|
227 | # update this permission back | |
|
228 | perm[0].permission = Permission.get_by_key('repository.read') | |
|
229 | Session().add(perm[0]) | |
|
230 | Session().commit() |
@@ -0,0 +1,117 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | ||
|
21 | import mock | |
|
22 | import pytest | |
|
23 | ||
|
24 | import rhodecode | |
|
25 | from rhodecode.model.settings import SettingsModel | |
|
26 | from rhodecode.tests import url | |
|
27 | from rhodecode.tests.utils import AssertResponse | |
|
28 | ||
|
29 | ||
|
30 | def route_path(name, params=None, **kwargs): | |
|
31 | import urllib | |
|
32 | ||
|
33 | base_url = { | |
|
34 | 'edit_repo': '/{repo_name}/settings', | |
|
35 | }[name].format(**kwargs) | |
|
36 | ||
|
37 | if params: | |
|
38 | base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) | |
|
39 | return base_url | |
|
40 | ||
|
41 | ||
|
42 | @pytest.mark.usefixtures('autologin_user', 'app') | |
|
43 | class TestAdminRepoVcsSettings(object): | |
|
44 | ||
|
45 | @pytest.mark.parametrize('setting_name, setting_backends', [ | |
|
46 | ('hg_use_rebase_for_merging', ['hg']), | |
|
47 | ]) | |
|
48 | def test_labs_settings_visible_if_enabled( | |
|
49 | self, setting_name, setting_backends, backend): | |
|
50 | if backend.alias not in setting_backends: | |
|
51 | pytest.skip('Setting not available for backend {}'.format(backend)) | |
|
52 | ||
|
53 | vcs_settings_url = url( | |
|
54 | 'repo_vcs_settings', repo_name=backend.repo.repo_name) | |
|
55 | ||
|
56 | with mock.patch.dict( | |
|
57 | rhodecode.CONFIG, {'labs_settings_active': 'true'}): | |
|
58 | response = self.app.get(vcs_settings_url) | |
|
59 | ||
|
60 | assertr = AssertResponse(response) | |
|
61 | assertr.one_element_exists('#rhodecode_{}'.format(setting_name)) | |
|
62 | ||
|
63 | @pytest.mark.parametrize('setting_name, setting_backends', [ | |
|
64 | ('hg_use_rebase_for_merging', ['hg']), | |
|
65 | ]) | |
|
66 | def test_labs_settings_not_visible_if_disabled( | |
|
67 | self, setting_name, setting_backends, backend): | |
|
68 | if backend.alias not in setting_backends: | |
|
69 | pytest.skip('Setting not available for backend {}'.format(backend)) | |
|
70 | ||
|
71 | vcs_settings_url = url( | |
|
72 | 'repo_vcs_settings', repo_name=backend.repo.repo_name) | |
|
73 | ||
|
74 | with mock.patch.dict( | |
|
75 | rhodecode.CONFIG, {'labs_settings_active': 'false'}): | |
|
76 | response = self.app.get(vcs_settings_url) | |
|
77 | ||
|
78 | assertr = AssertResponse(response) | |
|
79 | assertr.no_element_exists('#rhodecode_{}'.format(setting_name)) | |
|
80 | ||
|
81 | @pytest.mark.parametrize('setting_name, setting_backends', [ | |
|
82 | ('hg_use_rebase_for_merging', ['hg']), | |
|
83 | ]) | |
|
84 | def test_update_boolean_settings( | |
|
85 | self, csrf_token, setting_name, setting_backends, backend): | |
|
86 | if backend.alias not in setting_backends: | |
|
87 | pytest.skip('Setting not available for backend {}'.format(backend)) | |
|
88 | ||
|
89 | repo = backend.create_repo() | |
|
90 | ||
|
91 | settings_model = SettingsModel(repo=repo) | |
|
92 | vcs_settings_url = url( | |
|
93 | 'repo_vcs_settings', repo_name=repo.repo_name) | |
|
94 | ||
|
95 | self.app.post( | |
|
96 | vcs_settings_url, | |
|
97 | params={ | |
|
98 | 'inherit_global_settings': False, | |
|
99 | 'new_svn_branch': 'dummy-value-for-testing', | |
|
100 | 'new_svn_tag': 'dummy-value-for-testing', | |
|
101 | 'rhodecode_{}'.format(setting_name): 'true', | |
|
102 | 'csrf_token': csrf_token, | |
|
103 | }) | |
|
104 | setting = settings_model.get_setting_by_name(setting_name) | |
|
105 | assert setting.app_settings_value | |
|
106 | ||
|
107 | self.app.post( | |
|
108 | vcs_settings_url, | |
|
109 | params={ | |
|
110 | 'inherit_global_settings': False, | |
|
111 | 'new_svn_branch': 'dummy-value-for-testing', | |
|
112 | 'new_svn_tag': 'dummy-value-for-testing', | |
|
113 | 'rhodecode_{}'.format(setting_name): 'false', | |
|
114 | 'csrf_token': csrf_token, | |
|
115 | }) | |
|
116 | setting = settings_model.get_setting_by_name(setting_name) | |
|
117 | assert not setting.app_settings_value |
@@ -1,203 +1,204 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2016-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | |
|
22 | 22 | import pytest |
|
23 | 23 | |
|
24 | from rhodecode.tests import no_newline_id_generator | |
|
24 | 25 | from rhodecode.config.middleware import ( |
|
25 | 26 | _sanitize_vcs_settings, _bool_setting, _string_setting, _list_setting, |
|
26 | 27 | _int_setting) |
|
27 | 28 | |
|
28 | 29 | |
|
29 | 30 | class TestHelperFunctions(object): |
|
30 | 31 | @pytest.mark.parametrize('raw, expected', [ |
|
31 | 32 | ('true', True), (u'true', True), |
|
32 | 33 | ('yes', True), (u'yes', True), |
|
33 | 34 | ('on', True), (u'on', True), |
|
34 | 35 | ('false', False), (u'false', False), |
|
35 | 36 | ('no', False), (u'no', False), |
|
36 | 37 | ('off', False), (u'off', False), |
|
37 | 38 | ('invalid-bool-value', False), |
|
38 | 39 | ('invalid-∫øø@-√å@¨€', False), |
|
39 | 40 | (u'invalid-∫øø@-√å@¨€', False), |
|
40 | 41 | ]) |
|
41 | 42 | def test_bool_setting_helper(self, raw, expected): |
|
42 | 43 | key = 'dummy-key' |
|
43 | 44 | settings = {key: raw} |
|
44 | 45 | _bool_setting(settings, key, None) |
|
45 | 46 | assert settings[key] is expected |
|
46 | 47 | |
|
47 | 48 | @pytest.mark.parametrize('raw, expected', [ |
|
48 | 49 | ('', ''), |
|
49 | 50 | ('test-string', 'test-string'), |
|
50 | 51 | ('CaSe-TeSt', 'case-test'), |
|
51 | 52 | ('test-string-烩€', 'test-string-烩€'), |
|
52 | 53 | (u'test-string-烩€', u'test-string-烩€'), |
|
53 | 54 | ]) |
|
54 | 55 | def test_string_setting_helper(self, raw, expected): |
|
55 | 56 | key = 'dummy-key' |
|
56 | 57 | settings = {key: raw} |
|
57 | 58 | _string_setting(settings, key, None) |
|
58 | 59 | assert settings[key] == expected |
|
59 | 60 | |
|
60 | 61 | @pytest.mark.parametrize('raw, expected', [ |
|
61 | 62 | ('', []), |
|
62 | 63 | ('test', ['test']), |
|
63 | 64 | ('CaSe-TeSt', ['CaSe-TeSt']), |
|
64 | 65 | ('test-string-烩€', ['test-string-烩€']), |
|
65 | 66 | (u'test-string-烩€', [u'test-string-烩€']), |
|
66 | 67 | ('hg git svn', ['hg', 'git', 'svn']), |
|
67 | 68 | ('hg,git,svn', ['hg', 'git', 'svn']), |
|
68 | 69 | ('hg, git, svn', ['hg', 'git', 'svn']), |
|
69 | 70 | ('hg\ngit\nsvn', ['hg', 'git', 'svn']), |
|
70 | 71 | (' hg\n git\n svn ', ['hg', 'git', 'svn']), |
|
71 | 72 | (', hg , git , svn , ', ['', 'hg', 'git', 'svn', '']), |
|
72 | 73 | ('cheese,free node,other', ['cheese', 'free node', 'other']), |
|
73 | ]) | |
|
74 | ], ids=no_newline_id_generator) | |
|
74 | 75 | def test_list_setting_helper(self, raw, expected): |
|
75 | 76 | key = 'dummy-key' |
|
76 | 77 | settings = {key: raw} |
|
77 | 78 | _list_setting(settings, key, None) |
|
78 | 79 | assert settings[key] == expected |
|
79 | 80 | |
|
80 | 81 | @pytest.mark.parametrize('raw, expected', [ |
|
81 | 82 | ('0', 0), |
|
82 | 83 | ('-0', 0), |
|
83 | 84 | ('12345', 12345), |
|
84 | 85 | ('-12345', -12345), |
|
85 | 86 | (u'-12345', -12345), |
|
86 | 87 | ]) |
|
87 | 88 | def test_int_setting_helper(self, raw, expected): |
|
88 | 89 | key = 'dummy-key' |
|
89 | 90 | settings = {key: raw} |
|
90 | 91 | _int_setting(settings, key, None) |
|
91 | 92 | assert settings[key] == expected |
|
92 | 93 | |
|
93 | 94 | @pytest.mark.parametrize('raw', [ |
|
94 | 95 | ('0xff'), |
|
95 | 96 | (''), |
|
96 | 97 | ('invalid-int'), |
|
97 | 98 | ('invalid-⁄~†'), |
|
98 | 99 | (u'invalid-⁄~†'), |
|
99 | 100 | ]) |
|
100 | 101 | def test_int_setting_helper_invalid_input(self, raw): |
|
101 | 102 | key = 'dummy-key' |
|
102 | 103 | settings = {key: raw} |
|
103 | 104 | with pytest.raises(Exception): |
|
104 | 105 | _int_setting(settings, key, None) |
|
105 | 106 | |
|
106 | 107 | |
|
107 | 108 | class TestSanitizeVcsSettings(object): |
|
108 | 109 | _bool_settings = [ |
|
109 | 110 | ('vcs.hooks.direct_calls', False), |
|
110 | 111 | ('vcs.server.enable', True), |
|
111 | 112 | ('vcs.start_server', False), |
|
112 | 113 | ('startup.import_repos', False), |
|
113 | 114 | ] |
|
114 | 115 | |
|
115 | 116 | _string_settings = [ |
|
116 | 117 | ('vcs.svn.compatible_version', ''), |
|
117 | 118 | ('git_rev_filter', '--all'), |
|
118 | 119 | ('vcs.hooks.protocol', 'http'), |
|
119 | 120 | ('vcs.scm_app_implementation', 'http'), |
|
120 | 121 | ('vcs.server', ''), |
|
121 | 122 | ('vcs.server.log_level', 'debug'), |
|
122 | 123 | ('vcs.server.protocol', 'http'), |
|
123 | 124 | ] |
|
124 | 125 | |
|
125 | 126 | _list_settings = [ |
|
126 | 127 | ('vcs.backends', 'hg git'), |
|
127 | 128 | ] |
|
128 | 129 | |
|
129 | 130 | @pytest.mark.parametrize('key, default', _list_settings) |
|
130 | 131 | def test_list_setting_spacesep_list(self, key, default): |
|
131 | 132 | test_list = ['test', 'list', 'values', 'for', key] |
|
132 | 133 | input_value = ' '.join(test_list) |
|
133 | 134 | settings = {key: input_value} |
|
134 | 135 | _sanitize_vcs_settings(settings) |
|
135 | 136 | assert settings[key] == test_list |
|
136 | 137 | |
|
137 | 138 | @pytest.mark.parametrize('key, default', _list_settings) |
|
138 | 139 | def test_list_setting_newlinesep_list(self, key, default): |
|
139 | 140 | test_list = ['test', 'list', 'values', 'for', key] |
|
140 | 141 | input_value = '\n'.join(test_list) |
|
141 | 142 | settings = {key: input_value} |
|
142 | 143 | _sanitize_vcs_settings(settings) |
|
143 | 144 | assert settings[key] == test_list |
|
144 | 145 | |
|
145 | 146 | @pytest.mark.parametrize('key, default', _list_settings) |
|
146 | 147 | def test_list_setting_commasep_list(self, key, default): |
|
147 | 148 | test_list = ['test', 'list', 'values', 'for', key] |
|
148 | 149 | input_value = ','.join(test_list) |
|
149 | 150 | settings = {key: input_value} |
|
150 | 151 | _sanitize_vcs_settings(settings) |
|
151 | 152 | assert settings[key] == test_list |
|
152 | 153 | |
|
153 | 154 | @pytest.mark.parametrize('key, default', _list_settings) |
|
154 | 155 | def test_list_setting_comma_and_space_sep_list(self, key, default): |
|
155 | 156 | test_list = ['test', 'list', 'values', 'for', key] |
|
156 | 157 | input_value = ', '.join(test_list) |
|
157 | 158 | settings = {key: input_value} |
|
158 | 159 | _sanitize_vcs_settings(settings) |
|
159 | 160 | assert settings[key] == test_list |
|
160 | 161 | |
|
161 | 162 | @pytest.mark.parametrize('key, default', _string_settings) |
|
162 | 163 | def test_string_setting_string(self, key, default): |
|
163 | 164 | test_value = 'test-string-for-{}'.format(key) |
|
164 | 165 | settings = {key: test_value} |
|
165 | 166 | _sanitize_vcs_settings(settings) |
|
166 | 167 | assert settings[key] == test_value |
|
167 | 168 | |
|
168 | 169 | @pytest.mark.parametrize('key, default', _string_settings) |
|
169 | 170 | def test_string_setting_default(self, key, default): |
|
170 | 171 | settings = {} |
|
171 | 172 | _sanitize_vcs_settings(settings) |
|
172 | 173 | assert settings[key] == default |
|
173 | 174 | |
|
174 | 175 | @pytest.mark.parametrize('key, default', _string_settings) |
|
175 | 176 | def test_string_setting_lowercase(self, key, default): |
|
176 | 177 | test_value = 'Test-String-For-{}'.format(key) |
|
177 | 178 | settings = {key: test_value} |
|
178 | 179 | _sanitize_vcs_settings(settings) |
|
179 | 180 | assert settings[key] == test_value.lower() |
|
180 | 181 | |
|
181 | 182 | @pytest.mark.parametrize('key, default', _bool_settings) |
|
182 | 183 | def test_bool_setting_true(self, key, default): |
|
183 | 184 | settings = {key: 'true'} |
|
184 | 185 | _sanitize_vcs_settings(settings) |
|
185 | 186 | assert settings[key] is True |
|
186 | 187 | |
|
187 | 188 | @pytest.mark.parametrize('key, default', _bool_settings) |
|
188 | 189 | def test_bool_setting_false(self, key, default): |
|
189 | 190 | settings = {key: 'false'} |
|
190 | 191 | _sanitize_vcs_settings(settings) |
|
191 | 192 | assert settings[key] is False |
|
192 | 193 | |
|
193 | 194 | @pytest.mark.parametrize('key, default', _bool_settings) |
|
194 | 195 | def test_bool_setting_invalid_string(self, key, default): |
|
195 | 196 | settings = {key: 'no-bool-val-string'} |
|
196 | 197 | _sanitize_vcs_settings(settings) |
|
197 | 198 | assert settings[key] is False |
|
198 | 199 | |
|
199 | 200 | @pytest.mark.parametrize('key, default', _bool_settings) |
|
200 | 201 | def test_bool_setting_default(self, key, default): |
|
201 | 202 | settings = {} |
|
202 | 203 | _sanitize_vcs_settings(settings) |
|
203 | 204 | assert settings[key] is default |
@@ -1,79 +1,79 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import mock |
|
22 | 22 | import pylons |
|
23 | 23 | import pytest |
|
24 | 24 | from pylons import tmpl_context as c |
|
25 | 25 | |
|
26 | 26 | |
|
27 | 27 | @pytest.fixture |
|
28 | 28 | def current_user(request, user_util): |
|
29 | 29 | user = user_util.create_user() |
|
30 | 30 | |
|
31 | 31 | request_mock = mock.Mock(user=user) |
|
32 | 32 | pylons.request._push_object(request_mock) |
|
33 | 33 | |
|
34 | 34 | @request.addfinalizer |
|
35 | 35 | def cleanup(): |
|
36 | 36 | pylons.request._pop_object() |
|
37 | 37 | return user |
|
38 | 38 | |
|
39 | 39 | |
|
40 | 40 | @pytest.fixture |
|
41 | 41 | def personal_group_with_parent(user_util, current_user): |
|
42 | 42 | group_read_only = user_util.create_repo_group() |
|
43 | 43 | user_util.grant_user_permission_to_repo_group( |
|
44 | 44 | group_read_only, current_user, 'group.read') |
|
45 | 45 | |
|
46 | 46 | # TODO: johbo: This should go into the business models |
|
47 | 47 | group_as_admin = user_util.create_repo_group( |
|
48 | 48 | owner=current_user) |
|
49 | 49 | group_as_admin.parent_group = group_read_only |
|
50 | 50 | group_as_admin.group_name = group_as_admin.get_new_name( |
|
51 | 51 | group_as_admin.group_name) |
|
52 | 52 | |
|
53 | 53 | return group_as_admin |
|
54 | 54 | |
|
55 | 55 | |
|
56 | 56 | @pytest.fixture |
|
57 | 57 | def controller(): |
|
58 | 58 | from rhodecode.controllers.admin import repo_groups |
|
59 | 59 | return repo_groups.RepoGroupsController() |
|
60 | 60 | |
|
61 | 61 | |
|
62 | 62 | def test_repo_groups_load_defaults( |
|
63 | 63 | current_user, personal_group_with_parent, controller): |
|
64 | 64 | personal_group = personal_group_with_parent |
|
65 | 65 | controller._RepoGroupsController__load_defaults(True, personal_group) |
|
66 | 66 | |
|
67 |
expected_list = [ |
|
|
67 | expected_list = [-1, personal_group.parent_group.group_id] | |
|
68 | 68 | returned_group_ids = [group[0] for group in c.repo_groups] |
|
69 | 69 | assert returned_group_ids == expected_list |
|
70 | 70 | |
|
71 | 71 | |
|
72 | 72 | def test_repo_groups_load_defaults_with_missing_group( |
|
73 | 73 | current_user, personal_group_with_parent, controller): |
|
74 | 74 | personal_group = personal_group_with_parent |
|
75 | 75 | controller._RepoGroupsController__load_defaults(True) |
|
76 | 76 | |
|
77 |
expected_list = sorted([ |
|
|
77 | expected_list = sorted([-1, personal_group.group_id]) | |
|
78 | 78 | returned_group_ids = sorted([group[0] for group in c.repo_groups]) |
|
79 | 79 | assert returned_group_ids == expected_list |
@@ -1,1268 +1,1207 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import urllib |
|
22 | 22 | |
|
23 | 23 | import mock |
|
24 | 24 | import pytest |
|
25 | 25 | |
|
26 | 26 | from rhodecode.lib import auth |
|
27 | 27 | from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode |
|
28 | 28 | from rhodecode.lib.vcs.exceptions import RepositoryRequirementError |
|
29 | 29 | from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\ |
|
30 | 30 | Permission |
|
31 | 31 | from rhodecode.model.meta import Session |
|
32 | 32 | from rhodecode.model.repo import RepoModel |
|
33 | 33 | from rhodecode.model.repo_group import RepoGroupModel |
|
34 | 34 | from rhodecode.model.settings import SettingsModel, VcsSettingsModel |
|
35 | 35 | from rhodecode.model.user import UserModel |
|
36 | 36 | from rhodecode.tests import ( |
|
37 | 37 | login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN, |
|
38 | 38 | TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO, |
|
39 | 39 | logout_user_session) |
|
40 | 40 | from rhodecode.tests.fixture import Fixture, error_function |
|
41 | 41 | from rhodecode.tests.utils import AssertResponse, repo_on_filesystem |
|
42 | 42 | |
|
43 | 43 | fixture = Fixture() |
|
44 | 44 | |
|
45 | 45 | |
|
46 | 46 | @pytest.mark.usefixtures("app") |
|
47 | 47 | class TestAdminRepos(object): |
|
48 | 48 | |
|
49 | 49 | def test_index(self): |
|
50 | 50 | self.app.get(url('repos')) |
|
51 | 51 | |
|
52 | 52 | def test_create_page_restricted(self, autologin_user, backend): |
|
53 | 53 | with mock.patch('rhodecode.BACKENDS', {'git': 'git'}): |
|
54 | 54 | response = self.app.get(url('new_repo'), status=200) |
|
55 | 55 | assert_response = AssertResponse(response) |
|
56 | 56 | element = assert_response.get_element('#repo_type') |
|
57 | 57 | assert element.text_content() == '\ngit\n' |
|
58 | 58 | |
|
59 | 59 | def test_create_page_non_restricted(self, autologin_user, backend): |
|
60 | 60 | response = self.app.get(url('new_repo'), status=200) |
|
61 | 61 | assert_response = AssertResponse(response) |
|
62 | 62 | assert_response.element_contains('#repo_type', 'git') |
|
63 | 63 | assert_response.element_contains('#repo_type', 'svn') |
|
64 | 64 | assert_response.element_contains('#repo_type', 'hg') |
|
65 | 65 | |
|
66 | 66 | @pytest.mark.parametrize("suffix", |
|
67 | 67 | [u'', u'xxa'], ids=['', 'non-ascii']) |
|
68 | 68 | def test_create(self, autologin_user, backend, suffix, csrf_token): |
|
69 | 69 | repo_name_unicode = backend.new_repo_name(suffix=suffix) |
|
70 | 70 | repo_name = repo_name_unicode.encode('utf8') |
|
71 | 71 | description_unicode = u'description for newly created repo' + suffix |
|
72 | 72 | description = description_unicode.encode('utf8') |
|
73 | 73 | response = self.app.post( |
|
74 | 74 | url('repos'), |
|
75 | 75 | fixture._get_repo_create_params( |
|
76 | 76 | repo_private=False, |
|
77 | 77 | repo_name=repo_name, |
|
78 | 78 | repo_type=backend.alias, |
|
79 | 79 | repo_description=description, |
|
80 | 80 | csrf_token=csrf_token), |
|
81 | 81 | status=302) |
|
82 | 82 | |
|
83 | 83 | self.assert_repository_is_created_correctly( |
|
84 | 84 | repo_name, description, backend) |
|
85 | 85 | |
|
86 | 86 | def test_create_numeric(self, autologin_user, backend, csrf_token): |
|
87 | 87 | numeric_repo = '1234' |
|
88 | 88 | repo_name = numeric_repo |
|
89 | 89 | description = 'description for newly created repo' + numeric_repo |
|
90 | 90 | self.app.post( |
|
91 | 91 | url('repos'), |
|
92 | 92 | fixture._get_repo_create_params( |
|
93 | 93 | repo_private=False, |
|
94 | 94 | repo_name=repo_name, |
|
95 | 95 | repo_type=backend.alias, |
|
96 | 96 | repo_description=description, |
|
97 | 97 | csrf_token=csrf_token)) |
|
98 | 98 | |
|
99 | 99 | self.assert_repository_is_created_correctly( |
|
100 | 100 | repo_name, description, backend) |
|
101 | 101 | |
|
102 | 102 | @pytest.mark.parametrize("suffix", [u'', u'ąćę'], ids=['', 'non-ascii']) |
|
103 | 103 | def test_create_in_group( |
|
104 | 104 | self, autologin_user, backend, suffix, csrf_token): |
|
105 | 105 | # create GROUP |
|
106 | 106 | group_name = 'sometest_%s' % backend.alias |
|
107 | 107 | gr = RepoGroupModel().create(group_name=group_name, |
|
108 | 108 | group_description='test', |
|
109 | 109 | owner=TEST_USER_ADMIN_LOGIN) |
|
110 | 110 | Session().commit() |
|
111 | 111 | |
|
112 | 112 | repo_name = u'ingroup' + suffix |
|
113 | 113 | repo_name_full = RepoGroup.url_sep().join( |
|
114 | 114 | [group_name, repo_name]) |
|
115 | 115 | description = u'description for newly created repo' |
|
116 | 116 | self.app.post( |
|
117 | 117 | url('repos'), |
|
118 | 118 | fixture._get_repo_create_params( |
|
119 | 119 | repo_private=False, |
|
120 | 120 | repo_name=safe_str(repo_name), |
|
121 | 121 | repo_type=backend.alias, |
|
122 | 122 | repo_description=description, |
|
123 | 123 | repo_group=gr.group_id, |
|
124 | 124 | csrf_token=csrf_token)) |
|
125 | 125 | |
|
126 | 126 | # TODO: johbo: Cleanup work to fixture |
|
127 | 127 | try: |
|
128 | 128 | self.assert_repository_is_created_correctly( |
|
129 | 129 | repo_name_full, description, backend) |
|
130 | 130 | |
|
131 | 131 | new_repo = RepoModel().get_by_repo_name(repo_name_full) |
|
132 | 132 | inherited_perms = UserRepoToPerm.query().filter( |
|
133 | 133 | UserRepoToPerm.repository_id == new_repo.repo_id).all() |
|
134 | 134 | assert len(inherited_perms) == 1 |
|
135 | 135 | finally: |
|
136 | 136 | RepoModel().delete(repo_name_full) |
|
137 | 137 | RepoGroupModel().delete(group_name) |
|
138 | 138 | Session().commit() |
|
139 | 139 | |
|
140 | 140 | def test_create_in_group_numeric( |
|
141 | 141 | self, autologin_user, backend, csrf_token): |
|
142 | 142 | # create GROUP |
|
143 | 143 | group_name = 'sometest_%s' % backend.alias |
|
144 | 144 | gr = RepoGroupModel().create(group_name=group_name, |
|
145 | 145 | group_description='test', |
|
146 | 146 | owner=TEST_USER_ADMIN_LOGIN) |
|
147 | 147 | Session().commit() |
|
148 | 148 | |
|
149 | 149 | repo_name = '12345' |
|
150 | 150 | repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) |
|
151 | 151 | description = 'description for newly created repo' |
|
152 | 152 | self.app.post( |
|
153 | 153 | url('repos'), |
|
154 | 154 | fixture._get_repo_create_params( |
|
155 | 155 | repo_private=False, |
|
156 | 156 | repo_name=repo_name, |
|
157 | 157 | repo_type=backend.alias, |
|
158 | 158 | repo_description=description, |
|
159 | 159 | repo_group=gr.group_id, |
|
160 | 160 | csrf_token=csrf_token)) |
|
161 | 161 | |
|
162 | 162 | # TODO: johbo: Cleanup work to fixture |
|
163 | 163 | try: |
|
164 | 164 | self.assert_repository_is_created_correctly( |
|
165 | 165 | repo_name_full, description, backend) |
|
166 | 166 | |
|
167 | 167 | new_repo = RepoModel().get_by_repo_name(repo_name_full) |
|
168 | 168 | inherited_perms = UserRepoToPerm.query()\ |
|
169 | 169 | .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all() |
|
170 | 170 | assert len(inherited_perms) == 1 |
|
171 | 171 | finally: |
|
172 | 172 | RepoModel().delete(repo_name_full) |
|
173 | 173 | RepoGroupModel().delete(group_name) |
|
174 | 174 | Session().commit() |
|
175 | 175 | |
|
176 | 176 | def test_create_in_group_without_needed_permissions(self, backend): |
|
177 | 177 | session = login_user_session( |
|
178 | 178 | self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) |
|
179 | 179 | csrf_token = auth.get_csrf_token(session) |
|
180 | 180 | # revoke |
|
181 | 181 | user_model = UserModel() |
|
182 | 182 | # disable fork and create on default user |
|
183 | 183 | user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') |
|
184 | 184 | user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') |
|
185 | 185 | user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') |
|
186 | 186 | user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') |
|
187 | 187 | |
|
188 | 188 | # disable on regular user |
|
189 | 189 | user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') |
|
190 | 190 | user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') |
|
191 | 191 | user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') |
|
192 | 192 | user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') |
|
193 | 193 | Session().commit() |
|
194 | 194 | |
|
195 | 195 | # create GROUP |
|
196 | 196 | group_name = 'reg_sometest_%s' % backend.alias |
|
197 | 197 | gr = RepoGroupModel().create(group_name=group_name, |
|
198 | 198 | group_description='test', |
|
199 | 199 | owner=TEST_USER_ADMIN_LOGIN) |
|
200 | 200 | Session().commit() |
|
201 | 201 | |
|
202 | 202 | group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias |
|
203 | 203 | gr_allowed = RepoGroupModel().create( |
|
204 | 204 | group_name=group_name_allowed, |
|
205 | 205 | group_description='test', |
|
206 | 206 | owner=TEST_USER_REGULAR_LOGIN) |
|
207 | 207 | Session().commit() |
|
208 | 208 | |
|
209 | 209 | repo_name = 'ingroup' |
|
210 | 210 | description = 'description for newly created repo' |
|
211 | 211 | response = self.app.post( |
|
212 | 212 | url('repos'), |
|
213 | 213 | fixture._get_repo_create_params( |
|
214 | 214 | repo_private=False, |
|
215 | 215 | repo_name=repo_name, |
|
216 | 216 | repo_type=backend.alias, |
|
217 | 217 | repo_description=description, |
|
218 | 218 | repo_group=gr.group_id, |
|
219 | 219 | csrf_token=csrf_token)) |
|
220 | 220 | |
|
221 | 221 | response.mustcontain('Invalid value') |
|
222 | 222 | |
|
223 | 223 | # user is allowed to create in this group |
|
224 | 224 | repo_name = 'ingroup' |
|
225 | 225 | repo_name_full = RepoGroup.url_sep().join( |
|
226 | 226 | [group_name_allowed, repo_name]) |
|
227 | 227 | description = 'description for newly created repo' |
|
228 | 228 | response = self.app.post( |
|
229 | 229 | url('repos'), |
|
230 | 230 | fixture._get_repo_create_params( |
|
231 | 231 | repo_private=False, |
|
232 | 232 | repo_name=repo_name, |
|
233 | 233 | repo_type=backend.alias, |
|
234 | 234 | repo_description=description, |
|
235 | 235 | repo_group=gr_allowed.group_id, |
|
236 | 236 | csrf_token=csrf_token)) |
|
237 | 237 | |
|
238 | 238 | # TODO: johbo: Cleanup in pytest fixture |
|
239 | 239 | try: |
|
240 | 240 | self.assert_repository_is_created_correctly( |
|
241 | 241 | repo_name_full, description, backend) |
|
242 | 242 | |
|
243 | 243 | new_repo = RepoModel().get_by_repo_name(repo_name_full) |
|
244 | 244 | inherited_perms = UserRepoToPerm.query().filter( |
|
245 | 245 | UserRepoToPerm.repository_id == new_repo.repo_id).all() |
|
246 | 246 | assert len(inherited_perms) == 1 |
|
247 | 247 | |
|
248 | 248 | assert repo_on_filesystem(repo_name_full) |
|
249 | 249 | finally: |
|
250 | 250 | RepoModel().delete(repo_name_full) |
|
251 | 251 | RepoGroupModel().delete(group_name) |
|
252 | 252 | RepoGroupModel().delete(group_name_allowed) |
|
253 | 253 | Session().commit() |
|
254 | 254 | |
|
255 | 255 | def test_create_in_group_inherit_permissions(self, autologin_user, backend, |
|
256 | 256 | csrf_token): |
|
257 | 257 | # create GROUP |
|
258 | 258 | group_name = 'sometest_%s' % backend.alias |
|
259 | 259 | gr = RepoGroupModel().create(group_name=group_name, |
|
260 | 260 | group_description='test', |
|
261 | 261 | owner=TEST_USER_ADMIN_LOGIN) |
|
262 | 262 | perm = Permission.get_by_key('repository.write') |
|
263 | 263 | RepoGroupModel().grant_user_permission( |
|
264 | 264 | gr, TEST_USER_REGULAR_LOGIN, perm) |
|
265 | 265 | |
|
266 | 266 | # add repo permissions |
|
267 | 267 | Session().commit() |
|
268 | 268 | |
|
269 | 269 | repo_name = 'ingroup_inherited_%s' % backend.alias |
|
270 | 270 | repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) |
|
271 | 271 | description = 'description for newly created repo' |
|
272 | 272 | self.app.post( |
|
273 | 273 | url('repos'), |
|
274 | 274 | fixture._get_repo_create_params( |
|
275 | 275 | repo_private=False, |
|
276 | 276 | repo_name=repo_name, |
|
277 | 277 | repo_type=backend.alias, |
|
278 | 278 | repo_description=description, |
|
279 | 279 | repo_group=gr.group_id, |
|
280 | 280 | repo_copy_permissions=True, |
|
281 | 281 | csrf_token=csrf_token)) |
|
282 | 282 | |
|
283 | 283 | # TODO: johbo: Cleanup to pytest fixture |
|
284 | 284 | try: |
|
285 | 285 | self.assert_repository_is_created_correctly( |
|
286 | 286 | repo_name_full, description, backend) |
|
287 | 287 | except Exception: |
|
288 | 288 | RepoGroupModel().delete(group_name) |
|
289 | 289 | Session().commit() |
|
290 | 290 | raise |
|
291 | 291 | |
|
292 | 292 | # check if inherited permissions are applied |
|
293 | 293 | new_repo = RepoModel().get_by_repo_name(repo_name_full) |
|
294 | 294 | inherited_perms = UserRepoToPerm.query().filter( |
|
295 | 295 | UserRepoToPerm.repository_id == new_repo.repo_id).all() |
|
296 | 296 | assert len(inherited_perms) == 2 |
|
297 | 297 | |
|
298 | 298 | assert TEST_USER_REGULAR_LOGIN in [ |
|
299 | 299 | x.user.username for x in inherited_perms] |
|
300 | 300 | assert 'repository.write' in [ |
|
301 | 301 | x.permission.permission_name for x in inherited_perms] |
|
302 | 302 | |
|
303 | 303 | RepoModel().delete(repo_name_full) |
|
304 | 304 | RepoGroupModel().delete(group_name) |
|
305 | 305 | Session().commit() |
|
306 | 306 | |
|
307 | 307 | @pytest.mark.xfail_backends( |
|
308 | 308 | "git", "hg", reason="Missing reposerver support") |
|
309 | 309 | def test_create_with_clone_uri(self, autologin_user, backend, reposerver, |
|
310 | 310 | csrf_token): |
|
311 | 311 | source_repo = backend.create_repo(number_of_commits=2) |
|
312 | 312 | source_repo_name = source_repo.repo_name |
|
313 | 313 | reposerver.serve(source_repo.scm_instance()) |
|
314 | 314 | |
|
315 | 315 | repo_name = backend.new_repo_name() |
|
316 | 316 | response = self.app.post( |
|
317 | 317 | url('repos'), |
|
318 | 318 | fixture._get_repo_create_params( |
|
319 | 319 | repo_private=False, |
|
320 | 320 | repo_name=repo_name, |
|
321 | 321 | repo_type=backend.alias, |
|
322 | 322 | repo_description='', |
|
323 | 323 | clone_uri=reposerver.url, |
|
324 | 324 | csrf_token=csrf_token), |
|
325 | 325 | status=302) |
|
326 | 326 | |
|
327 | 327 | # Should be redirected to the creating page |
|
328 | 328 | response.mustcontain('repo_creating') |
|
329 | 329 | |
|
330 | 330 | # Expecting that both repositories have same history |
|
331 | 331 | source_repo = RepoModel().get_by_repo_name(source_repo_name) |
|
332 | 332 | source_vcs = source_repo.scm_instance() |
|
333 | 333 | repo = RepoModel().get_by_repo_name(repo_name) |
|
334 | 334 | repo_vcs = repo.scm_instance() |
|
335 | 335 | assert source_vcs[0].message == repo_vcs[0].message |
|
336 | 336 | assert source_vcs.count() == repo_vcs.count() |
|
337 | 337 | assert source_vcs.commit_ids == repo_vcs.commit_ids |
|
338 | 338 | |
|
339 | 339 | @pytest.mark.xfail_backends("svn", reason="Depends on import support") |
|
340 | 340 | def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend, |
|
341 | 341 | csrf_token): |
|
342 | 342 | repo_name = backend.new_repo_name() |
|
343 | 343 | description = 'description for newly created repo' |
|
344 | 344 | response = self.app.post( |
|
345 | 345 | url('repos'), |
|
346 | 346 | fixture._get_repo_create_params( |
|
347 | 347 | repo_private=False, |
|
348 | 348 | repo_name=repo_name, |
|
349 | 349 | repo_type=backend.alias, |
|
350 | 350 | repo_description=description, |
|
351 | 351 | clone_uri='http://repo.invalid/repo', |
|
352 | 352 | csrf_token=csrf_token)) |
|
353 | 353 | response.mustcontain('invalid clone url') |
|
354 | 354 | |
|
355 | 355 | @pytest.mark.xfail_backends("svn", reason="Depends on import support") |
|
356 | 356 | def test_create_remote_repo_wrong_clone_uri_hg_svn( |
|
357 | 357 | self, autologin_user, backend, csrf_token): |
|
358 | 358 | repo_name = backend.new_repo_name() |
|
359 | 359 | description = 'description for newly created repo' |
|
360 | 360 | response = self.app.post( |
|
361 | 361 | url('repos'), |
|
362 | 362 | fixture._get_repo_create_params( |
|
363 | 363 | repo_private=False, |
|
364 | 364 | repo_name=repo_name, |
|
365 | 365 | repo_type=backend.alias, |
|
366 | 366 | repo_description=description, |
|
367 | 367 | clone_uri='svn+http://svn.invalid/repo', |
|
368 | 368 | csrf_token=csrf_token)) |
|
369 | 369 | response.mustcontain('invalid clone url') |
|
370 | 370 | |
|
371 | 371 | def test_create_with_git_suffix( |
|
372 | 372 | self, autologin_user, backend, csrf_token): |
|
373 | 373 | repo_name = backend.new_repo_name() + ".git" |
|
374 | 374 | description = 'description for newly created repo' |
|
375 | 375 | response = self.app.post( |
|
376 | 376 | url('repos'), |
|
377 | 377 | fixture._get_repo_create_params( |
|
378 | 378 | repo_private=False, |
|
379 | 379 | repo_name=repo_name, |
|
380 | 380 | repo_type=backend.alias, |
|
381 | 381 | repo_description=description, |
|
382 | 382 | csrf_token=csrf_token)) |
|
383 | 383 | response.mustcontain('Repository name cannot end with .git') |
|
384 | 384 | |
|
385 | 385 | @pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii']) |
|
386 | 386 | def test_delete(self, autologin_user, backend, suffix, csrf_token): |
|
387 | 387 | repo = backend.create_repo(name_suffix=suffix) |
|
388 | 388 | repo_name = repo.repo_name |
|
389 | 389 | |
|
390 | 390 | response = self.app.post(url('repo', repo_name=repo_name), |
|
391 | 391 | params={'_method': 'delete', |
|
392 | 392 | 'csrf_token': csrf_token}) |
|
393 | 393 | assert_session_flash(response, 'Deleted repository %s' % (repo_name)) |
|
394 | 394 | response.follow() |
|
395 | 395 | |
|
396 | 396 | # check if repo was deleted from db |
|
397 | 397 | assert RepoModel().get_by_repo_name(repo_name) is None |
|
398 | 398 | assert not repo_on_filesystem(repo_name) |
|
399 | 399 | |
|
400 | 400 | def test_show(self, autologin_user, backend): |
|
401 | 401 | self.app.get(url('repo', repo_name=backend.repo_name)) |
|
402 | 402 | |
|
403 | def test_edit(self, backend, autologin_user): | |
|
404 | self.app.get(url('edit_repo', repo_name=backend.repo_name)) | |
|
405 | ||
|
406 | def test_edit_accessible_when_missing_requirements( | |
|
407 | self, backend_hg, autologin_user): | |
|
408 | scm_patcher = mock.patch.object( | |
|
409 | Repository, 'scm_instance', side_effect=RepositoryRequirementError) | |
|
410 | with scm_patcher: | |
|
411 | self.app.get(url('edit_repo', repo_name=backend_hg.repo_name)) | |
|
412 | ||
|
413 | def test_set_private_flag_sets_default_to_none( | |
|
414 | self, autologin_user, backend, csrf_token): | |
|
415 | # initially repository perm should be read | |
|
416 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
417 | assert len(perm) == 1 | |
|
418 | assert perm[0].permission.permission_name == 'repository.read' | |
|
419 | assert not backend.repo.private | |
|
420 | ||
|
421 | response = self.app.post( | |
|
422 | url('repo', repo_name=backend.repo_name), | |
|
423 | fixture._get_repo_create_params( | |
|
424 | repo_private=1, | |
|
425 | repo_name=backend.repo_name, | |
|
426 | repo_type=backend.alias, | |
|
427 | user=TEST_USER_ADMIN_LOGIN, | |
|
428 | _method='put', | |
|
429 | csrf_token=csrf_token)) | |
|
430 | assert_session_flash( | |
|
431 | response, | |
|
432 | msg='Repository %s updated successfully' % (backend.repo_name)) | |
|
433 | assert backend.repo.private | |
|
434 | ||
|
435 | # now the repo default permission should be None | |
|
436 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
437 | assert len(perm) == 1 | |
|
438 | assert perm[0].permission.permission_name == 'repository.none' | |
|
439 | ||
|
440 | response = self.app.post( | |
|
441 | url('repo', repo_name=backend.repo_name), | |
|
442 | fixture._get_repo_create_params( | |
|
443 | repo_private=False, | |
|
444 | repo_name=backend.repo_name, | |
|
445 | repo_type=backend.alias, | |
|
446 | user=TEST_USER_ADMIN_LOGIN, | |
|
447 | _method='put', | |
|
448 | csrf_token=csrf_token)) | |
|
449 | assert_session_flash( | |
|
450 | response, | |
|
451 | msg='Repository %s updated successfully' % (backend.repo_name)) | |
|
452 | assert not backend.repo.private | |
|
453 | ||
|
454 | # we turn off private now the repo default permission should stay None | |
|
455 | perm = _get_permission_for_user(user='default', repo=backend.repo_name) | |
|
456 | assert len(perm) == 1 | |
|
457 | assert perm[0].permission.permission_name == 'repository.none' | |
|
458 | ||
|
459 | # update this permission back | |
|
460 | perm[0].permission = Permission.get_by_key('repository.read') | |
|
461 | Session().add(perm[0]) | |
|
462 | Session().commit() | |
|
463 | ||
|
464 | 403 | def test_default_user_cannot_access_private_repo_in_a_group( |
|
465 | 404 | self, autologin_user, user_util, backend, csrf_token): |
|
466 | 405 | |
|
467 | 406 | group = user_util.create_repo_group() |
|
468 | 407 | |
|
469 | 408 | repo = backend.create_repo( |
|
470 | 409 | repo_private=True, repo_group=group, repo_copy_permissions=True) |
|
471 | 410 | |
|
472 | 411 | permissions = _get_permission_for_user( |
|
473 | 412 | user='default', repo=repo.repo_name) |
|
474 | 413 | assert len(permissions) == 1 |
|
475 | 414 | assert permissions[0].permission.permission_name == 'repository.none' |
|
476 | 415 | assert permissions[0].repository.private is True |
|
477 | 416 | |
|
478 | 417 | def test_set_repo_fork_has_no_self_id(self, autologin_user, backend): |
|
479 | 418 | repo = backend.repo |
|
480 | 419 | response = self.app.get( |
|
481 | 420 | url('edit_repo_advanced', repo_name=backend.repo_name)) |
|
482 | 421 | opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id |
|
483 | 422 | response.mustcontain(no=[opt]) |
|
484 | 423 | |
|
485 | 424 | def test_set_fork_of_target_repo( |
|
486 | 425 | self, autologin_user, backend, csrf_token): |
|
487 | 426 | target_repo = 'target_%s' % backend.alias |
|
488 | 427 | fixture.create_repo(target_repo, repo_type=backend.alias) |
|
489 | 428 | repo2 = Repository.get_by_repo_name(target_repo) |
|
490 | 429 | response = self.app.post( |
|
491 | 430 | url('edit_repo_advanced_fork', repo_name=backend.repo_name), |
|
492 | 431 | params={'id_fork_of': repo2.repo_id, '_method': 'put', |
|
493 | 432 | 'csrf_token': csrf_token}) |
|
494 | 433 | repo = Repository.get_by_repo_name(backend.repo_name) |
|
495 | 434 | repo2 = Repository.get_by_repo_name(target_repo) |
|
496 | 435 | assert_session_flash( |
|
497 | 436 | response, |
|
498 | 437 | 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name)) |
|
499 | 438 | |
|
500 | 439 | assert repo.fork == repo2 |
|
501 | 440 | response = response.follow() |
|
502 | 441 | # check if given repo is selected |
|
503 | 442 | |
|
504 | 443 | opt = 'This repository is a fork of <a href="%s">%s</a>' % ( |
|
505 | 444 | url('summary_home', repo_name=repo2.repo_name), repo2.repo_name) |
|
506 | 445 | |
|
507 | 446 | response.mustcontain(opt) |
|
508 | 447 | |
|
509 | 448 | fixture.destroy_repo(target_repo, forks='detach') |
|
510 | 449 | |
|
511 | 450 | @pytest.mark.backends("hg", "git") |
|
512 | 451 | def test_set_fork_of_other_type_repo(self, autologin_user, backend, |
|
513 | 452 | csrf_token): |
|
514 | 453 | TARGET_REPO_MAP = { |
|
515 | 454 | 'git': { |
|
516 | 455 | 'type': 'hg', |
|
517 | 456 | 'repo_name': HG_REPO}, |
|
518 | 457 | 'hg': { |
|
519 | 458 | 'type': 'git', |
|
520 | 459 | 'repo_name': GIT_REPO}, |
|
521 | 460 | } |
|
522 | 461 | target_repo = TARGET_REPO_MAP[backend.alias] |
|
523 | 462 | |
|
524 | 463 | repo2 = Repository.get_by_repo_name(target_repo['repo_name']) |
|
525 | 464 | response = self.app.post( |
|
526 | 465 | url('edit_repo_advanced_fork', repo_name=backend.repo_name), |
|
527 | 466 | params={'id_fork_of': repo2.repo_id, '_method': 'put', |
|
528 | 467 | 'csrf_token': csrf_token}) |
|
529 | 468 | assert_session_flash( |
|
530 | 469 | response, |
|
531 | 470 | 'Cannot set repository as fork of repository with other type') |
|
532 | 471 | |
|
533 | 472 | def test_set_fork_of_none(self, autologin_user, backend, csrf_token): |
|
534 | 473 | # mark it as None |
|
535 | 474 | response = self.app.post( |
|
536 | 475 | url('edit_repo_advanced_fork', repo_name=backend.repo_name), |
|
537 | 476 | params={'id_fork_of': None, '_method': 'put', |
|
538 | 477 | 'csrf_token': csrf_token}) |
|
539 | 478 | assert_session_flash( |
|
540 | 479 | response, |
|
541 | 480 | 'Marked repo %s as fork of %s' |
|
542 | 481 | % (backend.repo_name, "Nothing")) |
|
543 | 482 | assert backend.repo.fork is None |
|
544 | 483 | |
|
545 | 484 | def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token): |
|
546 | 485 | repo = Repository.get_by_repo_name(backend.repo_name) |
|
547 | 486 | response = self.app.post( |
|
548 | 487 | url('edit_repo_advanced_fork', repo_name=backend.repo_name), |
|
549 | 488 | params={'id_fork_of': repo.repo_id, '_method': 'put', |
|
550 | 489 | 'csrf_token': csrf_token}) |
|
551 | 490 | assert_session_flash( |
|
552 | 491 | response, 'An error occurred during this operation') |
|
553 | 492 | |
|
554 | 493 | def test_create_on_top_level_without_permissions(self, backend): |
|
555 | 494 | session = login_user_session( |
|
556 | 495 | self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) |
|
557 | 496 | csrf_token = auth.get_csrf_token(session) |
|
558 | 497 | |
|
559 | 498 | # revoke |
|
560 | 499 | user_model = UserModel() |
|
561 | 500 | # disable fork and create on default user |
|
562 | 501 | user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') |
|
563 | 502 | user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') |
|
564 | 503 | user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') |
|
565 | 504 | user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') |
|
566 | 505 | |
|
567 | 506 | # disable on regular user |
|
568 | 507 | user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') |
|
569 | 508 | user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') |
|
570 | 509 | user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') |
|
571 | 510 | user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') |
|
572 | 511 | Session().commit() |
|
573 | 512 | |
|
574 | 513 | repo_name = backend.new_repo_name() |
|
575 | 514 | description = 'description for newly created repo' |
|
576 | 515 | response = self.app.post( |
|
577 | 516 | url('repos'), |
|
578 | 517 | fixture._get_repo_create_params( |
|
579 | 518 | repo_private=False, |
|
580 | 519 | repo_name=repo_name, |
|
581 | 520 | repo_type=backend.alias, |
|
582 | 521 | repo_description=description, |
|
583 | 522 | csrf_token=csrf_token)) |
|
584 | 523 | |
|
585 | 524 | response.mustcontain( |
|
586 | 525 | u"You do not have the permission to store repositories in " |
|
587 | 526 | u"the root location.") |
|
588 | 527 | |
|
589 | 528 | @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function) |
|
590 | 529 | def test_create_repo_when_filesystem_op_fails( |
|
591 | 530 | self, autologin_user, backend, csrf_token): |
|
592 | 531 | repo_name = backend.new_repo_name() |
|
593 | 532 | description = 'description for newly created repo' |
|
594 | 533 | |
|
595 | 534 | response = self.app.post( |
|
596 | 535 | url('repos'), |
|
597 | 536 | fixture._get_repo_create_params( |
|
598 | 537 | repo_private=False, |
|
599 | 538 | repo_name=repo_name, |
|
600 | 539 | repo_type=backend.alias, |
|
601 | 540 | repo_description=description, |
|
602 | 541 | csrf_token=csrf_token)) |
|
603 | 542 | |
|
604 | 543 | assert_session_flash( |
|
605 | 544 | response, 'Error creating repository %s' % repo_name) |
|
606 | 545 | # repo must not be in db |
|
607 | 546 | assert backend.repo is None |
|
608 | 547 | # repo must not be in filesystem ! |
|
609 | 548 | assert not repo_on_filesystem(repo_name) |
|
610 | 549 | |
|
611 | 550 | def assert_repository_is_created_correctly( |
|
612 | 551 | self, repo_name, description, backend): |
|
613 | 552 | repo_name_utf8 = safe_str(repo_name) |
|
614 | 553 | |
|
615 | 554 | # run the check page that triggers the flash message |
|
616 | 555 | response = self.app.get(url('repo_check_home', repo_name=repo_name)) |
|
617 | 556 | assert response.json == {u'result': True} |
|
618 | 557 | |
|
619 | 558 | flash_msg = u'Created repository <a href="/{}">{}</a>'.format( |
|
620 | 559 | urllib.quote(repo_name_utf8), repo_name) |
|
621 | 560 | assert_session_flash(response, flash_msg) |
|
622 | 561 | |
|
623 | 562 | # test if the repo was created in the database |
|
624 | 563 | new_repo = RepoModel().get_by_repo_name(repo_name) |
|
625 | 564 | |
|
626 | 565 | assert new_repo.repo_name == repo_name |
|
627 | 566 | assert new_repo.description == description |
|
628 | 567 | |
|
629 | 568 | # test if the repository is visible in the list ? |
|
630 | 569 | response = self.app.get(url('summary_home', repo_name=repo_name)) |
|
631 | 570 | response.mustcontain(repo_name) |
|
632 | 571 | response.mustcontain(backend.alias) |
|
633 | 572 | |
|
634 | 573 | assert repo_on_filesystem(repo_name) |
|
635 | 574 | |
|
636 | 575 | |
|
637 | 576 | @pytest.mark.usefixtures("app") |
|
638 | 577 | class TestVcsSettings(object): |
|
639 | 578 | FORM_DATA = { |
|
640 | 579 | 'inherit_global_settings': False, |
|
641 | 580 | 'hooks_changegroup_repo_size': False, |
|
642 | 581 | 'hooks_changegroup_push_logger': False, |
|
643 | 582 | 'hooks_outgoing_pull_logger': False, |
|
644 | 583 | 'extensions_largefiles': False, |
|
645 | 584 | 'phases_publish': 'False', |
|
646 | 585 | 'rhodecode_pr_merge_enabled': False, |
|
647 | 586 | 'rhodecode_use_outdated_comments': False, |
|
648 | 587 | 'new_svn_branch': '', |
|
649 | 588 | 'new_svn_tag': '' |
|
650 | 589 | } |
|
651 | 590 | |
|
652 | 591 | @pytest.mark.skip_backends('svn') |
|
653 | 592 | def test_global_settings_initial_values(self, autologin_user, backend): |
|
654 | 593 | repo_name = backend.repo_name |
|
655 | 594 | response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) |
|
656 | 595 | |
|
657 | 596 | expected_settings = ( |
|
658 | 597 | 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled', |
|
659 | 598 | 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger', |
|
660 | 599 | 'hooks_outgoing_pull_logger' |
|
661 | 600 | ) |
|
662 | 601 | for setting in expected_settings: |
|
663 | 602 | self.assert_repo_value_equals_global_value(response, setting) |
|
664 | 603 | |
|
665 | 604 | def test_show_settings_requires_repo_admin_permission( |
|
666 | 605 | self, backend, user_util, settings_util): |
|
667 | 606 | repo = backend.create_repo() |
|
668 | 607 | repo_name = repo.repo_name |
|
669 | 608 | user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) |
|
670 | 609 | user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') |
|
671 | 610 | login_user_session( |
|
672 | 611 | self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) |
|
673 | 612 | self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
674 | 613 | |
|
675 | 614 | def test_inherit_global_settings_flag_is_true_by_default( |
|
676 | 615 | self, autologin_user, backend): |
|
677 | 616 | repo_name = backend.repo_name |
|
678 | 617 | response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) |
|
679 | 618 | |
|
680 | 619 | assert_response = AssertResponse(response) |
|
681 | 620 | element = assert_response.get_element('#inherit_global_settings') |
|
682 | 621 | assert element.checked |
|
683 | 622 | |
|
684 | 623 | @pytest.mark.parametrize('checked_value', [True, False]) |
|
685 | 624 | def test_inherit_global_settings_value( |
|
686 | 625 | self, autologin_user, backend, checked_value, settings_util): |
|
687 | 626 | repo = backend.create_repo() |
|
688 | 627 | repo_name = repo.repo_name |
|
689 | 628 | settings_util.create_repo_rhodecode_setting( |
|
690 | 629 | repo, 'inherit_vcs_settings', checked_value, 'bool') |
|
691 | 630 | response = self.app.get(url('repo_vcs_settings', repo_name=repo_name)) |
|
692 | 631 | |
|
693 | 632 | assert_response = AssertResponse(response) |
|
694 | 633 | element = assert_response.get_element('#inherit_global_settings') |
|
695 | 634 | assert element.checked == checked_value |
|
696 | 635 | |
|
697 | 636 | @pytest.mark.skip_backends('svn') |
|
698 | 637 | def test_hooks_settings_are_created( |
|
699 | 638 | self, autologin_user, backend, csrf_token): |
|
700 | 639 | repo_name = backend.repo_name |
|
701 | 640 | data = self.FORM_DATA.copy() |
|
702 | 641 | data['csrf_token'] = csrf_token |
|
703 | 642 | self.app.post( |
|
704 | 643 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
705 | 644 | settings = SettingsModel(repo=repo_name) |
|
706 | 645 | try: |
|
707 | 646 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
708 | 647 | ui = settings.get_ui_by_section_and_key(section, key) |
|
709 | 648 | assert ui.ui_active is False |
|
710 | 649 | finally: |
|
711 | 650 | self._cleanup_repo_settings(settings) |
|
712 | 651 | |
|
713 | 652 | def test_hooks_settings_are_not_created_for_svn( |
|
714 | 653 | self, autologin_user, backend_svn, csrf_token): |
|
715 | 654 | repo_name = backend_svn.repo_name |
|
716 | 655 | data = self.FORM_DATA.copy() |
|
717 | 656 | data['csrf_token'] = csrf_token |
|
718 | 657 | self.app.post( |
|
719 | 658 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
720 | 659 | settings = SettingsModel(repo=repo_name) |
|
721 | 660 | try: |
|
722 | 661 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
723 | 662 | ui = settings.get_ui_by_section_and_key(section, key) |
|
724 | 663 | assert ui is None |
|
725 | 664 | finally: |
|
726 | 665 | self._cleanup_repo_settings(settings) |
|
727 | 666 | |
|
728 | 667 | @pytest.mark.skip_backends('svn') |
|
729 | 668 | def test_hooks_settings_are_updated( |
|
730 | 669 | self, autologin_user, backend, csrf_token): |
|
731 | 670 | repo_name = backend.repo_name |
|
732 | 671 | settings = SettingsModel(repo=repo_name) |
|
733 | 672 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
734 | 673 | settings.create_ui_section_value(section, '', key=key, active=True) |
|
735 | 674 | |
|
736 | 675 | data = self.FORM_DATA.copy() |
|
737 | 676 | data['csrf_token'] = csrf_token |
|
738 | 677 | self.app.post( |
|
739 | 678 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
740 | 679 | try: |
|
741 | 680 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
742 | 681 | ui = settings.get_ui_by_section_and_key(section, key) |
|
743 | 682 | assert ui.ui_active is False |
|
744 | 683 | finally: |
|
745 | 684 | self._cleanup_repo_settings(settings) |
|
746 | 685 | |
|
747 | 686 | def test_hooks_settings_are_not_updated_for_svn( |
|
748 | 687 | self, autologin_user, backend_svn, csrf_token): |
|
749 | 688 | repo_name = backend_svn.repo_name |
|
750 | 689 | settings = SettingsModel(repo=repo_name) |
|
751 | 690 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
752 | 691 | settings.create_ui_section_value(section, '', key=key, active=True) |
|
753 | 692 | |
|
754 | 693 | data = self.FORM_DATA.copy() |
|
755 | 694 | data['csrf_token'] = csrf_token |
|
756 | 695 | self.app.post( |
|
757 | 696 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
758 | 697 | try: |
|
759 | 698 | for section, key in VcsSettingsModel.HOOKS_SETTINGS: |
|
760 | 699 | ui = settings.get_ui_by_section_and_key(section, key) |
|
761 | 700 | assert ui.ui_active is True |
|
762 | 701 | finally: |
|
763 | 702 | self._cleanup_repo_settings(settings) |
|
764 | 703 | |
|
765 | 704 | @pytest.mark.skip_backends('svn') |
|
766 | 705 | def test_pr_settings_are_created( |
|
767 | 706 | self, autologin_user, backend, csrf_token): |
|
768 | 707 | repo_name = backend.repo_name |
|
769 | 708 | data = self.FORM_DATA.copy() |
|
770 | 709 | data['csrf_token'] = csrf_token |
|
771 | 710 | self.app.post( |
|
772 | 711 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
773 | 712 | settings = SettingsModel(repo=repo_name) |
|
774 | 713 | try: |
|
775 | 714 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
776 | 715 | setting = settings.get_setting_by_name(name) |
|
777 | 716 | assert setting.app_settings_value is False |
|
778 | 717 | finally: |
|
779 | 718 | self._cleanup_repo_settings(settings) |
|
780 | 719 | |
|
781 | 720 | def test_pr_settings_are_not_created_for_svn( |
|
782 | 721 | self, autologin_user, backend_svn, csrf_token): |
|
783 | 722 | repo_name = backend_svn.repo_name |
|
784 | 723 | data = self.FORM_DATA.copy() |
|
785 | 724 | data['csrf_token'] = csrf_token |
|
786 | 725 | self.app.post( |
|
787 | 726 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
788 | 727 | settings = SettingsModel(repo=repo_name) |
|
789 | 728 | try: |
|
790 | 729 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
791 | 730 | setting = settings.get_setting_by_name(name) |
|
792 | 731 | assert setting is None |
|
793 | 732 | finally: |
|
794 | 733 | self._cleanup_repo_settings(settings) |
|
795 | 734 | |
|
796 | 735 | def test_pr_settings_creation_requires_repo_admin_permission( |
|
797 | 736 | self, backend, user_util, settings_util, csrf_token): |
|
798 | 737 | repo = backend.create_repo() |
|
799 | 738 | repo_name = repo.repo_name |
|
800 | 739 | |
|
801 | 740 | logout_user_session(self.app, csrf_token) |
|
802 | 741 | session = login_user_session( |
|
803 | 742 | self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) |
|
804 | 743 | new_csrf_token = auth.get_csrf_token(session) |
|
805 | 744 | |
|
806 | 745 | user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) |
|
807 | 746 | repo = Repository.get_by_repo_name(repo_name) |
|
808 | 747 | user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') |
|
809 | 748 | data = self.FORM_DATA.copy() |
|
810 | 749 | data['csrf_token'] = new_csrf_token |
|
811 | 750 | settings = SettingsModel(repo=repo_name) |
|
812 | 751 | |
|
813 | 752 | try: |
|
814 | 753 | self.app.post( |
|
815 | 754 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
816 | 755 | status=302) |
|
817 | 756 | finally: |
|
818 | 757 | self._cleanup_repo_settings(settings) |
|
819 | 758 | |
|
820 | 759 | @pytest.mark.skip_backends('svn') |
|
821 | 760 | def test_pr_settings_are_updated( |
|
822 | 761 | self, autologin_user, backend, csrf_token): |
|
823 | 762 | repo_name = backend.repo_name |
|
824 | 763 | settings = SettingsModel(repo=repo_name) |
|
825 | 764 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
826 | 765 | settings.create_or_update_setting(name, True, 'bool') |
|
827 | 766 | |
|
828 | 767 | data = self.FORM_DATA.copy() |
|
829 | 768 | data['csrf_token'] = csrf_token |
|
830 | 769 | self.app.post( |
|
831 | 770 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
832 | 771 | try: |
|
833 | 772 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
834 | 773 | setting = settings.get_setting_by_name(name) |
|
835 | 774 | assert setting.app_settings_value is False |
|
836 | 775 | finally: |
|
837 | 776 | self._cleanup_repo_settings(settings) |
|
838 | 777 | |
|
839 | 778 | def test_pr_settings_are_not_updated_for_svn( |
|
840 | 779 | self, autologin_user, backend_svn, csrf_token): |
|
841 | 780 | repo_name = backend_svn.repo_name |
|
842 | 781 | settings = SettingsModel(repo=repo_name) |
|
843 | 782 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
844 | 783 | settings.create_or_update_setting(name, True, 'bool') |
|
845 | 784 | |
|
846 | 785 | data = self.FORM_DATA.copy() |
|
847 | 786 | data['csrf_token'] = csrf_token |
|
848 | 787 | self.app.post( |
|
849 | 788 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
850 | 789 | try: |
|
851 | 790 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
852 | 791 | setting = settings.get_setting_by_name(name) |
|
853 | 792 | assert setting.app_settings_value is True |
|
854 | 793 | finally: |
|
855 | 794 | self._cleanup_repo_settings(settings) |
|
856 | 795 | |
|
857 | 796 | def test_svn_settings_are_created( |
|
858 | 797 | self, autologin_user, backend_svn, csrf_token, settings_util): |
|
859 | 798 | repo_name = backend_svn.repo_name |
|
860 | 799 | data = self.FORM_DATA.copy() |
|
861 | 800 | data['new_svn_tag'] = 'svn-tag' |
|
862 | 801 | data['new_svn_branch'] = 'svn-branch' |
|
863 | 802 | data['csrf_token'] = csrf_token |
|
864 | 803 | |
|
865 | 804 | # Create few global settings to make sure that uniqueness validators |
|
866 | 805 | # are not triggered |
|
867 | 806 | settings_util.create_rhodecode_ui( |
|
868 | 807 | VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch') |
|
869 | 808 | settings_util.create_rhodecode_ui( |
|
870 | 809 | VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag') |
|
871 | 810 | |
|
872 | 811 | self.app.post( |
|
873 | 812 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
874 | 813 | settings = SettingsModel(repo=repo_name) |
|
875 | 814 | try: |
|
876 | 815 | svn_branches = settings.get_ui_by_section( |
|
877 | 816 | VcsSettingsModel.SVN_BRANCH_SECTION) |
|
878 | 817 | svn_branch_names = [b.ui_value for b in svn_branches] |
|
879 | 818 | svn_tags = settings.get_ui_by_section( |
|
880 | 819 | VcsSettingsModel.SVN_TAG_SECTION) |
|
881 | 820 | svn_tag_names = [b.ui_value for b in svn_tags] |
|
882 | 821 | assert 'svn-branch' in svn_branch_names |
|
883 | 822 | assert 'svn-tag' in svn_tag_names |
|
884 | 823 | finally: |
|
885 | 824 | self._cleanup_repo_settings(settings) |
|
886 | 825 | |
|
887 | 826 | def test_svn_settings_are_unique( |
|
888 | 827 | self, autologin_user, backend_svn, csrf_token, settings_util): |
|
889 | 828 | repo = backend_svn.repo |
|
890 | 829 | repo_name = repo.repo_name |
|
891 | 830 | data = self.FORM_DATA.copy() |
|
892 | 831 | data['new_svn_tag'] = 'test_tag' |
|
893 | 832 | data['new_svn_branch'] = 'test_branch' |
|
894 | 833 | data['csrf_token'] = csrf_token |
|
895 | 834 | settings_util.create_repo_rhodecode_ui( |
|
896 | 835 | repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch') |
|
897 | 836 | settings_util.create_repo_rhodecode_ui( |
|
898 | 837 | repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag') |
|
899 | 838 | |
|
900 | 839 | response = self.app.post( |
|
901 | 840 | url('repo_vcs_settings', repo_name=repo_name), data, status=200) |
|
902 | 841 | response.mustcontain('Pattern already exists') |
|
903 | 842 | |
|
904 | 843 | def test_svn_settings_with_empty_values_are_not_created( |
|
905 | 844 | self, autologin_user, backend_svn, csrf_token): |
|
906 | 845 | repo_name = backend_svn.repo_name |
|
907 | 846 | data = self.FORM_DATA.copy() |
|
908 | 847 | data['csrf_token'] = csrf_token |
|
909 | 848 | self.app.post( |
|
910 | 849 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
911 | 850 | settings = SettingsModel(repo=repo_name) |
|
912 | 851 | try: |
|
913 | 852 | svn_branches = settings.get_ui_by_section( |
|
914 | 853 | VcsSettingsModel.SVN_BRANCH_SECTION) |
|
915 | 854 | svn_tags = settings.get_ui_by_section( |
|
916 | 855 | VcsSettingsModel.SVN_TAG_SECTION) |
|
917 | 856 | assert len(svn_branches) == 0 |
|
918 | 857 | assert len(svn_tags) == 0 |
|
919 | 858 | finally: |
|
920 | 859 | self._cleanup_repo_settings(settings) |
|
921 | 860 | |
|
922 | 861 | def test_svn_settings_are_shown_for_svn_repository( |
|
923 | 862 | self, autologin_user, backend_svn, csrf_token): |
|
924 | 863 | repo_name = backend_svn.repo_name |
|
925 | 864 | response = self.app.get( |
|
926 | 865 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
927 | 866 | response.mustcontain('Subversion Settings') |
|
928 | 867 | |
|
929 | 868 | @pytest.mark.skip_backends('svn') |
|
930 | 869 | def test_svn_settings_are_not_created_for_not_svn_repository( |
|
931 | 870 | self, autologin_user, backend, csrf_token): |
|
932 | 871 | repo_name = backend.repo_name |
|
933 | 872 | data = self.FORM_DATA.copy() |
|
934 | 873 | data['csrf_token'] = csrf_token |
|
935 | 874 | self.app.post( |
|
936 | 875 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
937 | 876 | settings = SettingsModel(repo=repo_name) |
|
938 | 877 | try: |
|
939 | 878 | svn_branches = settings.get_ui_by_section( |
|
940 | 879 | VcsSettingsModel.SVN_BRANCH_SECTION) |
|
941 | 880 | svn_tags = settings.get_ui_by_section( |
|
942 | 881 | VcsSettingsModel.SVN_TAG_SECTION) |
|
943 | 882 | assert len(svn_branches) == 0 |
|
944 | 883 | assert len(svn_tags) == 0 |
|
945 | 884 | finally: |
|
946 | 885 | self._cleanup_repo_settings(settings) |
|
947 | 886 | |
|
948 | 887 | @pytest.mark.skip_backends('svn') |
|
949 | 888 | def test_svn_settings_are_shown_only_for_svn_repository( |
|
950 | 889 | self, autologin_user, backend, csrf_token): |
|
951 | 890 | repo_name = backend.repo_name |
|
952 | 891 | response = self.app.get( |
|
953 | 892 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
954 | 893 | response.mustcontain(no='Subversion Settings') |
|
955 | 894 | |
|
956 | 895 | def test_hg_settings_are_created( |
|
957 | 896 | self, autologin_user, backend_hg, csrf_token): |
|
958 | 897 | repo_name = backend_hg.repo_name |
|
959 | 898 | data = self.FORM_DATA.copy() |
|
960 | 899 | data['new_svn_tag'] = 'svn-tag' |
|
961 | 900 | data['new_svn_branch'] = 'svn-branch' |
|
962 | 901 | data['csrf_token'] = csrf_token |
|
963 | 902 | self.app.post( |
|
964 | 903 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
965 | 904 | settings = SettingsModel(repo=repo_name) |
|
966 | 905 | try: |
|
967 | 906 | largefiles_ui = settings.get_ui_by_section_and_key( |
|
968 | 907 | 'extensions', 'largefiles') |
|
969 | 908 | assert largefiles_ui.ui_active is False |
|
970 | 909 | phases_ui = settings.get_ui_by_section_and_key( |
|
971 | 910 | 'phases', 'publish') |
|
972 | 911 | assert str2bool(phases_ui.ui_value) is False |
|
973 | 912 | finally: |
|
974 | 913 | self._cleanup_repo_settings(settings) |
|
975 | 914 | |
|
976 | 915 | def test_hg_settings_are_updated( |
|
977 | 916 | self, autologin_user, backend_hg, csrf_token): |
|
978 | 917 | repo_name = backend_hg.repo_name |
|
979 | 918 | settings = SettingsModel(repo=repo_name) |
|
980 | 919 | settings.create_ui_section_value( |
|
981 | 920 | 'extensions', '', key='largefiles', active=True) |
|
982 | 921 | settings.create_ui_section_value( |
|
983 | 922 | 'phases', '1', key='publish', active=True) |
|
984 | 923 | |
|
985 | 924 | data = self.FORM_DATA.copy() |
|
986 | 925 | data['csrf_token'] = csrf_token |
|
987 | 926 | self.app.post( |
|
988 | 927 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
989 | 928 | try: |
|
990 | 929 | largefiles_ui = settings.get_ui_by_section_and_key( |
|
991 | 930 | 'extensions', 'largefiles') |
|
992 | 931 | assert largefiles_ui.ui_active is False |
|
993 | 932 | phases_ui = settings.get_ui_by_section_and_key( |
|
994 | 933 | 'phases', 'publish') |
|
995 | 934 | assert str2bool(phases_ui.ui_value) is False |
|
996 | 935 | finally: |
|
997 | 936 | self._cleanup_repo_settings(settings) |
|
998 | 937 | |
|
999 | 938 | def test_hg_settings_are_shown_for_hg_repository( |
|
1000 | 939 | self, autologin_user, backend_hg, csrf_token): |
|
1001 | 940 | repo_name = backend_hg.repo_name |
|
1002 | 941 | response = self.app.get( |
|
1003 | 942 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
1004 | 943 | response.mustcontain('Mercurial Settings') |
|
1005 | 944 | |
|
1006 | 945 | @pytest.mark.skip_backends('hg') |
|
1007 | 946 | def test_hg_settings_are_created_only_for_hg_repository( |
|
1008 | 947 | self, autologin_user, backend, csrf_token): |
|
1009 | 948 | repo_name = backend.repo_name |
|
1010 | 949 | data = self.FORM_DATA.copy() |
|
1011 | 950 | data['csrf_token'] = csrf_token |
|
1012 | 951 | self.app.post( |
|
1013 | 952 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
1014 | 953 | settings = SettingsModel(repo=repo_name) |
|
1015 | 954 | try: |
|
1016 | 955 | largefiles_ui = settings.get_ui_by_section_and_key( |
|
1017 | 956 | 'extensions', 'largefiles') |
|
1018 | 957 | assert largefiles_ui is None |
|
1019 | 958 | phases_ui = settings.get_ui_by_section_and_key( |
|
1020 | 959 | 'phases', 'publish') |
|
1021 | 960 | assert phases_ui is None |
|
1022 | 961 | finally: |
|
1023 | 962 | self._cleanup_repo_settings(settings) |
|
1024 | 963 | |
|
1025 | 964 | @pytest.mark.skip_backends('hg') |
|
1026 | 965 | def test_hg_settings_are_shown_only_for_hg_repository( |
|
1027 | 966 | self, autologin_user, backend, csrf_token): |
|
1028 | 967 | repo_name = backend.repo_name |
|
1029 | 968 | response = self.app.get( |
|
1030 | 969 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
1031 | 970 | response.mustcontain(no='Mercurial Settings') |
|
1032 | 971 | |
|
1033 | 972 | @pytest.mark.skip_backends('hg') |
|
1034 | 973 | def test_hg_settings_are_updated_only_for_hg_repository( |
|
1035 | 974 | self, autologin_user, backend, csrf_token): |
|
1036 | 975 | repo_name = backend.repo_name |
|
1037 | 976 | settings = SettingsModel(repo=repo_name) |
|
1038 | 977 | settings.create_ui_section_value( |
|
1039 | 978 | 'extensions', '', key='largefiles', active=True) |
|
1040 | 979 | settings.create_ui_section_value( |
|
1041 | 980 | 'phases', '1', key='publish', active=True) |
|
1042 | 981 | |
|
1043 | 982 | data = self.FORM_DATA.copy() |
|
1044 | 983 | data['csrf_token'] = csrf_token |
|
1045 | 984 | self.app.post( |
|
1046 | 985 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
1047 | 986 | try: |
|
1048 | 987 | largefiles_ui = settings.get_ui_by_section_and_key( |
|
1049 | 988 | 'extensions', 'largefiles') |
|
1050 | 989 | assert largefiles_ui.ui_active is True |
|
1051 | 990 | phases_ui = settings.get_ui_by_section_and_key( |
|
1052 | 991 | 'phases', 'publish') |
|
1053 | 992 | assert phases_ui.ui_value == '1' |
|
1054 | 993 | finally: |
|
1055 | 994 | self._cleanup_repo_settings(settings) |
|
1056 | 995 | |
|
1057 | 996 | def test_per_repo_svn_settings_are_displayed( |
|
1058 | 997 | self, autologin_user, backend_svn, settings_util): |
|
1059 | 998 | repo = backend_svn.create_repo() |
|
1060 | 999 | repo_name = repo.repo_name |
|
1061 | 1000 | branches = [ |
|
1062 | 1001 | settings_util.create_repo_rhodecode_ui( |
|
1063 | 1002 | repo, VcsSettingsModel.SVN_BRANCH_SECTION, |
|
1064 | 1003 | 'branch_{}'.format(i)) |
|
1065 | 1004 | for i in range(10)] |
|
1066 | 1005 | tags = [ |
|
1067 | 1006 | settings_util.create_repo_rhodecode_ui( |
|
1068 | 1007 | repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i)) |
|
1069 | 1008 | for i in range(10)] |
|
1070 | 1009 | |
|
1071 | 1010 | response = self.app.get( |
|
1072 | 1011 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
1073 | 1012 | assert_response = AssertResponse(response) |
|
1074 | 1013 | for branch in branches: |
|
1075 | 1014 | css_selector = '[name=branch_value_{}]'.format(branch.ui_id) |
|
1076 | 1015 | element = assert_response.get_element(css_selector) |
|
1077 | 1016 | assert element.value == branch.ui_value |
|
1078 | 1017 | for tag in tags: |
|
1079 | 1018 | css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id) |
|
1080 | 1019 | element = assert_response.get_element(css_selector) |
|
1081 | 1020 | assert element.value == tag.ui_value |
|
1082 | 1021 | |
|
1083 | 1022 | def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn( |
|
1084 | 1023 | self, autologin_user, backend_svn, settings_util): |
|
1085 | 1024 | repo = backend_svn.create_repo() |
|
1086 | 1025 | repo_name = repo.repo_name |
|
1087 | 1026 | response = self.app.get( |
|
1088 | 1027 | url('repo_vcs_settings', repo_name=repo_name), status=200) |
|
1089 | 1028 | response.mustcontain(no='<label>Hooks:</label>') |
|
1090 | 1029 | response.mustcontain(no='<label>Pull Request Settings:</label>') |
|
1091 | 1030 | |
|
1092 | 1031 | def test_inherit_global_settings_value_is_saved( |
|
1093 | 1032 | self, autologin_user, backend, csrf_token): |
|
1094 | 1033 | repo_name = backend.repo_name |
|
1095 | 1034 | data = self.FORM_DATA.copy() |
|
1096 | 1035 | data['csrf_token'] = csrf_token |
|
1097 | 1036 | data['inherit_global_settings'] = True |
|
1098 | 1037 | self.app.post( |
|
1099 | 1038 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
1100 | 1039 | |
|
1101 | 1040 | settings = SettingsModel(repo=repo_name) |
|
1102 | 1041 | vcs_settings = VcsSettingsModel(repo=repo_name) |
|
1103 | 1042 | try: |
|
1104 | 1043 | assert vcs_settings.inherit_global_settings is True |
|
1105 | 1044 | finally: |
|
1106 | 1045 | self._cleanup_repo_settings(settings) |
|
1107 | 1046 | |
|
1108 | 1047 | def test_repo_cache_is_invalidated_when_settings_are_updated( |
|
1109 | 1048 | self, autologin_user, backend, csrf_token): |
|
1110 | 1049 | repo_name = backend.repo_name |
|
1111 | 1050 | data = self.FORM_DATA.copy() |
|
1112 | 1051 | data['csrf_token'] = csrf_token |
|
1113 | 1052 | data['inherit_global_settings'] = True |
|
1114 | 1053 | settings = SettingsModel(repo=repo_name) |
|
1115 | 1054 | |
|
1116 | 1055 | invalidation_patcher = mock.patch( |
|
1117 | 1056 | 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation') |
|
1118 | 1057 | with invalidation_patcher as invalidation_mock: |
|
1119 | 1058 | self.app.post( |
|
1120 | 1059 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
1121 | 1060 | status=302) |
|
1122 | 1061 | try: |
|
1123 | 1062 | invalidation_mock.assert_called_once_with(repo_name, delete=True) |
|
1124 | 1063 | finally: |
|
1125 | 1064 | self._cleanup_repo_settings(settings) |
|
1126 | 1065 | |
|
1127 | 1066 | def test_other_settings_not_saved_inherit_global_settings_is_true( |
|
1128 | 1067 | self, autologin_user, backend, csrf_token): |
|
1129 | 1068 | repo_name = backend.repo_name |
|
1130 | 1069 | data = self.FORM_DATA.copy() |
|
1131 | 1070 | data['csrf_token'] = csrf_token |
|
1132 | 1071 | data['inherit_global_settings'] = True |
|
1133 | 1072 | self.app.post( |
|
1134 | 1073 | url('repo_vcs_settings', repo_name=repo_name), data, status=302) |
|
1135 | 1074 | |
|
1136 | 1075 | settings = SettingsModel(repo=repo_name) |
|
1137 | 1076 | ui_settings = ( |
|
1138 | 1077 | VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS) |
|
1139 | 1078 | |
|
1140 | 1079 | vcs_settings = [] |
|
1141 | 1080 | try: |
|
1142 | 1081 | for section, key in ui_settings: |
|
1143 | 1082 | ui = settings.get_ui_by_section_and_key(section, key) |
|
1144 | 1083 | if ui: |
|
1145 | 1084 | vcs_settings.append(ui) |
|
1146 | 1085 | vcs_settings.extend(settings.get_ui_by_section( |
|
1147 | 1086 | VcsSettingsModel.SVN_BRANCH_SECTION)) |
|
1148 | 1087 | vcs_settings.extend(settings.get_ui_by_section( |
|
1149 | 1088 | VcsSettingsModel.SVN_TAG_SECTION)) |
|
1150 | 1089 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
1151 | 1090 | setting = settings.get_setting_by_name(name) |
|
1152 | 1091 | if setting: |
|
1153 | 1092 | vcs_settings.append(setting) |
|
1154 | 1093 | assert vcs_settings == [] |
|
1155 | 1094 | finally: |
|
1156 | 1095 | self._cleanup_repo_settings(settings) |
|
1157 | 1096 | |
|
1158 | 1097 | def test_delete_svn_branch_and_tag_patterns( |
|
1159 | 1098 | self, autologin_user, backend_svn, settings_util, csrf_token): |
|
1160 | 1099 | repo = backend_svn.create_repo() |
|
1161 | 1100 | repo_name = repo.repo_name |
|
1162 | 1101 | branch = settings_util.create_repo_rhodecode_ui( |
|
1163 | 1102 | repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch', |
|
1164 | 1103 | cleanup=False) |
|
1165 | 1104 | tag = settings_util.create_repo_rhodecode_ui( |
|
1166 | 1105 | repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False) |
|
1167 | 1106 | data = { |
|
1168 | 1107 | '_method': 'delete', |
|
1169 | 1108 | 'csrf_token': csrf_token |
|
1170 | 1109 | } |
|
1171 | 1110 | for id_ in (branch.ui_id, tag.ui_id): |
|
1172 | 1111 | data['delete_svn_pattern'] = id_, |
|
1173 | 1112 | self.app.post( |
|
1174 | 1113 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
1175 | 1114 | headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) |
|
1176 | 1115 | settings = VcsSettingsModel(repo=repo_name) |
|
1177 | 1116 | assert settings.get_repo_svn_branch_patterns() == [] |
|
1178 | 1117 | |
|
1179 | 1118 | def test_delete_svn_branch_requires_repo_admin_permission( |
|
1180 | 1119 | self, backend_svn, user_util, settings_util, csrf_token): |
|
1181 | 1120 | repo = backend_svn.create_repo() |
|
1182 | 1121 | repo_name = repo.repo_name |
|
1183 | 1122 | |
|
1184 | 1123 | logout_user_session(self.app, csrf_token) |
|
1185 | 1124 | session = login_user_session( |
|
1186 | 1125 | self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) |
|
1187 | 1126 | csrf_token = auth.get_csrf_token(session) |
|
1188 | 1127 | |
|
1189 | 1128 | repo = Repository.get_by_repo_name(repo_name) |
|
1190 | 1129 | user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN) |
|
1191 | 1130 | user_util.grant_user_permission_to_repo(repo, user, 'repository.admin') |
|
1192 | 1131 | branch = settings_util.create_repo_rhodecode_ui( |
|
1193 | 1132 | repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch', |
|
1194 | 1133 | cleanup=False) |
|
1195 | 1134 | data = { |
|
1196 | 1135 | '_method': 'delete', |
|
1197 | 1136 | 'csrf_token': csrf_token, |
|
1198 | 1137 | 'delete_svn_pattern': branch.ui_id |
|
1199 | 1138 | } |
|
1200 | 1139 | self.app.post( |
|
1201 | 1140 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
1202 | 1141 | headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200) |
|
1203 | 1142 | |
|
1204 | 1143 | def test_delete_svn_branch_raises_400_when_not_found( |
|
1205 | 1144 | self, autologin_user, backend_svn, settings_util, csrf_token): |
|
1206 | 1145 | repo_name = backend_svn.repo_name |
|
1207 | 1146 | data = { |
|
1208 | 1147 | '_method': 'delete', |
|
1209 | 1148 | 'delete_svn_pattern': 123, |
|
1210 | 1149 | 'csrf_token': csrf_token |
|
1211 | 1150 | } |
|
1212 | 1151 | self.app.post( |
|
1213 | 1152 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
1214 | 1153 | headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400) |
|
1215 | 1154 | |
|
1216 | 1155 | def test_delete_svn_branch_raises_400_when_no_id_specified( |
|
1217 | 1156 | self, autologin_user, backend_svn, settings_util, csrf_token): |
|
1218 | 1157 | repo_name = backend_svn.repo_name |
|
1219 | 1158 | data = { |
|
1220 | 1159 | '_method': 'delete', |
|
1221 | 1160 | 'csrf_token': csrf_token |
|
1222 | 1161 | } |
|
1223 | 1162 | self.app.post( |
|
1224 | 1163 | url('repo_vcs_settings', repo_name=repo_name), data, |
|
1225 | 1164 | headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400) |
|
1226 | 1165 | |
|
1227 | 1166 | def _cleanup_repo_settings(self, settings_model): |
|
1228 | 1167 | cleanup = [] |
|
1229 | 1168 | ui_settings = ( |
|
1230 | 1169 | VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS) |
|
1231 | 1170 | |
|
1232 | 1171 | for section, key in ui_settings: |
|
1233 | 1172 | ui = settings_model.get_ui_by_section_and_key(section, key) |
|
1234 | 1173 | if ui: |
|
1235 | 1174 | cleanup.append(ui) |
|
1236 | 1175 | |
|
1237 | 1176 | cleanup.extend(settings_model.get_ui_by_section( |
|
1238 | 1177 | VcsSettingsModel.INHERIT_SETTINGS)) |
|
1239 | 1178 | cleanup.extend(settings_model.get_ui_by_section( |
|
1240 | 1179 | VcsSettingsModel.SVN_BRANCH_SECTION)) |
|
1241 | 1180 | cleanup.extend(settings_model.get_ui_by_section( |
|
1242 | 1181 | VcsSettingsModel.SVN_TAG_SECTION)) |
|
1243 | 1182 | |
|
1244 | 1183 | for name in VcsSettingsModel.GENERAL_SETTINGS: |
|
1245 | 1184 | setting = settings_model.get_setting_by_name(name) |
|
1246 | 1185 | if setting: |
|
1247 | 1186 | cleanup.append(setting) |
|
1248 | 1187 | |
|
1249 | 1188 | for object_ in cleanup: |
|
1250 | 1189 | Session().delete(object_) |
|
1251 | 1190 | Session().commit() |
|
1252 | 1191 | |
|
1253 | 1192 | def assert_repo_value_equals_global_value(self, response, setting): |
|
1254 | 1193 | assert_response = AssertResponse(response) |
|
1255 | 1194 | global_css_selector = '[name={}_inherited]'.format(setting) |
|
1256 | 1195 | repo_css_selector = '[name={}]'.format(setting) |
|
1257 | 1196 | repo_element = assert_response.get_element(repo_css_selector) |
|
1258 | 1197 | global_element = assert_response.get_element(global_css_selector) |
|
1259 | 1198 | assert repo_element.value == global_element.value |
|
1260 | 1199 | |
|
1261 | 1200 | |
|
1262 | 1201 | def _get_permission_for_user(user, repo): |
|
1263 | 1202 | perm = UserRepoToPerm.query()\ |
|
1264 | 1203 | .filter(UserRepoToPerm.repository == |
|
1265 | 1204 | Repository.get_by_repo_name(repo))\ |
|
1266 | 1205 | .filter(UserRepoToPerm.user == User.get_by_username(user))\ |
|
1267 | 1206 | .all() |
|
1268 | 1207 | return perm |
@@ -1,116 +1,116 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2016-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import colander |
|
22 | 22 | import pytest |
|
23 | 23 | |
|
24 | 24 | from rhodecode.model.validation_schema import types |
|
25 | 25 | from rhodecode.model.validation_schema.schemas import repo_group_schema |
|
26 | 26 | |
|
27 | 27 | |
|
28 | 28 | class TestRepoGroupSchema(object): |
|
29 | 29 | |
|
30 | 30 | @pytest.mark.parametrize('given, expected', [ |
|
31 | 31 | ('my repo', 'my-repo'), |
|
32 | 32 | (' hello world mike ', 'hello-world-mike'), |
|
33 | 33 | |
|
34 | 34 | ('//group1/group2//', 'group1/group2'), |
|
35 | 35 | ('//group1///group2//', 'group1/group2'), |
|
36 | 36 | ('///group1/group2///group3', 'group1/group2/group3'), |
|
37 | 37 | ('word g1/group2///group3', 'word-g1/group2/group3'), |
|
38 | 38 | |
|
39 | 39 | ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'), |
|
40 | 40 | |
|
41 | 41 | ('group,,,/,,,/1/2/3', 'group/1/2/3'), |
|
42 | 42 | ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'), |
|
43 | 43 | (u'grou[]p1/gro;up2///gro up3/ąć', u'group1/group2/gro-up3/ąć'), |
|
44 | 44 | ]) |
|
45 | 45 | def test_deserialize_repo_name(self, app, user_admin, given, expected): |
|
46 | 46 | schema = repo_group_schema.RepoGroupSchema().bind() |
|
47 | 47 | assert schema.get('repo_group_name').deserialize(given) == expected |
|
48 | 48 | |
|
49 | 49 | def test_deserialize(self, app, user_admin): |
|
50 | 50 | schema = repo_group_schema.RepoGroupSchema().bind( |
|
51 | 51 | user=user_admin |
|
52 | 52 | ) |
|
53 | 53 | |
|
54 | 54 | schema_data = schema.deserialize(dict( |
|
55 |
repo_group_name=' |
|
|
55 | repo_group_name='my_schema_group', | |
|
56 | 56 | repo_group_owner=user_admin.username |
|
57 | 57 | )) |
|
58 | 58 | |
|
59 |
assert schema_data['repo_group_name'] == ' |
|
|
59 | assert schema_data['repo_group_name'] == u'my_schema_group' | |
|
60 | 60 | assert schema_data['repo_group'] == { |
|
61 | 61 | 'repo_group_id': None, |
|
62 | 62 | 'repo_group_name': types.RootLocation, |
|
63 |
'repo_group_name_without_group': ' |
|
|
63 | 'repo_group_name_without_group': u'my_schema_group'} | |
|
64 | 64 | |
|
65 | 65 | @pytest.mark.parametrize('given, err_key, expected_exc', [ |
|
66 |
('xxx/ |
|
|
66 | ('xxx/my_schema_group', 'repo_group', 'Parent repository group `xxx` does not exist'), | |
|
67 | 67 | ('', 'repo_group_name', 'Name must start with a letter or number. Got ``'), |
|
68 | 68 | ]) |
|
69 | 69 | def test_deserialize_with_bad_group_name( |
|
70 | 70 | self, app, user_admin, given, err_key, expected_exc): |
|
71 | 71 | schema = repo_group_schema.RepoGroupSchema().bind( |
|
72 | 72 | repo_type_options=['hg'], |
|
73 | 73 | user=user_admin |
|
74 | 74 | ) |
|
75 | 75 | |
|
76 | 76 | with pytest.raises(colander.Invalid) as excinfo: |
|
77 | 77 | schema.deserialize(dict( |
|
78 | 78 | repo_group_name=given, |
|
79 | 79 | repo_group_owner=user_admin.username |
|
80 | 80 | )) |
|
81 | 81 | |
|
82 | 82 | assert excinfo.value.asdict()[err_key] == expected_exc |
|
83 | 83 | |
|
84 | 84 | def test_deserialize_with_group_name(self, app, user_admin, test_repo_group): |
|
85 | 85 | schema = repo_group_schema.RepoGroupSchema().bind( |
|
86 | 86 | user=user_admin |
|
87 | 87 | ) |
|
88 | 88 | |
|
89 |
full_name = test_repo_group.group_name + '/ |
|
|
89 | full_name = test_repo_group.group_name + u'/my_schema_group' | |
|
90 | 90 | schema_data = schema.deserialize(dict( |
|
91 | 91 | repo_group_name=full_name, |
|
92 | 92 | repo_group_owner=user_admin.username |
|
93 | 93 | )) |
|
94 | 94 | |
|
95 | 95 | assert schema_data['repo_group_name'] == full_name |
|
96 | 96 | assert schema_data['repo_group'] == { |
|
97 | 97 | 'repo_group_id': test_repo_group.group_id, |
|
98 | 98 | 'repo_group_name': test_repo_group.group_name, |
|
99 |
'repo_group_name_without_group': ' |
|
|
99 | 'repo_group_name_without_group': u'my_schema_group'} | |
|
100 | 100 | |
|
101 | 101 | def test_deserialize_with_group_name_regular_user_no_perms( |
|
102 | 102 | self, app, user_regular, test_repo_group): |
|
103 | 103 | schema = repo_group_schema.RepoGroupSchema().bind( |
|
104 | 104 | user=user_regular |
|
105 | 105 | ) |
|
106 | 106 | |
|
107 |
full_name = test_repo_group.group_name + '/ |
|
|
107 | full_name = test_repo_group.group_name + u'/my_schema_group' | |
|
108 | 108 | with pytest.raises(colander.Invalid) as excinfo: |
|
109 | 109 | schema.deserialize(dict( |
|
110 | 110 | repo_group_name=full_name, |
|
111 | 111 | repo_group_owner=user_regular.username |
|
112 | 112 | )) |
|
113 | 113 | |
|
114 | 114 | expected = 'Parent repository group `{}` does not exist'.format( |
|
115 | 115 | test_repo_group.group_name) |
|
116 | 116 | assert excinfo.value.asdict()['repo_group'] == expected |
@@ -1,128 +1,130 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2016-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import colander |
|
22 | 22 | import pytest |
|
23 | 23 | |
|
24 | 24 | from rhodecode.model.validation_schema import types |
|
25 | 25 | from rhodecode.model.validation_schema.schemas import repo_schema |
|
26 | 26 | |
|
27 | 27 | |
|
28 | 28 | class TestRepoSchema(object): |
|
29 | 29 | |
|
30 | 30 | #TODO: |
|
31 | 31 | # test nested groups |
|
32 | 32 | |
|
33 | 33 | @pytest.mark.parametrize('given, expected', [ |
|
34 | 34 | ('my repo', 'my-repo'), |
|
35 | 35 | (' hello world mike ', 'hello-world-mike'), |
|
36 | 36 | |
|
37 | 37 | ('//group1/group2//', 'group1/group2'), |
|
38 | 38 | ('//group1///group2//', 'group1/group2'), |
|
39 | 39 | ('///group1/group2///group3', 'group1/group2/group3'), |
|
40 | 40 | ('word g1/group2///group3', 'word-g1/group2/group3'), |
|
41 | 41 | |
|
42 | 42 | ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'), |
|
43 | 43 | |
|
44 | 44 | ('group,,,/,,,/1/2/3', 'group/1/2/3'), |
|
45 | 45 | ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'), |
|
46 | 46 | (u'grou[]p1/gro;up2///gro up3/ąć', u'group1/group2/gro-up3/ąć'), |
|
47 | 47 | ]) |
|
48 | 48 | def test_deserialize_repo_name(self, app, user_admin, given, expected): |
|
49 | 49 | |
|
50 | 50 | schema = repo_schema.RepoSchema().bind() |
|
51 | 51 | assert expected == schema.get('repo_name').deserialize(given) |
|
52 | 52 | |
|
53 | 53 | def test_deserialize(self, app, user_admin): |
|
54 | 54 | schema = repo_schema.RepoSchema().bind( |
|
55 | 55 | repo_type_options=['hg'], |
|
56 | 56 | user=user_admin |
|
57 | 57 | ) |
|
58 | 58 | |
|
59 | 59 | schema_data = schema.deserialize(dict( |
|
60 |
repo_name=' |
|
|
60 | repo_name='my_schema_repo', | |
|
61 | 61 | repo_type='hg', |
|
62 | 62 | repo_owner=user_admin.username |
|
63 | 63 | )) |
|
64 | 64 | |
|
65 |
assert schema_data['repo_name'] == ' |
|
|
65 | assert schema_data['repo_name'] == u'my_schema_repo' | |
|
66 | 66 | assert schema_data['repo_group'] == { |
|
67 | 67 | 'repo_group_id': None, |
|
68 | 68 | 'repo_group_name': types.RootLocation, |
|
69 |
'repo_name_with |
|
|
69 | 'repo_name_with_group': u'my_schema_repo', | |
|
70 | 'repo_name_without_group': u'my_schema_repo'} | |
|
70 | 71 | |
|
71 | 72 | @pytest.mark.parametrize('given, err_key, expected_exc', [ |
|
72 |
('xxx/ |
|
|
73 | ('xxx/my_schema_repo','repo_group', 'Repository group `xxx` does not exist'), | |
|
73 | 74 | ('', 'repo_name', 'Name must start with a letter or number. Got ``'), |
|
74 | 75 | ]) |
|
75 | 76 | def test_deserialize_with_bad_group_name( |
|
76 | 77 | self, app, user_admin, given, err_key, expected_exc): |
|
77 | 78 | |
|
78 | 79 | schema = repo_schema.RepoSchema().bind( |
|
79 | 80 | repo_type_options=['hg'], |
|
80 | 81 | user=user_admin |
|
81 | 82 | ) |
|
82 | 83 | |
|
83 | 84 | with pytest.raises(colander.Invalid) as excinfo: |
|
84 | 85 | schema.deserialize(dict( |
|
85 | 86 | repo_name=given, |
|
86 | 87 | repo_type='hg', |
|
87 | 88 | repo_owner=user_admin.username |
|
88 | 89 | )) |
|
89 | 90 | |
|
90 | 91 | assert excinfo.value.asdict()[err_key] == expected_exc |
|
91 | 92 | |
|
92 | 93 | def test_deserialize_with_group_name(self, app, user_admin, test_repo_group): |
|
93 | 94 | schema = repo_schema.RepoSchema().bind( |
|
94 | 95 | repo_type_options=['hg'], |
|
95 | 96 | user=user_admin |
|
96 | 97 | ) |
|
97 | 98 | |
|
98 |
full_name = test_repo_group.group_name + '/ |
|
|
99 | full_name = test_repo_group.group_name + u'/my_schema_repo' | |
|
99 | 100 | schema_data = schema.deserialize(dict( |
|
100 | 101 | repo_name=full_name, |
|
101 | 102 | repo_type='hg', |
|
102 | 103 | repo_owner=user_admin.username |
|
103 | 104 | )) |
|
104 | 105 | |
|
105 | 106 | assert schema_data['repo_name'] == full_name |
|
106 | 107 | assert schema_data['repo_group'] == { |
|
107 | 108 | 'repo_group_id': test_repo_group.group_id, |
|
108 | 109 | 'repo_group_name': test_repo_group.group_name, |
|
109 |
'repo_name_with |
|
|
110 | 'repo_name_with_group': full_name, | |
|
111 | 'repo_name_without_group': u'my_schema_repo'} | |
|
110 | 112 | |
|
111 | 113 | def test_deserialize_with_group_name_regular_user_no_perms( |
|
112 | 114 | self, app, user_regular, test_repo_group): |
|
113 | 115 | schema = repo_schema.RepoSchema().bind( |
|
114 | 116 | repo_type_options=['hg'], |
|
115 | 117 | user=user_regular |
|
116 | 118 | ) |
|
117 | 119 | |
|
118 |
full_name = test_repo_group.group_name + '/ |
|
|
120 | full_name = test_repo_group.group_name + '/my_schema_repo' | |
|
119 | 121 | with pytest.raises(colander.Invalid) as excinfo: |
|
120 | 122 | schema.deserialize(dict( |
|
121 | 123 | repo_name=full_name, |
|
122 | 124 | repo_type='hg', |
|
123 | 125 | repo_owner=user_regular.username |
|
124 | 126 | )) |
|
125 | 127 | |
|
126 | 128 | expected = 'Repository group `{}` does not exist'.format( |
|
127 | 129 | test_repo_group.group_name) |
|
128 | 130 | assert excinfo.value.asdict()['repo_group'] == expected |
@@ -1,335 +1,336 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import os |
|
22 | 22 | import stat |
|
23 | 23 | import sys |
|
24 | 24 | |
|
25 | 25 | import pytest |
|
26 | 26 | from mock import Mock, patch, DEFAULT |
|
27 | 27 | |
|
28 | 28 | import rhodecode |
|
29 | 29 | from rhodecode.model import db, scm |
|
30 | from rhodecode.tests import no_newline_id_generator | |
|
30 | 31 | |
|
31 | 32 | |
|
32 | 33 | def test_scm_instance_config(backend): |
|
33 | 34 | repo = backend.create_repo() |
|
34 | 35 | with patch.multiple('rhodecode.model.db.Repository', |
|
35 | 36 | _get_instance=DEFAULT, |
|
36 | 37 | _get_instance_cached=DEFAULT) as mocks: |
|
37 | 38 | repo.scm_instance() |
|
38 | 39 | mocks['_get_instance'].assert_called_with( |
|
39 | 40 | config=None, cache=False) |
|
40 | 41 | |
|
41 | 42 | config = {'some': 'value'} |
|
42 | 43 | repo.scm_instance(config=config) |
|
43 | 44 | mocks['_get_instance'].assert_called_with( |
|
44 | 45 | config=config, cache=False) |
|
45 | 46 | |
|
46 | 47 | with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}): |
|
47 | 48 | repo.scm_instance(config=config) |
|
48 | 49 | mocks['_get_instance_cached'].assert_called() |
|
49 | 50 | |
|
50 | 51 | |
|
51 | 52 | def test__get_instance_config(backend): |
|
52 | 53 | repo = backend.create_repo() |
|
53 | 54 | vcs_class = Mock() |
|
54 | 55 | with patch.multiple('rhodecode.lib.vcs.backends', |
|
55 | 56 | get_scm=DEFAULT, |
|
56 | 57 | get_backend=DEFAULT) as mocks: |
|
57 | 58 | mocks['get_scm'].return_value = backend.alias |
|
58 | 59 | mocks['get_backend'].return_value = vcs_class |
|
59 | 60 | with patch('rhodecode.model.db.Repository._config') as config_mock: |
|
60 | 61 | repo._get_instance() |
|
61 | 62 | vcs_class.assert_called_with( |
|
62 | 63 | repo_path=repo.repo_full_path, config=config_mock, |
|
63 | 64 | create=False, with_wire={'cache': True}) |
|
64 | 65 | |
|
65 | 66 | new_config = {'override': 'old_config'} |
|
66 | 67 | repo._get_instance(config=new_config) |
|
67 | 68 | vcs_class.assert_called_with( |
|
68 | 69 | repo_path=repo.repo_full_path, config=new_config, create=False, |
|
69 | 70 | with_wire={'cache': True}) |
|
70 | 71 | |
|
71 | 72 | |
|
72 | 73 | def test_mark_for_invalidation_config(backend): |
|
73 | 74 | repo = backend.create_repo() |
|
74 | 75 | with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock: |
|
75 | 76 | scm.ScmModel().mark_for_invalidation(repo.repo_name) |
|
76 | 77 | _, kwargs = _mock.call_args |
|
77 | 78 | assert kwargs['config'].__dict__ == repo._config.__dict__ |
|
78 | 79 | |
|
79 | 80 | |
|
80 | 81 | def test_mark_for_invalidation_with_delete_updates_last_commit(backend): |
|
81 | 82 | commits = [{'message': 'A'}, {'message': 'B'}] |
|
82 | 83 | repo = backend.create_repo(commits=commits) |
|
83 | 84 | scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True) |
|
84 | 85 | assert repo.changeset_cache['revision'] == 1 |
|
85 | 86 | |
|
86 | 87 | |
|
87 | 88 | def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend): |
|
88 | 89 | repo = backend.create_repo() |
|
89 | 90 | scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True) |
|
90 | 91 | assert repo.changeset_cache['revision'] == -1 |
|
91 | 92 | |
|
92 | 93 | |
|
93 | 94 | def test_strip_with_multiple_heads(backend_hg): |
|
94 | 95 | commits = [ |
|
95 | 96 | {'message': 'A'}, |
|
96 | 97 | {'message': 'a'}, |
|
97 | 98 | {'message': 'b'}, |
|
98 | 99 | {'message': 'B', 'parents': ['A']}, |
|
99 | 100 | {'message': 'a1'}, |
|
100 | 101 | ] |
|
101 | 102 | repo = backend_hg.create_repo(commits=commits) |
|
102 | 103 | commit_ids = backend_hg.commit_ids |
|
103 | 104 | |
|
104 | 105 | model = scm.ScmModel() |
|
105 | 106 | model.strip(repo, commit_ids['b'], branch=None) |
|
106 | 107 | |
|
107 | 108 | vcs_repo = repo.scm_instance() |
|
108 | 109 | rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()] |
|
109 | 110 | assert len(rest_commit_ids) == 4 |
|
110 | 111 | assert commit_ids['b'] not in rest_commit_ids |
|
111 | 112 | |
|
112 | 113 | |
|
113 | 114 | def test_strip_with_single_heads(backend_hg): |
|
114 | 115 | commits = [ |
|
115 | 116 | {'message': 'A'}, |
|
116 | 117 | {'message': 'a'}, |
|
117 | 118 | {'message': 'b'}, |
|
118 | 119 | ] |
|
119 | 120 | repo = backend_hg.create_repo(commits=commits) |
|
120 | 121 | commit_ids = backend_hg.commit_ids |
|
121 | 122 | |
|
122 | 123 | model = scm.ScmModel() |
|
123 | 124 | model.strip(repo, commit_ids['b'], branch=None) |
|
124 | 125 | |
|
125 | 126 | vcs_repo = repo.scm_instance() |
|
126 | 127 | rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()] |
|
127 | 128 | assert len(rest_commit_ids) == 2 |
|
128 | 129 | assert commit_ids['b'] not in rest_commit_ids |
|
129 | 130 | |
|
130 | 131 | |
|
131 | 132 | def test_get_nodes_returns_unicode_flat(backend_random): |
|
132 | 133 | repo = backend_random.repo |
|
133 | 134 | directories, files = scm.ScmModel().get_nodes( |
|
134 | 135 | repo.repo_name, repo.get_commit(commit_idx=0).raw_id, |
|
135 | 136 | flat=True) |
|
136 | 137 | assert_contains_only_unicode(directories) |
|
137 | 138 | assert_contains_only_unicode(files) |
|
138 | 139 | |
|
139 | 140 | |
|
140 | 141 | def test_get_nodes_returns_unicode_non_flat(backend_random): |
|
141 | 142 | repo = backend_random.repo |
|
142 | 143 | directories, files = scm.ScmModel().get_nodes( |
|
143 | 144 | repo.repo_name, repo.get_commit(commit_idx=0).raw_id, |
|
144 | 145 | flat=False) |
|
145 | 146 | # johbo: Checking only the names for now, since that is the critical |
|
146 | 147 | # part. |
|
147 | 148 | assert_contains_only_unicode([d['name'] for d in directories]) |
|
148 | 149 | assert_contains_only_unicode([f['name'] for f in files]) |
|
149 | 150 | |
|
150 | 151 | |
|
151 | 152 | def test_get_nodes_max_file_bytes(backend_random): |
|
152 | 153 | repo = backend_random.repo |
|
153 | 154 | max_file_bytes = 10 |
|
154 | 155 | directories, files = scm.ScmModel().get_nodes( |
|
155 | 156 | repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True, |
|
156 | 157 | extended_info=True, flat=False) |
|
157 | 158 | assert any(file['content'] and len(file['content']) > max_file_bytes |
|
158 | 159 | for file in files) |
|
159 | 160 | |
|
160 | 161 | directories, files = scm.ScmModel().get_nodes( |
|
161 | 162 | repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True, |
|
162 | 163 | extended_info=True, flat=False, max_file_bytes=max_file_bytes) |
|
163 | 164 | assert all( |
|
164 | 165 | file['content'] is None if file['size'] > max_file_bytes else True |
|
165 | 166 | for file in files) |
|
166 | 167 | |
|
167 | 168 | |
|
168 | 169 | def assert_contains_only_unicode(structure): |
|
169 | 170 | assert structure |
|
170 | 171 | for value in structure: |
|
171 | 172 | assert isinstance(value, unicode) |
|
172 | 173 | |
|
173 | 174 | |
|
174 | 175 | @pytest.mark.backends("hg", "git") |
|
175 | 176 | def test_get_non_unicode_reference(backend): |
|
176 | 177 | model = scm.ScmModel() |
|
177 | 178 | non_unicode_list = ["Adını".decode("cp1254")] |
|
178 | 179 | |
|
179 | 180 | def scm_instance(): |
|
180 | 181 | return Mock( |
|
181 | 182 | branches=non_unicode_list, bookmarks=non_unicode_list, |
|
182 | 183 | tags=non_unicode_list, alias=backend.alias) |
|
183 | 184 | |
|
184 | 185 | repo = Mock(__class__=db.Repository, scm_instance=scm_instance) |
|
185 | 186 | choices, __ = model.get_repo_landing_revs(repo=repo) |
|
186 | 187 | if backend.alias == 'hg': |
|
187 | 188 | valid_choices = [ |
|
188 | 189 | 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1', |
|
189 | 190 | u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1'] |
|
190 | 191 | else: |
|
191 | 192 | valid_choices = [ |
|
192 | 193 | 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1', |
|
193 | 194 | u'tag:Ad\xc4\xb1n\xc4\xb1'] |
|
194 | 195 | |
|
195 | 196 | assert choices == valid_choices |
|
196 | 197 | |
|
197 | 198 | |
|
198 | 199 | class TestInstallSvnHooks(object): |
|
199 | 200 | HOOK_FILES = ('pre-commit', 'post-commit') |
|
200 | 201 | |
|
201 | 202 | def test_new_hooks_are_created(self, backend_svn): |
|
202 | 203 | model = scm.ScmModel() |
|
203 | 204 | repo = backend_svn.create_repo() |
|
204 | 205 | vcs_repo = repo.scm_instance() |
|
205 | 206 | model.install_svn_hooks(vcs_repo) |
|
206 | 207 | |
|
207 | 208 | hooks_path = os.path.join(vcs_repo.path, 'hooks') |
|
208 | 209 | assert os.path.isdir(hooks_path) |
|
209 | 210 | for file_name in self.HOOK_FILES: |
|
210 | 211 | file_path = os.path.join(hooks_path, file_name) |
|
211 | 212 | self._check_hook_file_mode(file_path) |
|
212 | 213 | self._check_hook_file_content(file_path) |
|
213 | 214 | |
|
214 | 215 | def test_rc_hooks_are_replaced(self, backend_svn): |
|
215 | 216 | model = scm.ScmModel() |
|
216 | 217 | repo = backend_svn.create_repo() |
|
217 | 218 | vcs_repo = repo.scm_instance() |
|
218 | 219 | hooks_path = os.path.join(vcs_repo.path, 'hooks') |
|
219 | 220 | file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES] |
|
220 | 221 | |
|
221 | 222 | for file_path in file_paths: |
|
222 | 223 | self._create_fake_hook( |
|
223 | 224 | file_path, content="RC_HOOK_VER = 'abcde'\n") |
|
224 | 225 | |
|
225 | 226 | model.install_svn_hooks(vcs_repo) |
|
226 | 227 | |
|
227 | 228 | for file_path in file_paths: |
|
228 | 229 | self._check_hook_file_content(file_path) |
|
229 | 230 | |
|
230 | 231 | def test_non_rc_hooks_are_not_replaced_without_force_create( |
|
231 | 232 | self, backend_svn): |
|
232 | 233 | model = scm.ScmModel() |
|
233 | 234 | repo = backend_svn.create_repo() |
|
234 | 235 | vcs_repo = repo.scm_instance() |
|
235 | 236 | hooks_path = os.path.join(vcs_repo.path, 'hooks') |
|
236 | 237 | file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES] |
|
237 | 238 | non_rc_content = "exit 0\n" |
|
238 | 239 | |
|
239 | 240 | for file_path in file_paths: |
|
240 | 241 | self._create_fake_hook(file_path, content=non_rc_content) |
|
241 | 242 | |
|
242 | 243 | model.install_svn_hooks(vcs_repo) |
|
243 | 244 | |
|
244 | 245 | for file_path in file_paths: |
|
245 | 246 | with open(file_path, 'rt') as hook_file: |
|
246 | 247 | content = hook_file.read() |
|
247 | 248 | assert content == non_rc_content |
|
248 | 249 | |
|
249 | 250 | def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn): |
|
250 | 251 | model = scm.ScmModel() |
|
251 | 252 | repo = backend_svn.create_repo() |
|
252 | 253 | vcs_repo = repo.scm_instance() |
|
253 | 254 | hooks_path = os.path.join(vcs_repo.path, 'hooks') |
|
254 | 255 | file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES] |
|
255 | 256 | non_rc_content = "exit 0\n" |
|
256 | 257 | |
|
257 | 258 | for file_path in file_paths: |
|
258 | 259 | self._create_fake_hook(file_path, content=non_rc_content) |
|
259 | 260 | |
|
260 | 261 | model.install_svn_hooks(vcs_repo, force_create=True) |
|
261 | 262 | |
|
262 | 263 | for file_path in file_paths: |
|
263 | 264 | self._check_hook_file_content(file_path) |
|
264 | 265 | |
|
265 | 266 | def _check_hook_file_mode(self, file_path): |
|
266 | 267 | assert os.path.exists(file_path) |
|
267 | 268 | stat_info = os.stat(file_path) |
|
268 | 269 | |
|
269 | 270 | file_mode = stat.S_IMODE(stat_info.st_mode) |
|
270 | 271 | expected_mode = int('755', 8) |
|
271 | 272 | assert expected_mode == file_mode |
|
272 | 273 | |
|
273 | 274 | def _check_hook_file_content(self, file_path): |
|
274 | 275 | with open(file_path, 'rt') as hook_file: |
|
275 | 276 | content = hook_file.read() |
|
276 | 277 | |
|
277 | 278 | expected_env = '#!{}'.format(sys.executable) |
|
278 | 279 | expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format( |
|
279 | 280 | rhodecode.__version__) |
|
280 | 281 | assert content.strip().startswith(expected_env) |
|
281 | 282 | assert expected_rc_version in content |
|
282 | 283 | |
|
283 | 284 | def _create_fake_hook(self, file_path, content): |
|
284 | 285 | with open(file_path, 'w') as hook_file: |
|
285 | 286 | hook_file.write(content) |
|
286 | 287 | |
|
287 | 288 | |
|
288 | 289 | class TestCheckRhodecodeHook(object): |
|
289 | 290 | |
|
290 | 291 | @patch('os.path.exists', Mock(return_value=False)) |
|
291 | 292 | def test_returns_true_when_no_hook_found(self): |
|
292 | 293 | result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py') |
|
293 | 294 | assert result |
|
294 | 295 | |
|
295 | 296 | @pytest.mark.parametrize("file_content, expected_result", [ |
|
296 | 297 | ("RC_HOOK_VER = '3.3.3'\n", True), |
|
297 | 298 | ("RC_HOOK = '3.3.3'\n", False), |
|
298 | ]) | |
|
299 | ], ids=no_newline_id_generator) | |
|
299 | 300 | @patch('os.path.exists', Mock(return_value=True)) |
|
300 | 301 | def test_signatures(self, file_content, expected_result): |
|
301 | 302 | hook_content_patcher = patch.object( |
|
302 | 303 | scm, '_read_hook', return_value=file_content) |
|
303 | 304 | with hook_content_patcher: |
|
304 | 305 | result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py') |
|
305 | 306 | |
|
306 | 307 | assert result is expected_result |
|
307 | 308 | |
|
308 | 309 | |
|
309 | 310 | class TestInstallHooks(object): |
|
310 | 311 | def test_hooks_are_installed_for_git_repo(self, backend_git): |
|
311 | 312 | repo = backend_git.create_repo() |
|
312 | 313 | model = scm.ScmModel() |
|
313 | 314 | scm_repo = repo.scm_instance() |
|
314 | 315 | with patch.object(model, 'install_git_hook') as hooks_mock: |
|
315 | 316 | model.install_hooks(scm_repo, repo_type='git') |
|
316 | 317 | hooks_mock.assert_called_once_with(scm_repo) |
|
317 | 318 | |
|
318 | 319 | def test_hooks_are_installed_for_svn_repo(self, backend_svn): |
|
319 | 320 | repo = backend_svn.create_repo() |
|
320 | 321 | scm_repo = repo.scm_instance() |
|
321 | 322 | model = scm.ScmModel() |
|
322 | 323 | with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock: |
|
323 | 324 | model.install_hooks(scm_repo, repo_type='svn') |
|
324 | 325 | hooks_mock.assert_called_once_with(scm_repo) |
|
325 | 326 | |
|
326 | 327 | @pytest.mark.parametrize('hook_method', [ |
|
327 | 328 | 'install_svn_hooks', |
|
328 | 329 | 'install_git_hook']) |
|
329 | 330 | def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method): |
|
330 | 331 | repo = backend_hg.create_repo() |
|
331 | 332 | scm_repo = repo.scm_instance() |
|
332 | 333 | model = scm.ScmModel() |
|
333 | 334 | with patch.object(scm.ScmModel, hook_method) as hooks_mock: |
|
334 | 335 | model.install_hooks(scm_repo, repo_type='hg') |
|
335 | 336 | assert hooks_mock.call_count == 0 |
@@ -1,527 +1,529 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2017 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | |
|
22 | 22 | """ |
|
23 | 23 | Package for testing various lib/helper functions in rhodecode |
|
24 | 24 | """ |
|
25 | 25 | |
|
26 | 26 | import datetime |
|
27 | 27 | import string |
|
28 | 28 | import mock |
|
29 | 29 | import pytest |
|
30 | ||
|
31 | from rhodecode.tests import no_newline_id_generator | |
|
30 | 32 | from rhodecode.tests.utils import run_test_concurrently |
|
31 | 33 | from rhodecode.lib.helpers import InitialsGravatar |
|
32 | 34 | |
|
33 | 35 | from rhodecode.lib.utils2 import AttributeDict |
|
34 | 36 | from rhodecode.model.db import Repository |
|
35 | 37 | |
|
36 | 38 | |
|
37 | 39 | def _urls_for_proto(proto): |
|
38 | 40 | return [ |
|
39 | 41 | ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], |
|
40 | 42 | '%s://127.0.0.1' % proto), |
|
41 | 43 | ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], |
|
42 | 44 | '%s://127.0.0.1' % proto), |
|
43 | 45 | ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'], |
|
44 | 46 | '%s://127.0.0.1' % proto), |
|
45 | 47 | ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'], |
|
46 | 48 | '%s://127.0.0.1:8080' % proto), |
|
47 | 49 | ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'], |
|
48 | 50 | '%s://domain.org' % proto), |
|
49 | 51 | ('%s://user:pass@domain.org:8080' % proto, |
|
50 | 52 | ['%s://' % proto, 'domain.org', '8080'], |
|
51 | 53 | '%s://domain.org:8080' % proto), |
|
52 | 54 | ] |
|
53 | 55 | |
|
54 | 56 | TEST_URLS = _urls_for_proto('http') + _urls_for_proto('https') |
|
55 | 57 | |
|
56 | 58 | |
|
57 | 59 | @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS) |
|
58 | 60 | def test_uri_filter(test_url, expected, expected_creds): |
|
59 | 61 | from rhodecode.lib.utils2 import uri_filter |
|
60 | 62 | assert uri_filter(test_url) == expected |
|
61 | 63 | |
|
62 | 64 | |
|
63 | 65 | @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS) |
|
64 | 66 | def test_credentials_filter(test_url, expected, expected_creds): |
|
65 | 67 | from rhodecode.lib.utils2 import credentials_filter |
|
66 | 68 | assert credentials_filter(test_url) == expected_creds |
|
67 | 69 | |
|
68 | 70 | |
|
69 | 71 | @pytest.mark.parametrize("str_bool, expected", [ |
|
70 | 72 | ('t', True), |
|
71 | 73 | ('true', True), |
|
72 | 74 | ('y', True), |
|
73 | 75 | ('yes', True), |
|
74 | 76 | ('on', True), |
|
75 | 77 | ('1', True), |
|
76 | 78 | ('Y', True), |
|
77 | 79 | ('yeS', True), |
|
78 | 80 | ('Y', True), |
|
79 | 81 | ('TRUE', True), |
|
80 | 82 | ('T', True), |
|
81 | 83 | ('False', False), |
|
82 | 84 | ('F', False), |
|
83 | 85 | ('FALSE', False), |
|
84 | 86 | ('0', False), |
|
85 | 87 | ('-1', False), |
|
86 | 88 | ('', False) |
|
87 | 89 | ]) |
|
88 | 90 | def test_str2bool(str_bool, expected): |
|
89 | 91 | from rhodecode.lib.utils2 import str2bool |
|
90 | 92 | assert str2bool(str_bool) == expected |
|
91 | 93 | |
|
92 | 94 | |
|
93 | 95 | @pytest.mark.parametrize("text, expected", reduce(lambda a1,a2:a1+a2, [ |
|
94 | 96 | [ |
|
95 | 97 | (pref+"", []), |
|
96 | 98 | (pref+"Hi there @marcink", ['marcink']), |
|
97 | 99 | (pref+"Hi there @marcink and @bob", ['bob', 'marcink']), |
|
98 | 100 | (pref+"Hi there @marcink\n", ['marcink']), |
|
99 | 101 | (pref+"Hi there @marcink and @bob\n", ['bob', 'marcink']), |
|
100 | 102 | (pref+"Hi there marcin@rhodecode.com", []), |
|
101 | 103 | (pref+"Hi there @john.malcovic and @bob\n", ['bob', 'john.malcovic']), |
|
102 | 104 | (pref+"This needs to be reviewed: (@marcink,@john)", ["john", "marcink"]), |
|
103 | 105 | (pref+"This needs to be reviewed: (@marcink, @john)", ["john", "marcink"]), |
|
104 | 106 | (pref+"This needs to be reviewed: [@marcink,@john]", ["john", "marcink"]), |
|
105 | 107 | (pref+"This needs to be reviewed: (@marcink @john)", ["john", "marcink"]), |
|
106 | 108 | (pref+"@john @mary, please review", ["john", "mary"]), |
|
107 | 109 | (pref+"@john,@mary, please review", ["john", "mary"]), |
|
108 | 110 | (pref+"Hej @123, @22john,@mary, please review", ['123', '22john', 'mary']), |
|
109 | 111 | (pref+"@first hi there @marcink here's my email marcin@email.com " |
|
110 | 112 | "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three ", ['first', 'lukaszb', 'marcink', 'one', 'one_more22']), |
|
111 | 113 | (pref+"@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl", ['2one_more22', 'john', 'MARCIN', 'maRCiN']), |
|
112 | 114 | (pref+"@marian.user just do it @marco-polo and next extract @marco_polo", ['marco-polo', 'marco_polo', 'marian.user']), |
|
113 | 115 | (pref+"user.dot hej ! not-needed maril@domain.org", []), |
|
114 | 116 | (pref+"\n@marcin", ['marcin']), |
|
115 | 117 | ] |
|
116 | for pref in ['', '\n', 'hi !', '\t', '\n\n']])) | |
|
118 | for pref in ['', '\n', 'hi !', '\t', '\n\n']]), ids=no_newline_id_generator) | |
|
117 | 119 | def test_mention_extractor(text, expected): |
|
118 | 120 | from rhodecode.lib.utils2 import extract_mentioned_users |
|
119 | 121 | got = extract_mentioned_users(text) |
|
120 | 122 | assert sorted(got, key=lambda x: x.lower()) == got |
|
121 | 123 | assert set(expected) == set(got) |
|
122 | 124 | |
|
123 | 125 | @pytest.mark.parametrize("age_args, expected, kw", [ |
|
124 | 126 | ({}, u'just now', {}), |
|
125 | 127 | ({'seconds': -1}, u'1 second ago', {}), |
|
126 | 128 | ({'seconds': -60 * 2}, u'2 minutes ago', {}), |
|
127 | 129 | ({'hours': -1}, u'1 hour ago', {}), |
|
128 | 130 | ({'hours': -24}, u'1 day ago', {}), |
|
129 | 131 | ({'hours': -24 * 5}, u'5 days ago', {}), |
|
130 | 132 | ({'months': -1}, u'1 month ago', {}), |
|
131 | 133 | ({'months': -1, 'days': -2}, u'1 month and 2 days ago', {}), |
|
132 | 134 | ({'years': -1, 'months': -1}, u'1 year and 1 month ago', {}), |
|
133 | 135 | ({}, u'just now', {'short_format': True}), |
|
134 | 136 | ({'seconds': -1}, u'1sec ago', {'short_format': True}), |
|
135 | 137 | ({'seconds': -60 * 2}, u'2min ago', {'short_format': True}), |
|
136 | 138 | ({'hours': -1}, u'1h ago', {'short_format': True}), |
|
137 | 139 | ({'hours': -24}, u'1d ago', {'short_format': True}), |
|
138 | 140 | ({'hours': -24 * 5}, u'5d ago', {'short_format': True}), |
|
139 | 141 | ({'months': -1}, u'1m ago', {'short_format': True}), |
|
140 | 142 | ({'months': -1, 'days': -2}, u'1m, 2d ago', {'short_format': True}), |
|
141 | 143 | ({'years': -1, 'months': -1}, u'1y, 1m ago', {'short_format': True}), |
|
142 | 144 | ]) |
|
143 | 145 | def test_age(age_args, expected, kw, pylonsapp): |
|
144 | 146 | from rhodecode.lib.utils2 import age |
|
145 | 147 | from dateutil import relativedelta |
|
146 | 148 | n = datetime.datetime(year=2012, month=5, day=17) |
|
147 | 149 | delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) |
|
148 | 150 | |
|
149 | 151 | def translate(elem): |
|
150 | 152 | return elem.interpolate() |
|
151 | 153 | |
|
152 | 154 | assert translate(age(n + delt(**age_args), now=n, **kw)) == expected |
|
153 | 155 | |
|
154 | 156 | |
|
155 | 157 | @pytest.mark.parametrize("age_args, expected, kw", [ |
|
156 | 158 | ({}, u'just now', {}), |
|
157 | 159 | ({'seconds': 1}, u'in 1 second', {}), |
|
158 | 160 | ({'seconds': 60 * 2}, u'in 2 minutes', {}), |
|
159 | 161 | ({'hours': 1}, u'in 1 hour', {}), |
|
160 | 162 | ({'hours': 24}, u'in 1 day', {}), |
|
161 | 163 | ({'hours': 24 * 5}, u'in 5 days', {}), |
|
162 | 164 | ({'months': 1}, u'in 1 month', {}), |
|
163 | 165 | ({'months': 1, 'days': 1}, u'in 1 month and 1 day', {}), |
|
164 | 166 | ({'years': 1, 'months': 1}, u'in 1 year and 1 month', {}), |
|
165 | 167 | ({}, u'just now', {'short_format': True}), |
|
166 | 168 | ({'seconds': 1}, u'in 1sec', {'short_format': True}), |
|
167 | 169 | ({'seconds': 60 * 2}, u'in 2min', {'short_format': True}), |
|
168 | 170 | ({'hours': 1}, u'in 1h', {'short_format': True}), |
|
169 | 171 | ({'hours': 24}, u'in 1d', {'short_format': True}), |
|
170 | 172 | ({'hours': 24 * 5}, u'in 5d', {'short_format': True}), |
|
171 | 173 | ({'months': 1}, u'in 1m', {'short_format': True}), |
|
172 | 174 | ({'months': 1, 'days': 1}, u'in 1m, 1d', {'short_format': True}), |
|
173 | 175 | ({'years': 1, 'months': 1}, u'in 1y, 1m', {'short_format': True}), |
|
174 | 176 | ]) |
|
175 | 177 | def test_age_in_future(age_args, expected, kw, pylonsapp): |
|
176 | 178 | from rhodecode.lib.utils2 import age |
|
177 | 179 | from dateutil import relativedelta |
|
178 | 180 | n = datetime.datetime(year=2012, month=5, day=17) |
|
179 | 181 | delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs) |
|
180 | 182 | |
|
181 | 183 | def translate(elem): |
|
182 | 184 | return elem.interpolate() |
|
183 | 185 | |
|
184 | 186 | assert translate(age(n + delt(**age_args), now=n, **kw)) == expected |
|
185 | 187 | |
|
186 | 188 | |
|
187 | 189 | def test_tag_exctrator(): |
|
188 | 190 | sample = ( |
|
189 | 191 | "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]" |
|
190 | 192 | "[requires] [stale] [see<>=>] [see => http://url.com]" |
|
191 | 193 | "[requires => url] [lang => python] [just a tag] <html_tag first='abc' attr=\"my.url?attr=&another=\"></html_tag>" |
|
192 | 194 | "[,d] [ => ULR ] [obsolete] [desc]]" |
|
193 | 195 | ) |
|
194 | 196 | from rhodecode.lib.helpers import desc_stylize, escaped_stylize |
|
195 | 197 | res = desc_stylize(sample) |
|
196 | 198 | assert '<div class="metatag" tag="tag">tag</div>' in res |
|
197 | 199 | assert '<div class="metatag" tag="obsolete">obsolete</div>' in res |
|
198 | 200 | assert '<div class="metatag" tag="stale">stale</div>' in res |
|
199 | 201 | assert '<div class="metatag" tag="lang">python</div>' in res |
|
200 | 202 | assert '<div class="metatag" tag="requires">requires => <a href="/url">url</a></div>' in res |
|
201 | 203 | assert '<div class="metatag" tag="tag">tag</div>' in res |
|
202 | 204 | assert '<html_tag first=\'abc\' attr=\"my.url?attr=&another=\"></html_tag>' in res |
|
203 | 205 | |
|
204 | 206 | res_encoded = escaped_stylize(sample) |
|
205 | 207 | assert '<div class="metatag" tag="tag">tag</div>' in res_encoded |
|
206 | 208 | assert '<div class="metatag" tag="obsolete">obsolete</div>' in res_encoded |
|
207 | 209 | assert '<div class="metatag" tag="stale">stale</div>' in res_encoded |
|
208 | 210 | assert '<div class="metatag" tag="lang">python</div>' in res_encoded |
|
209 | 211 | assert '<div class="metatag" tag="requires">requires => <a href="/url">url</a></div>' in res_encoded |
|
210 | 212 | assert '<div class="metatag" tag="tag">tag</div>' in res_encoded |
|
211 | 213 | assert '<html_tag first='abc' attr="my.url?attr=&another="></html_tag>' in res_encoded |
|
212 | 214 | |
|
213 | 215 | |
|
214 | 216 | @pytest.mark.parametrize("tmpl_url, email, expected", [ |
|
215 | 217 | ('http://test.com/{email}', 'test@foo.com', 'http://test.com/test@foo.com'), |
|
216 | 218 | |
|
217 | 219 | ('http://test.com/{md5email}', 'test@foo.com', 'http://test.com/3cb7232fcc48743000cb86d0d5022bd9'), |
|
218 | 220 | ('http://test.com/{md5email}', 'testąć@foo.com', 'http://test.com/978debb907a3c55cd741872ab293ef30'), |
|
219 | 221 | |
|
220 | 222 | ('http://testX.com/{md5email}?s={size}', 'test@foo.com', 'http://testX.com/3cb7232fcc48743000cb86d0d5022bd9?s=24'), |
|
221 | 223 | ('http://testX.com/{md5email}?s={size}', 'testąć@foo.com', 'http://testX.com/978debb907a3c55cd741872ab293ef30?s=24'), |
|
222 | 224 | |
|
223 | 225 | ('{scheme}://{netloc}/{md5email}/{size}', 'test@foo.com', 'https://server.com/3cb7232fcc48743000cb86d0d5022bd9/24'), |
|
224 | 226 | ('{scheme}://{netloc}/{md5email}/{size}', 'testąć@foo.com', 'https://server.com/978debb907a3c55cd741872ab293ef30/24'), |
|
225 | 227 | |
|
226 | 228 | ('http://test.com/{email}', 'testąć@foo.com', 'http://test.com/testąć@foo.com'), |
|
227 | 229 | ('http://test.com/{email}?size={size}', 'test@foo.com', 'http://test.com/test@foo.com?size=24'), |
|
228 | 230 | ('http://test.com/{email}?size={size}', 'testąć@foo.com', 'http://test.com/testąć@foo.com?size=24'), |
|
229 | 231 | ]) |
|
230 | 232 | def test_gravatar_url_builder(tmpl_url, email, expected, request_stub): |
|
231 | 233 | from rhodecode.lib.helpers import gravatar_url |
|
232 | 234 | |
|
233 | 235 | # mock pyramid.threadlocals |
|
234 | 236 | def fake_get_current_request(): |
|
235 | 237 | request_stub.scheme = 'https' |
|
236 | 238 | request_stub.host = 'server.com' |
|
237 | 239 | return request_stub |
|
238 | 240 | |
|
239 | 241 | # mock pylons.tmpl_context |
|
240 | 242 | def fake_tmpl_context(_url): |
|
241 | 243 | _c = AttributeDict() |
|
242 | 244 | _c.visual = AttributeDict() |
|
243 | 245 | _c.visual.use_gravatar = True |
|
244 | 246 | _c.visual.gravatar_url = _url |
|
245 | 247 | |
|
246 | 248 | return _c |
|
247 | 249 | |
|
248 | 250 | with mock.patch('rhodecode.lib.helpers.get_current_request', |
|
249 | 251 | fake_get_current_request): |
|
250 | 252 | fake = fake_tmpl_context(_url=tmpl_url) |
|
251 | 253 | with mock.patch('pylons.tmpl_context', fake): |
|
252 | 254 | grav = gravatar_url(email_address=email, size=24) |
|
253 | 255 | assert grav == expected |
|
254 | 256 | |
|
255 | 257 | |
|
256 | 258 | @pytest.mark.parametrize( |
|
257 | 259 | "email, first_name, last_name, expected_initials, expected_color", [ |
|
258 | 260 | |
|
259 | 261 | ('test@rhodecode.com', '', '', 'TR', '#8a994d'), |
|
260 | 262 | ('marcin.kuzminski@rhodecode.com', '', '', 'MK', '#6559b3'), |
|
261 | 263 | # special cases of email |
|
262 | 264 | ('john.van.dam@rhodecode.com', '', '', 'JD', '#526600'), |
|
263 | 265 | ('Guido.van.Rossum@rhodecode.com', '', '', 'GR', '#990052'), |
|
264 | 266 | ('Guido.van.Rossum@rhodecode.com', 'Guido', 'Van Rossum', 'GR', '#990052'), |
|
265 | 267 | |
|
266 | 268 | ('rhodecode+Guido.van.Rossum@rhodecode.com', '', '', 'RR', '#46598c'), |
|
267 | 269 | ('pclouds@rhodecode.com', 'Nguyễn Thái', 'Tgọc Duy', 'ND', '#665200'), |
|
268 | 270 | |
|
269 | 271 | ('john-brown@foo.com', '', '', 'JF', '#73006b'), |
|
270 | 272 | ('admin@rhodecode.com', 'Marcin', 'Kuzminski', 'MK', '#104036'), |
|
271 | 273 | # partials |
|
272 | 274 | ('admin@rhodecode.com', 'Marcin', '', 'MR', '#104036'), # fn+email |
|
273 | 275 | ('admin@rhodecode.com', '', 'Kuzminski', 'AK', '#104036'), # em+ln |
|
274 | 276 | # non-ascii |
|
275 | 277 | ('admin@rhodecode.com', 'Marcin', 'Śuzminski', 'MS', '#104036'), |
|
276 | 278 | ('marcin.śuzminski@rhodecode.com', '', '', 'MS', '#73000f'), |
|
277 | 279 | |
|
278 | 280 | # special cases, LDAP can provide those... |
|
279 | 281 | ('admin@', 'Marcin', 'Śuzminski', 'MS', '#aa00ff'), |
|
280 | 282 | ('marcin.śuzminski', '', '', 'MS', '#402020'), |
|
281 | 283 | ('null', '', '', 'NL', '#8c4646'), |
|
282 | 284 | ]) |
|
283 | 285 | def test_initials_gravatar_pick_of_initials_and_color_algo( |
|
284 | 286 | email, first_name, last_name, expected_initials, expected_color): |
|
285 | 287 | instance = InitialsGravatar(email, first_name, last_name) |
|
286 | 288 | assert instance.get_initials() == expected_initials |
|
287 | 289 | assert instance.str2color(email) == expected_color |
|
288 | 290 | |
|
289 | 291 | |
|
290 | 292 | def test_initials_gravatar_mapping_algo(): |
|
291 | 293 | pos = set() |
|
292 | 294 | instance = InitialsGravatar('', '', '') |
|
293 | 295 | iterations = 0 |
|
294 | 296 | |
|
295 | 297 | variations = [] |
|
296 | 298 | for letter1 in string.ascii_letters: |
|
297 | 299 | for letter2 in string.ascii_letters[::-1][:10]: |
|
298 | 300 | for letter3 in string.ascii_letters[:10]: |
|
299 | 301 | variations.append( |
|
300 | 302 | '%s@rhodecode.com' % (letter1+letter2+letter3)) |
|
301 | 303 | |
|
302 | 304 | max_variations = 4096 |
|
303 | 305 | for email in variations[:max_variations]: |
|
304 | 306 | iterations += 1 |
|
305 | 307 | pos.add( |
|
306 | 308 | instance.pick_color_bank_index(email, |
|
307 | 309 | instance.get_color_bank())) |
|
308 | 310 | |
|
309 | 311 | # we assume that we have match all 256 possible positions, |
|
310 | 312 | # in reasonable amount of different email addresses |
|
311 | 313 | assert len(pos) == 256 |
|
312 | 314 | assert iterations == max_variations |
|
313 | 315 | |
|
314 | 316 | |
|
315 | 317 | @pytest.mark.parametrize("tmpl, repo_name, overrides, prefix, expected", [ |
|
316 | 318 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'), |
|
317 | 319 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/group/repo1'), |
|
318 | 320 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/rc', 'http://vps1:8000/rc/group/repo1'), |
|
319 | 321 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc', 'http://user@vps1:8000/rc/group/repo1'), |
|
320 | 322 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc', 'http://marcink@vps1:8000/rc/group/repo1'), |
|
321 | 323 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc/', 'http://user@vps1:8000/rc/group/repo1'), |
|
322 | 324 | (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc/', 'http://marcink@vps1:8000/rc/group/repo1'), |
|
323 | 325 | ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'), |
|
324 | 326 | ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'), |
|
325 | 327 | ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'), |
|
326 | 328 | ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://vps1:8000/_23'), |
|
327 | 329 | ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://marcink@proxy1.server.com/group/repo1'), |
|
328 | 330 | ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.server.com/group/repo1'), |
|
329 | 331 | ('https://proxy1.server.com/{user}/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://proxy1.server.com/marcink/group/repo1'), |
|
330 | 332 | ]) |
|
331 | 333 | def test_clone_url_generator(tmpl, repo_name, overrides, prefix, expected): |
|
332 | 334 | from rhodecode.lib.utils2 import get_clone_url |
|
333 | 335 | clone_url = get_clone_url(uri_tmpl=tmpl, qualifed_home_url='http://vps1:8000'+prefix, |
|
334 | 336 | repo_name=repo_name, repo_id=23, **overrides) |
|
335 | 337 | assert clone_url == expected |
|
336 | 338 | |
|
337 | 339 | |
|
338 | 340 | def _quick_url(text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None): |
|
339 | 341 | """ |
|
340 | 342 | Changes `some text url[foo]` => `some text <a href="/">foo</a> |
|
341 | 343 | |
|
342 | 344 | :param text: |
|
343 | 345 | """ |
|
344 | 346 | import re |
|
345 | 347 | # quickly change expected url[] into a link |
|
346 | 348 | URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])') |
|
347 | 349 | |
|
348 | 350 | def url_func(match_obj): |
|
349 | 351 | _url = match_obj.groups()[0] |
|
350 | 352 | return tmpl % (url_ or '/some-url', _url) |
|
351 | 353 | return URL_PAT.sub(url_func, text) |
|
352 | 354 | |
|
353 | 355 | |
|
354 | 356 | @pytest.mark.parametrize("sample, expected", [ |
|
355 | 357 | ("", |
|
356 | 358 | ""), |
|
357 | 359 | ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68", |
|
358 | 360 | "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"), |
|
359 | 361 | ("from rev 000000000000", |
|
360 | 362 | "from rev url[000000000000]"), |
|
361 | 363 | ("from rev 000000000000123123 also rev 000000000000", |
|
362 | 364 | "from rev url[000000000000123123] also rev url[000000000000]"), |
|
363 | 365 | ("this should-000 00", |
|
364 | 366 | "this should-000 00"), |
|
365 | 367 | ("longtextffffffffff rev 123123123123", |
|
366 | 368 | "longtextffffffffff rev url[123123123123]"), |
|
367 | 369 | ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff", |
|
368 | 370 | "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"), |
|
369 | 371 | ("ffffffffffff some text traalaa", |
|
370 | 372 | "url[ffffffffffff] some text traalaa"), |
|
371 | 373 | ("""Multi line |
|
372 | 374 | 123123123123 |
|
373 | 375 | some text 123123123123 |
|
374 | 376 | sometimes ! |
|
375 | 377 | """, |
|
376 | 378 | """Multi line |
|
377 | 379 | url[123123123123] |
|
378 | 380 | some text url[123123123123] |
|
379 | 381 | sometimes ! |
|
380 | 382 | """) |
|
381 | ]) | |
|
383 | ], ids=no_newline_id_generator) | |
|
382 | 384 | def test_urlify_commits(sample, expected): |
|
383 | 385 | def fake_url(self, *args, **kwargs): |
|
384 | 386 | return '/some-url' |
|
385 | 387 | |
|
386 | 388 | expected = _quick_url(expected) |
|
387 | 389 | |
|
388 | 390 | with mock.patch('pylons.url', fake_url): |
|
389 | 391 | from rhodecode.lib.helpers import urlify_commits |
|
390 | 392 | assert urlify_commits(sample, 'repo_name') == expected |
|
391 | 393 | |
|
392 | 394 | |
|
393 | 395 | @pytest.mark.parametrize("sample, expected, url_", [ |
|
394 | 396 | ("", |
|
395 | 397 | "", |
|
396 | 398 | ""), |
|
397 | 399 | ("https://svn.apache.org/repos", |
|
398 | 400 | "url[https://svn.apache.org/repos]", |
|
399 | 401 | "https://svn.apache.org/repos"), |
|
400 | 402 | ("http://svn.apache.org/repos", |
|
401 | 403 | "url[http://svn.apache.org/repos]", |
|
402 | 404 | "http://svn.apache.org/repos"), |
|
403 | 405 | ("from rev a also rev http://google.com", |
|
404 | 406 | "from rev a also rev url[http://google.com]", |
|
405 | 407 | "http://google.com"), |
|
406 | 408 | ("""Multi line |
|
407 | 409 | https://foo.bar.com |
|
408 | 410 | some text lalala""", |
|
409 | 411 | """Multi line |
|
410 | 412 | url[https://foo.bar.com] |
|
411 | 413 | some text lalala""", |
|
412 | 414 | "https://foo.bar.com") |
|
413 | ]) | |
|
415 | ], ids=no_newline_id_generator) | |
|
414 | 416 | def test_urlify_test(sample, expected, url_): |
|
415 | 417 | from rhodecode.lib.helpers import urlify_text |
|
416 | 418 | expected = _quick_url(expected, tmpl="""<a href="%s">%s</a>""", url_=url_) |
|
417 | 419 | assert urlify_text(sample) == expected |
|
418 | 420 | |
|
419 | 421 | |
|
420 | 422 | @pytest.mark.parametrize("test, expected", [ |
|
421 | 423 | ("", None), |
|
422 | 424 | ("/_2", '2'), |
|
423 | 425 | ("_2", '2'), |
|
424 | 426 | ("/_2/", '2'), |
|
425 | 427 | ("_2/", '2'), |
|
426 | 428 | |
|
427 | 429 | ("/_21", '21'), |
|
428 | 430 | ("_21", '21'), |
|
429 | 431 | ("/_21/", '21'), |
|
430 | 432 | ("_21/", '21'), |
|
431 | 433 | |
|
432 | 434 | ("/_21/foobar", '21'), |
|
433 | 435 | ("_21/121", '21'), |
|
434 | 436 | ("/_21/_12", '21'), |
|
435 | 437 | ("_21/rc/foo", '21'), |
|
436 | 438 | |
|
437 | 439 | ]) |
|
438 | 440 | def test_get_repo_by_id(test, expected): |
|
439 | 441 | from rhodecode.model.repo import RepoModel |
|
440 | 442 | _test = RepoModel()._extract_id_from_repo_name(test) |
|
441 | 443 | assert _test == expected |
|
442 | 444 | |
|
443 | 445 | |
|
444 | 446 | @pytest.mark.parametrize("test_repo_name, repo_type", [ |
|
445 | 447 | ("test_repo_1", None), |
|
446 | 448 | ("repo_group/foobar", None), |
|
447 | 449 | ("test_non_asci_ąćę", None), |
|
448 | 450 | (u"test_non_asci_unicode_ąćę", None), |
|
449 | 451 | ]) |
|
450 | 452 | def test_invalidation_context(pylonsapp, test_repo_name, repo_type): |
|
451 | 453 | from beaker.cache import cache_region |
|
452 | 454 | from rhodecode.lib import caches |
|
453 | 455 | from rhodecode.model.db import CacheKey |
|
454 | 456 | |
|
455 | 457 | @cache_region('long_term') |
|
456 | 458 | def _dummy_func(cache_key): |
|
457 | 459 | return 'result' |
|
458 | 460 | |
|
459 | 461 | invalidator_context = CacheKey.repo_context_cache( |
|
460 | 462 | _dummy_func, test_repo_name, 'repo') |
|
461 | 463 | |
|
462 | 464 | with invalidator_context as context: |
|
463 | 465 | invalidated = context.invalidate() |
|
464 | 466 | result = context.compute() |
|
465 | 467 | |
|
466 | 468 | assert invalidated == True |
|
467 | 469 | assert 'result' == result |
|
468 | 470 | assert isinstance(context, caches.FreshRegionCache) |
|
469 | 471 | |
|
470 | 472 | assert 'InvalidationContext' in repr(invalidator_context) |
|
471 | 473 | |
|
472 | 474 | with invalidator_context as context: |
|
473 | 475 | context.invalidate() |
|
474 | 476 | result = context.compute() |
|
475 | 477 | |
|
476 | 478 | assert 'result' == result |
|
477 | 479 | assert isinstance(context, caches.ActiveRegionCache) |
|
478 | 480 | |
|
479 | 481 | |
|
480 | 482 | def test_invalidation_context_exception_in_compute(pylonsapp): |
|
481 | 483 | from rhodecode.model.db import CacheKey |
|
482 | 484 | from beaker.cache import cache_region |
|
483 | 485 | |
|
484 | 486 | @cache_region('long_term') |
|
485 | 487 | def _dummy_func(cache_key): |
|
486 | 488 | # this causes error since it doesn't get any params |
|
487 | 489 | raise Exception('ups') |
|
488 | 490 | |
|
489 | 491 | invalidator_context = CacheKey.repo_context_cache( |
|
490 | 492 | _dummy_func, 'test_repo_2', 'repo') |
|
491 | 493 | |
|
492 | 494 | with pytest.raises(Exception): |
|
493 | 495 | with invalidator_context as context: |
|
494 | 496 | context.invalidate() |
|
495 | 497 | context.compute() |
|
496 | 498 | |
|
497 | 499 | |
|
498 | 500 | @pytest.mark.parametrize('execution_number', range(5)) |
|
499 | 501 | def test_cache_invalidation_race_condition(execution_number, pylonsapp): |
|
500 | 502 | import time |
|
501 | 503 | from beaker.cache import cache_region |
|
502 | 504 | from rhodecode.model.db import CacheKey |
|
503 | 505 | |
|
504 | 506 | if CacheKey.metadata.bind.url.get_backend_name() == "mysql": |
|
505 | 507 | reason = ( |
|
506 | 508 | 'Fails on MariaDB due to some locking issues. Investigation' |
|
507 | 509 | ' needed') |
|
508 | 510 | pytest.xfail(reason=reason) |
|
509 | 511 | |
|
510 | 512 | @run_test_concurrently(25) |
|
511 | 513 | def test_create_and_delete_cache_keys(): |
|
512 | 514 | time.sleep(0.2) |
|
513 | 515 | |
|
514 | 516 | @cache_region('long_term') |
|
515 | 517 | def _dummy_func(cache_key): |
|
516 | 518 | return 'result' |
|
517 | 519 | |
|
518 | 520 | invalidator_context = CacheKey.repo_context_cache( |
|
519 | 521 | _dummy_func, 'test_repo_1', 'repo') |
|
520 | 522 | |
|
521 | 523 | with invalidator_context as context: |
|
522 | 524 | context.invalidate() |
|
523 | 525 | context.compute() |
|
524 | 526 | |
|
525 | 527 | CacheKey.set_invalidate('test_repo_1', delete=True) |
|
526 | 528 | |
|
527 | 529 | test_create_and_delete_cache_keys() |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now