##// END OF EJS Templates
tests: fixed tests for new repo settings...
marcink -
r1721:0dad6b04 default
parent child Browse files
Show More
1 NO CONTENT: new file 100644
@@ -0,0 +1,230 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import mock
22 import pytest
23
24 from rhodecode.lib.utils2 import str2bool
25 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
26 from rhodecode.model.db import Repository, UserRepoToPerm, Permission, User
27 from rhodecode.model.meta import Session
28 from rhodecode.tests import (
29 url, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN,
30 assert_session_flash)
31 from rhodecode.tests.fixture import Fixture
32
33 fixture = Fixture()
34
35
36 def route_path(name, params=None, **kwargs):
37 import urllib
38
39 base_url = {
40 'edit_repo': '/{repo_name}/settings',
41 }[name].format(**kwargs)
42
43 if params:
44 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
45 return base_url
46
47
48 def _get_permission_for_user(user, repo):
49 perm = UserRepoToPerm.query()\
50 .filter(UserRepoToPerm.repository ==
51 Repository.get_by_repo_name(repo))\
52 .filter(UserRepoToPerm.user == User.get_by_username(user))\
53 .all()
54 return perm
55
56
57 @pytest.mark.usefixtures('autologin_user', 'app')
58 class TestAdminRepoSettings(object):
59 @pytest.mark.parametrize('urlname', [
60 'edit_repo',
61 ])
62 def test_show_page(self, urlname, app, backend):
63 app.get(route_path(urlname, repo_name=backend.repo_name), status=200)
64
65 def test_edit_accessible_when_missing_requirements(
66 self, backend_hg, autologin_user):
67 scm_patcher = mock.patch.object(
68 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
69 with scm_patcher:
70 self.app.get(route_path('edit_repo', repo_name=backend_hg.repo_name))
71
72 @pytest.mark.parametrize('urlname', [
73 'edit_repo_perms',
74 'edit_repo_advanced',
75 'repo_vcs_settings',
76 'edit_repo_fields',
77 'repo_settings_issuetracker',
78 'edit_repo_caches',
79 'edit_repo_remote',
80 'edit_repo_statistics',
81 ])
82 def test_show_page_pylons(self, urlname, app):
83 app.get(url(urlname, repo_name=HG_REPO))
84
85 @pytest.mark.parametrize('update_settings', [
86 {'repo_description': 'alter-desc'},
87 {'repo_owner': TEST_USER_REGULAR_LOGIN},
88 {'repo_private': 'true'},
89 {'repo_enable_locking': 'true'},
90 {'repo_enable_downloads': 'true'},
91 ])
92 def test_update_repo_settings(self, update_settings, csrf_token, backend, user_util):
93 repo = user_util.create_repo(repo_type=backend.alias)
94 repo_name = repo.repo_name
95
96 params = fixture._get_repo_create_params(
97 csrf_token=csrf_token,
98 repo_name=repo_name,
99 repo_type=backend.alias,
100 repo_owner=TEST_USER_ADMIN_LOGIN,
101 repo_description='DESC',
102
103 repo_private='false',
104 repo_enable_locking='false',
105 repo_enable_downloads='false')
106 params.update(update_settings)
107 self.app.post(
108 route_path('edit_repo', repo_name=repo_name),
109 params=params, status=302)
110
111 repo = Repository.get_by_repo_name(repo_name)
112 assert repo.user.username == \
113 update_settings.get('repo_owner', repo.user.username)
114
115 assert repo.description == \
116 update_settings.get('repo_description', repo.description)
117
118 assert repo.private == \
119 str2bool(update_settings.get(
120 'repo_private', repo.private))
121
122 assert repo.enable_locking == \
123 str2bool(update_settings.get(
124 'repo_enable_locking', repo.enable_locking))
125
126 assert repo.enable_downloads == \
127 str2bool(update_settings.get(
128 'repo_enable_downloads', repo.enable_downloads))
129
130 def test_update_repo_name_via_settings(self, csrf_token, user_util, backend):
131 repo = user_util.create_repo(repo_type=backend.alias)
132 repo_name = repo.repo_name
133
134 repo_group = user_util.create_repo_group()
135 repo_group_name = repo_group.group_name
136 new_name = repo_group_name + '_' + repo_name
137
138 params = fixture._get_repo_create_params(
139 csrf_token=csrf_token,
140 repo_name=new_name,
141 repo_type=backend.alias,
142 repo_owner=TEST_USER_ADMIN_LOGIN,
143 repo_description='DESC',
144 repo_private='false',
145 repo_enable_locking='false',
146 repo_enable_downloads='false')
147 self.app.post(
148 route_path('edit_repo', repo_name=repo_name),
149 params=params, status=302)
150 repo = Repository.get_by_repo_name(new_name)
151 assert repo.repo_name == new_name
152
153 def test_update_repo_group_via_settings(self, csrf_token, user_util, backend):
154 repo = user_util.create_repo(repo_type=backend.alias)
155 repo_name = repo.repo_name
156
157 repo_group = user_util.create_repo_group()
158 repo_group_name = repo_group.group_name
159 repo_group_id = repo_group.group_id
160
161 new_name = repo_group_name + '/' + repo_name
162 params = fixture._get_repo_create_params(
163 csrf_token=csrf_token,
164 repo_name=repo_name,
165 repo_type=backend.alias,
166 repo_owner=TEST_USER_ADMIN_LOGIN,
167 repo_description='DESC',
168 repo_group=repo_group_id,
169 repo_private='false',
170 repo_enable_locking='false',
171 repo_enable_downloads='false')
172 self.app.post(
173 route_path('edit_repo', repo_name=repo_name),
174 params=params, status=302)
175 repo = Repository.get_by_repo_name(new_name)
176 assert repo.repo_name == new_name
177
178 def test_set_private_flag_sets_default_user_permissions_to_none(
179 self, autologin_user, backend, csrf_token):
180
181 # initially repository perm should be read
182 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
183 assert len(perm) == 1
184 assert perm[0].permission.permission_name == 'repository.read'
185 assert not backend.repo.private
186
187 response = self.app.post(
188 route_path('edit_repo', repo_name=backend.repo_name),
189 params=fixture._get_repo_create_params(
190 repo_private='true',
191 repo_name=backend.repo_name,
192 repo_type=backend.alias,
193 repo_owner=TEST_USER_ADMIN_LOGIN,
194 csrf_token=csrf_token), status=302)
195
196 assert_session_flash(
197 response,
198 msg='Repository %s updated successfully' % (backend.repo_name))
199
200 repo = Repository.get_by_repo_name(backend.repo_name)
201 assert repo.private is True
202
203 # now the repo default permission should be None
204 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
205 assert len(perm) == 1
206 assert perm[0].permission.permission_name == 'repository.none'
207
208 response = self.app.post(
209 route_path('edit_repo', repo_name=backend.repo_name),
210 params=fixture._get_repo_create_params(
211 repo_private='false',
212 repo_name=backend.repo_name,
213 repo_type=backend.alias,
214 repo_owner=TEST_USER_ADMIN_LOGIN,
215 csrf_token=csrf_token), status=302)
216
217 assert_session_flash(
218 response,
219 msg='Repository %s updated successfully' % (backend.repo_name))
220 assert backend.repo.private is False
221
222 # we turn off private now the repo default permission should stay None
223 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
224 assert len(perm) == 1
225 assert perm[0].permission.permission_name == 'repository.none'
226
227 # update this permission back
228 perm[0].permission = Permission.get_by_key('repository.read')
229 Session().add(perm[0])
230 Session().commit()
@@ -0,0 +1,117 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import mock
22 import pytest
23
24 import rhodecode
25 from rhodecode.model.settings import SettingsModel
26 from rhodecode.tests import url
27 from rhodecode.tests.utils import AssertResponse
28
29
30 def route_path(name, params=None, **kwargs):
31 import urllib
32
33 base_url = {
34 'edit_repo': '/{repo_name}/settings',
35 }[name].format(**kwargs)
36
37 if params:
38 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
39 return base_url
40
41
42 @pytest.mark.usefixtures('autologin_user', 'app')
43 class TestAdminRepoVcsSettings(object):
44
45 @pytest.mark.parametrize('setting_name, setting_backends', [
46 ('hg_use_rebase_for_merging', ['hg']),
47 ])
48 def test_labs_settings_visible_if_enabled(
49 self, setting_name, setting_backends, backend):
50 if backend.alias not in setting_backends:
51 pytest.skip('Setting not available for backend {}'.format(backend))
52
53 vcs_settings_url = url(
54 'repo_vcs_settings', repo_name=backend.repo.repo_name)
55
56 with mock.patch.dict(
57 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
58 response = self.app.get(vcs_settings_url)
59
60 assertr = AssertResponse(response)
61 assertr.one_element_exists('#rhodecode_{}'.format(setting_name))
62
63 @pytest.mark.parametrize('setting_name, setting_backends', [
64 ('hg_use_rebase_for_merging', ['hg']),
65 ])
66 def test_labs_settings_not_visible_if_disabled(
67 self, setting_name, setting_backends, backend):
68 if backend.alias not in setting_backends:
69 pytest.skip('Setting not available for backend {}'.format(backend))
70
71 vcs_settings_url = url(
72 'repo_vcs_settings', repo_name=backend.repo.repo_name)
73
74 with mock.patch.dict(
75 rhodecode.CONFIG, {'labs_settings_active': 'false'}):
76 response = self.app.get(vcs_settings_url)
77
78 assertr = AssertResponse(response)
79 assertr.no_element_exists('#rhodecode_{}'.format(setting_name))
80
81 @pytest.mark.parametrize('setting_name, setting_backends', [
82 ('hg_use_rebase_for_merging', ['hg']),
83 ])
84 def test_update_boolean_settings(
85 self, csrf_token, setting_name, setting_backends, backend):
86 if backend.alias not in setting_backends:
87 pytest.skip('Setting not available for backend {}'.format(backend))
88
89 repo = backend.create_repo()
90
91 settings_model = SettingsModel(repo=repo)
92 vcs_settings_url = url(
93 'repo_vcs_settings', repo_name=repo.repo_name)
94
95 self.app.post(
96 vcs_settings_url,
97 params={
98 'inherit_global_settings': False,
99 'new_svn_branch': 'dummy-value-for-testing',
100 'new_svn_tag': 'dummy-value-for-testing',
101 'rhodecode_{}'.format(setting_name): 'true',
102 'csrf_token': csrf_token,
103 })
104 setting = settings_model.get_setting_by_name(setting_name)
105 assert setting.app_settings_value
106
107 self.app.post(
108 vcs_settings_url,
109 params={
110 'inherit_global_settings': False,
111 'new_svn_branch': 'dummy-value-for-testing',
112 'new_svn_tag': 'dummy-value-for-testing',
113 'rhodecode_{}'.format(setting_name): 'false',
114 'csrf_token': csrf_token,
115 })
116 setting = settings_model.get_setting_by_name(setting_name)
117 assert not setting.app_settings_value
@@ -1,203 +1,204 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import pytest
23 23
24 from rhodecode.tests import no_newline_id_generator
24 25 from rhodecode.config.middleware import (
25 26 _sanitize_vcs_settings, _bool_setting, _string_setting, _list_setting,
26 27 _int_setting)
27 28
28 29
29 30 class TestHelperFunctions(object):
30 31 @pytest.mark.parametrize('raw, expected', [
31 32 ('true', True), (u'true', True),
32 33 ('yes', True), (u'yes', True),
33 34 ('on', True), (u'on', True),
34 35 ('false', False), (u'false', False),
35 36 ('no', False), (u'no', False),
36 37 ('off', False), (u'off', False),
37 38 ('invalid-bool-value', False),
38 39 ('invalid-∫øø@-√å@¨€', False),
39 40 (u'invalid-∫øø@-√å@¨€', False),
40 41 ])
41 42 def test_bool_setting_helper(self, raw, expected):
42 43 key = 'dummy-key'
43 44 settings = {key: raw}
44 45 _bool_setting(settings, key, None)
45 46 assert settings[key] is expected
46 47
47 48 @pytest.mark.parametrize('raw, expected', [
48 49 ('', ''),
49 50 ('test-string', 'test-string'),
50 51 ('CaSe-TeSt', 'case-test'),
51 52 ('test-string-烩€', 'test-string-烩€'),
52 53 (u'test-string-烩€', u'test-string-烩€'),
53 54 ])
54 55 def test_string_setting_helper(self, raw, expected):
55 56 key = 'dummy-key'
56 57 settings = {key: raw}
57 58 _string_setting(settings, key, None)
58 59 assert settings[key] == expected
59 60
60 61 @pytest.mark.parametrize('raw, expected', [
61 62 ('', []),
62 63 ('test', ['test']),
63 64 ('CaSe-TeSt', ['CaSe-TeSt']),
64 65 ('test-string-烩€', ['test-string-烩€']),
65 66 (u'test-string-烩€', [u'test-string-烩€']),
66 67 ('hg git svn', ['hg', 'git', 'svn']),
67 68 ('hg,git,svn', ['hg', 'git', 'svn']),
68 69 ('hg, git, svn', ['hg', 'git', 'svn']),
69 70 ('hg\ngit\nsvn', ['hg', 'git', 'svn']),
70 71 (' hg\n git\n svn ', ['hg', 'git', 'svn']),
71 72 (', hg , git , svn , ', ['', 'hg', 'git', 'svn', '']),
72 73 ('cheese,free node,other', ['cheese', 'free node', 'other']),
73 ])
74 ], ids=no_newline_id_generator)
74 75 def test_list_setting_helper(self, raw, expected):
75 76 key = 'dummy-key'
76 77 settings = {key: raw}
77 78 _list_setting(settings, key, None)
78 79 assert settings[key] == expected
79 80
80 81 @pytest.mark.parametrize('raw, expected', [
81 82 ('0', 0),
82 83 ('-0', 0),
83 84 ('12345', 12345),
84 85 ('-12345', -12345),
85 86 (u'-12345', -12345),
86 87 ])
87 88 def test_int_setting_helper(self, raw, expected):
88 89 key = 'dummy-key'
89 90 settings = {key: raw}
90 91 _int_setting(settings, key, None)
91 92 assert settings[key] == expected
92 93
93 94 @pytest.mark.parametrize('raw', [
94 95 ('0xff'),
95 96 (''),
96 97 ('invalid-int'),
97 98 ('invalid-⁄~†'),
98 99 (u'invalid-⁄~†'),
99 100 ])
100 101 def test_int_setting_helper_invalid_input(self, raw):
101 102 key = 'dummy-key'
102 103 settings = {key: raw}
103 104 with pytest.raises(Exception):
104 105 _int_setting(settings, key, None)
105 106
106 107
107 108 class TestSanitizeVcsSettings(object):
108 109 _bool_settings = [
109 110 ('vcs.hooks.direct_calls', False),
110 111 ('vcs.server.enable', True),
111 112 ('vcs.start_server', False),
112 113 ('startup.import_repos', False),
113 114 ]
114 115
115 116 _string_settings = [
116 117 ('vcs.svn.compatible_version', ''),
117 118 ('git_rev_filter', '--all'),
118 119 ('vcs.hooks.protocol', 'http'),
119 120 ('vcs.scm_app_implementation', 'http'),
120 121 ('vcs.server', ''),
121 122 ('vcs.server.log_level', 'debug'),
122 123 ('vcs.server.protocol', 'http'),
123 124 ]
124 125
125 126 _list_settings = [
126 127 ('vcs.backends', 'hg git'),
127 128 ]
128 129
129 130 @pytest.mark.parametrize('key, default', _list_settings)
130 131 def test_list_setting_spacesep_list(self, key, default):
131 132 test_list = ['test', 'list', 'values', 'for', key]
132 133 input_value = ' '.join(test_list)
133 134 settings = {key: input_value}
134 135 _sanitize_vcs_settings(settings)
135 136 assert settings[key] == test_list
136 137
137 138 @pytest.mark.parametrize('key, default', _list_settings)
138 139 def test_list_setting_newlinesep_list(self, key, default):
139 140 test_list = ['test', 'list', 'values', 'for', key]
140 141 input_value = '\n'.join(test_list)
141 142 settings = {key: input_value}
142 143 _sanitize_vcs_settings(settings)
143 144 assert settings[key] == test_list
144 145
145 146 @pytest.mark.parametrize('key, default', _list_settings)
146 147 def test_list_setting_commasep_list(self, key, default):
147 148 test_list = ['test', 'list', 'values', 'for', key]
148 149 input_value = ','.join(test_list)
149 150 settings = {key: input_value}
150 151 _sanitize_vcs_settings(settings)
151 152 assert settings[key] == test_list
152 153
153 154 @pytest.mark.parametrize('key, default', _list_settings)
154 155 def test_list_setting_comma_and_space_sep_list(self, key, default):
155 156 test_list = ['test', 'list', 'values', 'for', key]
156 157 input_value = ', '.join(test_list)
157 158 settings = {key: input_value}
158 159 _sanitize_vcs_settings(settings)
159 160 assert settings[key] == test_list
160 161
161 162 @pytest.mark.parametrize('key, default', _string_settings)
162 163 def test_string_setting_string(self, key, default):
163 164 test_value = 'test-string-for-{}'.format(key)
164 165 settings = {key: test_value}
165 166 _sanitize_vcs_settings(settings)
166 167 assert settings[key] == test_value
167 168
168 169 @pytest.mark.parametrize('key, default', _string_settings)
169 170 def test_string_setting_default(self, key, default):
170 171 settings = {}
171 172 _sanitize_vcs_settings(settings)
172 173 assert settings[key] == default
173 174
174 175 @pytest.mark.parametrize('key, default', _string_settings)
175 176 def test_string_setting_lowercase(self, key, default):
176 177 test_value = 'Test-String-For-{}'.format(key)
177 178 settings = {key: test_value}
178 179 _sanitize_vcs_settings(settings)
179 180 assert settings[key] == test_value.lower()
180 181
181 182 @pytest.mark.parametrize('key, default', _bool_settings)
182 183 def test_bool_setting_true(self, key, default):
183 184 settings = {key: 'true'}
184 185 _sanitize_vcs_settings(settings)
185 186 assert settings[key] is True
186 187
187 188 @pytest.mark.parametrize('key, default', _bool_settings)
188 189 def test_bool_setting_false(self, key, default):
189 190 settings = {key: 'false'}
190 191 _sanitize_vcs_settings(settings)
191 192 assert settings[key] is False
192 193
193 194 @pytest.mark.parametrize('key, default', _bool_settings)
194 195 def test_bool_setting_invalid_string(self, key, default):
195 196 settings = {key: 'no-bool-val-string'}
196 197 _sanitize_vcs_settings(settings)
197 198 assert settings[key] is False
198 199
199 200 @pytest.mark.parametrize('key, default', _bool_settings)
200 201 def test_bool_setting_default(self, key, default):
201 202 settings = {}
202 203 _sanitize_vcs_settings(settings)
203 204 assert settings[key] is default
@@ -1,79 +1,79 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pylons
23 23 import pytest
24 24 from pylons import tmpl_context as c
25 25
26 26
27 27 @pytest.fixture
28 28 def current_user(request, user_util):
29 29 user = user_util.create_user()
30 30
31 31 request_mock = mock.Mock(user=user)
32 32 pylons.request._push_object(request_mock)
33 33
34 34 @request.addfinalizer
35 35 def cleanup():
36 36 pylons.request._pop_object()
37 37 return user
38 38
39 39
40 40 @pytest.fixture
41 41 def personal_group_with_parent(user_util, current_user):
42 42 group_read_only = user_util.create_repo_group()
43 43 user_util.grant_user_permission_to_repo_group(
44 44 group_read_only, current_user, 'group.read')
45 45
46 46 # TODO: johbo: This should go into the business models
47 47 group_as_admin = user_util.create_repo_group(
48 48 owner=current_user)
49 49 group_as_admin.parent_group = group_read_only
50 50 group_as_admin.group_name = group_as_admin.get_new_name(
51 51 group_as_admin.group_name)
52 52
53 53 return group_as_admin
54 54
55 55
56 56 @pytest.fixture
57 57 def controller():
58 58 from rhodecode.controllers.admin import repo_groups
59 59 return repo_groups.RepoGroupsController()
60 60
61 61
62 62 def test_repo_groups_load_defaults(
63 63 current_user, personal_group_with_parent, controller):
64 64 personal_group = personal_group_with_parent
65 65 controller._RepoGroupsController__load_defaults(True, personal_group)
66 66
67 expected_list = ['-1', personal_group.parent_group.group_id]
67 expected_list = [-1, personal_group.parent_group.group_id]
68 68 returned_group_ids = [group[0] for group in c.repo_groups]
69 69 assert returned_group_ids == expected_list
70 70
71 71
72 72 def test_repo_groups_load_defaults_with_missing_group(
73 73 current_user, personal_group_with_parent, controller):
74 74 personal_group = personal_group_with_parent
75 75 controller._RepoGroupsController__load_defaults(True)
76 76
77 expected_list = sorted(['-1', personal_group.group_id])
77 expected_list = sorted([-1, personal_group.group_id])
78 78 returned_group_ids = sorted([group[0] for group in c.repo_groups])
79 79 assert returned_group_ids == expected_list
@@ -1,1268 +1,1207 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urllib
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.lib import auth
27 27 from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
28 28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
30 30 Permission
31 31 from rhodecode.model.meta import Session
32 32 from rhodecode.model.repo import RepoModel
33 33 from rhodecode.model.repo_group import RepoGroupModel
34 34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (
37 37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
39 39 logout_user_session)
40 40 from rhodecode.tests.fixture import Fixture, error_function
41 41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
42 42
43 43 fixture = Fixture()
44 44
45 45
46 46 @pytest.mark.usefixtures("app")
47 47 class TestAdminRepos(object):
48 48
49 49 def test_index(self):
50 50 self.app.get(url('repos'))
51 51
52 52 def test_create_page_restricted(self, autologin_user, backend):
53 53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
54 54 response = self.app.get(url('new_repo'), status=200)
55 55 assert_response = AssertResponse(response)
56 56 element = assert_response.get_element('#repo_type')
57 57 assert element.text_content() == '\ngit\n'
58 58
59 59 def test_create_page_non_restricted(self, autologin_user, backend):
60 60 response = self.app.get(url('new_repo'), status=200)
61 61 assert_response = AssertResponse(response)
62 62 assert_response.element_contains('#repo_type', 'git')
63 63 assert_response.element_contains('#repo_type', 'svn')
64 64 assert_response.element_contains('#repo_type', 'hg')
65 65
66 66 @pytest.mark.parametrize("suffix",
67 67 [u'', u'xxa'], ids=['', 'non-ascii'])
68 68 def test_create(self, autologin_user, backend, suffix, csrf_token):
69 69 repo_name_unicode = backend.new_repo_name(suffix=suffix)
70 70 repo_name = repo_name_unicode.encode('utf8')
71 71 description_unicode = u'description for newly created repo' + suffix
72 72 description = description_unicode.encode('utf8')
73 73 response = self.app.post(
74 74 url('repos'),
75 75 fixture._get_repo_create_params(
76 76 repo_private=False,
77 77 repo_name=repo_name,
78 78 repo_type=backend.alias,
79 79 repo_description=description,
80 80 csrf_token=csrf_token),
81 81 status=302)
82 82
83 83 self.assert_repository_is_created_correctly(
84 84 repo_name, description, backend)
85 85
86 86 def test_create_numeric(self, autologin_user, backend, csrf_token):
87 87 numeric_repo = '1234'
88 88 repo_name = numeric_repo
89 89 description = 'description for newly created repo' + numeric_repo
90 90 self.app.post(
91 91 url('repos'),
92 92 fixture._get_repo_create_params(
93 93 repo_private=False,
94 94 repo_name=repo_name,
95 95 repo_type=backend.alias,
96 96 repo_description=description,
97 97 csrf_token=csrf_token))
98 98
99 99 self.assert_repository_is_created_correctly(
100 100 repo_name, description, backend)
101 101
102 102 @pytest.mark.parametrize("suffix", [u'', u'ąćę'], ids=['', 'non-ascii'])
103 103 def test_create_in_group(
104 104 self, autologin_user, backend, suffix, csrf_token):
105 105 # create GROUP
106 106 group_name = 'sometest_%s' % backend.alias
107 107 gr = RepoGroupModel().create(group_name=group_name,
108 108 group_description='test',
109 109 owner=TEST_USER_ADMIN_LOGIN)
110 110 Session().commit()
111 111
112 112 repo_name = u'ingroup' + suffix
113 113 repo_name_full = RepoGroup.url_sep().join(
114 114 [group_name, repo_name])
115 115 description = u'description for newly created repo'
116 116 self.app.post(
117 117 url('repos'),
118 118 fixture._get_repo_create_params(
119 119 repo_private=False,
120 120 repo_name=safe_str(repo_name),
121 121 repo_type=backend.alias,
122 122 repo_description=description,
123 123 repo_group=gr.group_id,
124 124 csrf_token=csrf_token))
125 125
126 126 # TODO: johbo: Cleanup work to fixture
127 127 try:
128 128 self.assert_repository_is_created_correctly(
129 129 repo_name_full, description, backend)
130 130
131 131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
132 132 inherited_perms = UserRepoToPerm.query().filter(
133 133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
134 134 assert len(inherited_perms) == 1
135 135 finally:
136 136 RepoModel().delete(repo_name_full)
137 137 RepoGroupModel().delete(group_name)
138 138 Session().commit()
139 139
140 140 def test_create_in_group_numeric(
141 141 self, autologin_user, backend, csrf_token):
142 142 # create GROUP
143 143 group_name = 'sometest_%s' % backend.alias
144 144 gr = RepoGroupModel().create(group_name=group_name,
145 145 group_description='test',
146 146 owner=TEST_USER_ADMIN_LOGIN)
147 147 Session().commit()
148 148
149 149 repo_name = '12345'
150 150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
151 151 description = 'description for newly created repo'
152 152 self.app.post(
153 153 url('repos'),
154 154 fixture._get_repo_create_params(
155 155 repo_private=False,
156 156 repo_name=repo_name,
157 157 repo_type=backend.alias,
158 158 repo_description=description,
159 159 repo_group=gr.group_id,
160 160 csrf_token=csrf_token))
161 161
162 162 # TODO: johbo: Cleanup work to fixture
163 163 try:
164 164 self.assert_repository_is_created_correctly(
165 165 repo_name_full, description, backend)
166 166
167 167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
168 168 inherited_perms = UserRepoToPerm.query()\
169 169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
170 170 assert len(inherited_perms) == 1
171 171 finally:
172 172 RepoModel().delete(repo_name_full)
173 173 RepoGroupModel().delete(group_name)
174 174 Session().commit()
175 175
176 176 def test_create_in_group_without_needed_permissions(self, backend):
177 177 session = login_user_session(
178 178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
179 179 csrf_token = auth.get_csrf_token(session)
180 180 # revoke
181 181 user_model = UserModel()
182 182 # disable fork and create on default user
183 183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
184 184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
185 185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
186 186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
187 187
188 188 # disable on regular user
189 189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
190 190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
191 191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
192 192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
193 193 Session().commit()
194 194
195 195 # create GROUP
196 196 group_name = 'reg_sometest_%s' % backend.alias
197 197 gr = RepoGroupModel().create(group_name=group_name,
198 198 group_description='test',
199 199 owner=TEST_USER_ADMIN_LOGIN)
200 200 Session().commit()
201 201
202 202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
203 203 gr_allowed = RepoGroupModel().create(
204 204 group_name=group_name_allowed,
205 205 group_description='test',
206 206 owner=TEST_USER_REGULAR_LOGIN)
207 207 Session().commit()
208 208
209 209 repo_name = 'ingroup'
210 210 description = 'description for newly created repo'
211 211 response = self.app.post(
212 212 url('repos'),
213 213 fixture._get_repo_create_params(
214 214 repo_private=False,
215 215 repo_name=repo_name,
216 216 repo_type=backend.alias,
217 217 repo_description=description,
218 218 repo_group=gr.group_id,
219 219 csrf_token=csrf_token))
220 220
221 221 response.mustcontain('Invalid value')
222 222
223 223 # user is allowed to create in this group
224 224 repo_name = 'ingroup'
225 225 repo_name_full = RepoGroup.url_sep().join(
226 226 [group_name_allowed, repo_name])
227 227 description = 'description for newly created repo'
228 228 response = self.app.post(
229 229 url('repos'),
230 230 fixture._get_repo_create_params(
231 231 repo_private=False,
232 232 repo_name=repo_name,
233 233 repo_type=backend.alias,
234 234 repo_description=description,
235 235 repo_group=gr_allowed.group_id,
236 236 csrf_token=csrf_token))
237 237
238 238 # TODO: johbo: Cleanup in pytest fixture
239 239 try:
240 240 self.assert_repository_is_created_correctly(
241 241 repo_name_full, description, backend)
242 242
243 243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
244 244 inherited_perms = UserRepoToPerm.query().filter(
245 245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
246 246 assert len(inherited_perms) == 1
247 247
248 248 assert repo_on_filesystem(repo_name_full)
249 249 finally:
250 250 RepoModel().delete(repo_name_full)
251 251 RepoGroupModel().delete(group_name)
252 252 RepoGroupModel().delete(group_name_allowed)
253 253 Session().commit()
254 254
255 255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
256 256 csrf_token):
257 257 # create GROUP
258 258 group_name = 'sometest_%s' % backend.alias
259 259 gr = RepoGroupModel().create(group_name=group_name,
260 260 group_description='test',
261 261 owner=TEST_USER_ADMIN_LOGIN)
262 262 perm = Permission.get_by_key('repository.write')
263 263 RepoGroupModel().grant_user_permission(
264 264 gr, TEST_USER_REGULAR_LOGIN, perm)
265 265
266 266 # add repo permissions
267 267 Session().commit()
268 268
269 269 repo_name = 'ingroup_inherited_%s' % backend.alias
270 270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
271 271 description = 'description for newly created repo'
272 272 self.app.post(
273 273 url('repos'),
274 274 fixture._get_repo_create_params(
275 275 repo_private=False,
276 276 repo_name=repo_name,
277 277 repo_type=backend.alias,
278 278 repo_description=description,
279 279 repo_group=gr.group_id,
280 280 repo_copy_permissions=True,
281 281 csrf_token=csrf_token))
282 282
283 283 # TODO: johbo: Cleanup to pytest fixture
284 284 try:
285 285 self.assert_repository_is_created_correctly(
286 286 repo_name_full, description, backend)
287 287 except Exception:
288 288 RepoGroupModel().delete(group_name)
289 289 Session().commit()
290 290 raise
291 291
292 292 # check if inherited permissions are applied
293 293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
294 294 inherited_perms = UserRepoToPerm.query().filter(
295 295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
296 296 assert len(inherited_perms) == 2
297 297
298 298 assert TEST_USER_REGULAR_LOGIN in [
299 299 x.user.username for x in inherited_perms]
300 300 assert 'repository.write' in [
301 301 x.permission.permission_name for x in inherited_perms]
302 302
303 303 RepoModel().delete(repo_name_full)
304 304 RepoGroupModel().delete(group_name)
305 305 Session().commit()
306 306
307 307 @pytest.mark.xfail_backends(
308 308 "git", "hg", reason="Missing reposerver support")
309 309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
310 310 csrf_token):
311 311 source_repo = backend.create_repo(number_of_commits=2)
312 312 source_repo_name = source_repo.repo_name
313 313 reposerver.serve(source_repo.scm_instance())
314 314
315 315 repo_name = backend.new_repo_name()
316 316 response = self.app.post(
317 317 url('repos'),
318 318 fixture._get_repo_create_params(
319 319 repo_private=False,
320 320 repo_name=repo_name,
321 321 repo_type=backend.alias,
322 322 repo_description='',
323 323 clone_uri=reposerver.url,
324 324 csrf_token=csrf_token),
325 325 status=302)
326 326
327 327 # Should be redirected to the creating page
328 328 response.mustcontain('repo_creating')
329 329
330 330 # Expecting that both repositories have same history
331 331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
332 332 source_vcs = source_repo.scm_instance()
333 333 repo = RepoModel().get_by_repo_name(repo_name)
334 334 repo_vcs = repo.scm_instance()
335 335 assert source_vcs[0].message == repo_vcs[0].message
336 336 assert source_vcs.count() == repo_vcs.count()
337 337 assert source_vcs.commit_ids == repo_vcs.commit_ids
338 338
339 339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
340 340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
341 341 csrf_token):
342 342 repo_name = backend.new_repo_name()
343 343 description = 'description for newly created repo'
344 344 response = self.app.post(
345 345 url('repos'),
346 346 fixture._get_repo_create_params(
347 347 repo_private=False,
348 348 repo_name=repo_name,
349 349 repo_type=backend.alias,
350 350 repo_description=description,
351 351 clone_uri='http://repo.invalid/repo',
352 352 csrf_token=csrf_token))
353 353 response.mustcontain('invalid clone url')
354 354
355 355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
356 356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
357 357 self, autologin_user, backend, csrf_token):
358 358 repo_name = backend.new_repo_name()
359 359 description = 'description for newly created repo'
360 360 response = self.app.post(
361 361 url('repos'),
362 362 fixture._get_repo_create_params(
363 363 repo_private=False,
364 364 repo_name=repo_name,
365 365 repo_type=backend.alias,
366 366 repo_description=description,
367 367 clone_uri='svn+http://svn.invalid/repo',
368 368 csrf_token=csrf_token))
369 369 response.mustcontain('invalid clone url')
370 370
371 371 def test_create_with_git_suffix(
372 372 self, autologin_user, backend, csrf_token):
373 373 repo_name = backend.new_repo_name() + ".git"
374 374 description = 'description for newly created repo'
375 375 response = self.app.post(
376 376 url('repos'),
377 377 fixture._get_repo_create_params(
378 378 repo_private=False,
379 379 repo_name=repo_name,
380 380 repo_type=backend.alias,
381 381 repo_description=description,
382 382 csrf_token=csrf_token))
383 383 response.mustcontain('Repository name cannot end with .git')
384 384
385 385 @pytest.mark.parametrize("suffix", [u'', u'ąęł'], ids=['', 'non-ascii'])
386 386 def test_delete(self, autologin_user, backend, suffix, csrf_token):
387 387 repo = backend.create_repo(name_suffix=suffix)
388 388 repo_name = repo.repo_name
389 389
390 390 response = self.app.post(url('repo', repo_name=repo_name),
391 391 params={'_method': 'delete',
392 392 'csrf_token': csrf_token})
393 393 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
394 394 response.follow()
395 395
396 396 # check if repo was deleted from db
397 397 assert RepoModel().get_by_repo_name(repo_name) is None
398 398 assert not repo_on_filesystem(repo_name)
399 399
400 400 def test_show(self, autologin_user, backend):
401 401 self.app.get(url('repo', repo_name=backend.repo_name))
402 402
403 def test_edit(self, backend, autologin_user):
404 self.app.get(url('edit_repo', repo_name=backend.repo_name))
405
406 def test_edit_accessible_when_missing_requirements(
407 self, backend_hg, autologin_user):
408 scm_patcher = mock.patch.object(
409 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
410 with scm_patcher:
411 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
412
413 def test_set_private_flag_sets_default_to_none(
414 self, autologin_user, backend, csrf_token):
415 # initially repository perm should be read
416 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
417 assert len(perm) == 1
418 assert perm[0].permission.permission_name == 'repository.read'
419 assert not backend.repo.private
420
421 response = self.app.post(
422 url('repo', repo_name=backend.repo_name),
423 fixture._get_repo_create_params(
424 repo_private=1,
425 repo_name=backend.repo_name,
426 repo_type=backend.alias,
427 user=TEST_USER_ADMIN_LOGIN,
428 _method='put',
429 csrf_token=csrf_token))
430 assert_session_flash(
431 response,
432 msg='Repository %s updated successfully' % (backend.repo_name))
433 assert backend.repo.private
434
435 # now the repo default permission should be None
436 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
437 assert len(perm) == 1
438 assert perm[0].permission.permission_name == 'repository.none'
439
440 response = self.app.post(
441 url('repo', repo_name=backend.repo_name),
442 fixture._get_repo_create_params(
443 repo_private=False,
444 repo_name=backend.repo_name,
445 repo_type=backend.alias,
446 user=TEST_USER_ADMIN_LOGIN,
447 _method='put',
448 csrf_token=csrf_token))
449 assert_session_flash(
450 response,
451 msg='Repository %s updated successfully' % (backend.repo_name))
452 assert not backend.repo.private
453
454 # we turn off private now the repo default permission should stay None
455 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
456 assert len(perm) == 1
457 assert perm[0].permission.permission_name == 'repository.none'
458
459 # update this permission back
460 perm[0].permission = Permission.get_by_key('repository.read')
461 Session().add(perm[0])
462 Session().commit()
463
464 403 def test_default_user_cannot_access_private_repo_in_a_group(
465 404 self, autologin_user, user_util, backend, csrf_token):
466 405
467 406 group = user_util.create_repo_group()
468 407
469 408 repo = backend.create_repo(
470 409 repo_private=True, repo_group=group, repo_copy_permissions=True)
471 410
472 411 permissions = _get_permission_for_user(
473 412 user='default', repo=repo.repo_name)
474 413 assert len(permissions) == 1
475 414 assert permissions[0].permission.permission_name == 'repository.none'
476 415 assert permissions[0].repository.private is True
477 416
478 417 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
479 418 repo = backend.repo
480 419 response = self.app.get(
481 420 url('edit_repo_advanced', repo_name=backend.repo_name))
482 421 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
483 422 response.mustcontain(no=[opt])
484 423
485 424 def test_set_fork_of_target_repo(
486 425 self, autologin_user, backend, csrf_token):
487 426 target_repo = 'target_%s' % backend.alias
488 427 fixture.create_repo(target_repo, repo_type=backend.alias)
489 428 repo2 = Repository.get_by_repo_name(target_repo)
490 429 response = self.app.post(
491 430 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
492 431 params={'id_fork_of': repo2.repo_id, '_method': 'put',
493 432 'csrf_token': csrf_token})
494 433 repo = Repository.get_by_repo_name(backend.repo_name)
495 434 repo2 = Repository.get_by_repo_name(target_repo)
496 435 assert_session_flash(
497 436 response,
498 437 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
499 438
500 439 assert repo.fork == repo2
501 440 response = response.follow()
502 441 # check if given repo is selected
503 442
504 443 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
505 444 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
506 445
507 446 response.mustcontain(opt)
508 447
509 448 fixture.destroy_repo(target_repo, forks='detach')
510 449
511 450 @pytest.mark.backends("hg", "git")
512 451 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
513 452 csrf_token):
514 453 TARGET_REPO_MAP = {
515 454 'git': {
516 455 'type': 'hg',
517 456 'repo_name': HG_REPO},
518 457 'hg': {
519 458 'type': 'git',
520 459 'repo_name': GIT_REPO},
521 460 }
522 461 target_repo = TARGET_REPO_MAP[backend.alias]
523 462
524 463 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
525 464 response = self.app.post(
526 465 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
527 466 params={'id_fork_of': repo2.repo_id, '_method': 'put',
528 467 'csrf_token': csrf_token})
529 468 assert_session_flash(
530 469 response,
531 470 'Cannot set repository as fork of repository with other type')
532 471
533 472 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
534 473 # mark it as None
535 474 response = self.app.post(
536 475 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
537 476 params={'id_fork_of': None, '_method': 'put',
538 477 'csrf_token': csrf_token})
539 478 assert_session_flash(
540 479 response,
541 480 'Marked repo %s as fork of %s'
542 481 % (backend.repo_name, "Nothing"))
543 482 assert backend.repo.fork is None
544 483
545 484 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
546 485 repo = Repository.get_by_repo_name(backend.repo_name)
547 486 response = self.app.post(
548 487 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
549 488 params={'id_fork_of': repo.repo_id, '_method': 'put',
550 489 'csrf_token': csrf_token})
551 490 assert_session_flash(
552 491 response, 'An error occurred during this operation')
553 492
554 493 def test_create_on_top_level_without_permissions(self, backend):
555 494 session = login_user_session(
556 495 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
557 496 csrf_token = auth.get_csrf_token(session)
558 497
559 498 # revoke
560 499 user_model = UserModel()
561 500 # disable fork and create on default user
562 501 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
563 502 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
564 503 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
565 504 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
566 505
567 506 # disable on regular user
568 507 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
569 508 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
570 509 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
571 510 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
572 511 Session().commit()
573 512
574 513 repo_name = backend.new_repo_name()
575 514 description = 'description for newly created repo'
576 515 response = self.app.post(
577 516 url('repos'),
578 517 fixture._get_repo_create_params(
579 518 repo_private=False,
580 519 repo_name=repo_name,
581 520 repo_type=backend.alias,
582 521 repo_description=description,
583 522 csrf_token=csrf_token))
584 523
585 524 response.mustcontain(
586 525 u"You do not have the permission to store repositories in "
587 526 u"the root location.")
588 527
589 528 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
590 529 def test_create_repo_when_filesystem_op_fails(
591 530 self, autologin_user, backend, csrf_token):
592 531 repo_name = backend.new_repo_name()
593 532 description = 'description for newly created repo'
594 533
595 534 response = self.app.post(
596 535 url('repos'),
597 536 fixture._get_repo_create_params(
598 537 repo_private=False,
599 538 repo_name=repo_name,
600 539 repo_type=backend.alias,
601 540 repo_description=description,
602 541 csrf_token=csrf_token))
603 542
604 543 assert_session_flash(
605 544 response, 'Error creating repository %s' % repo_name)
606 545 # repo must not be in db
607 546 assert backend.repo is None
608 547 # repo must not be in filesystem !
609 548 assert not repo_on_filesystem(repo_name)
610 549
611 550 def assert_repository_is_created_correctly(
612 551 self, repo_name, description, backend):
613 552 repo_name_utf8 = safe_str(repo_name)
614 553
615 554 # run the check page that triggers the flash message
616 555 response = self.app.get(url('repo_check_home', repo_name=repo_name))
617 556 assert response.json == {u'result': True}
618 557
619 558 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
620 559 urllib.quote(repo_name_utf8), repo_name)
621 560 assert_session_flash(response, flash_msg)
622 561
623 562 # test if the repo was created in the database
624 563 new_repo = RepoModel().get_by_repo_name(repo_name)
625 564
626 565 assert new_repo.repo_name == repo_name
627 566 assert new_repo.description == description
628 567
629 568 # test if the repository is visible in the list ?
630 569 response = self.app.get(url('summary_home', repo_name=repo_name))
631 570 response.mustcontain(repo_name)
632 571 response.mustcontain(backend.alias)
633 572
634 573 assert repo_on_filesystem(repo_name)
635 574
636 575
637 576 @pytest.mark.usefixtures("app")
638 577 class TestVcsSettings(object):
639 578 FORM_DATA = {
640 579 'inherit_global_settings': False,
641 580 'hooks_changegroup_repo_size': False,
642 581 'hooks_changegroup_push_logger': False,
643 582 'hooks_outgoing_pull_logger': False,
644 583 'extensions_largefiles': False,
645 584 'phases_publish': 'False',
646 585 'rhodecode_pr_merge_enabled': False,
647 586 'rhodecode_use_outdated_comments': False,
648 587 'new_svn_branch': '',
649 588 'new_svn_tag': ''
650 589 }
651 590
652 591 @pytest.mark.skip_backends('svn')
653 592 def test_global_settings_initial_values(self, autologin_user, backend):
654 593 repo_name = backend.repo_name
655 594 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
656 595
657 596 expected_settings = (
658 597 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
659 598 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
660 599 'hooks_outgoing_pull_logger'
661 600 )
662 601 for setting in expected_settings:
663 602 self.assert_repo_value_equals_global_value(response, setting)
664 603
665 604 def test_show_settings_requires_repo_admin_permission(
666 605 self, backend, user_util, settings_util):
667 606 repo = backend.create_repo()
668 607 repo_name = repo.repo_name
669 608 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
670 609 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
671 610 login_user_session(
672 611 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
673 612 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
674 613
675 614 def test_inherit_global_settings_flag_is_true_by_default(
676 615 self, autologin_user, backend):
677 616 repo_name = backend.repo_name
678 617 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
679 618
680 619 assert_response = AssertResponse(response)
681 620 element = assert_response.get_element('#inherit_global_settings')
682 621 assert element.checked
683 622
684 623 @pytest.mark.parametrize('checked_value', [True, False])
685 624 def test_inherit_global_settings_value(
686 625 self, autologin_user, backend, checked_value, settings_util):
687 626 repo = backend.create_repo()
688 627 repo_name = repo.repo_name
689 628 settings_util.create_repo_rhodecode_setting(
690 629 repo, 'inherit_vcs_settings', checked_value, 'bool')
691 630 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
692 631
693 632 assert_response = AssertResponse(response)
694 633 element = assert_response.get_element('#inherit_global_settings')
695 634 assert element.checked == checked_value
696 635
697 636 @pytest.mark.skip_backends('svn')
698 637 def test_hooks_settings_are_created(
699 638 self, autologin_user, backend, csrf_token):
700 639 repo_name = backend.repo_name
701 640 data = self.FORM_DATA.copy()
702 641 data['csrf_token'] = csrf_token
703 642 self.app.post(
704 643 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
705 644 settings = SettingsModel(repo=repo_name)
706 645 try:
707 646 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
708 647 ui = settings.get_ui_by_section_and_key(section, key)
709 648 assert ui.ui_active is False
710 649 finally:
711 650 self._cleanup_repo_settings(settings)
712 651
713 652 def test_hooks_settings_are_not_created_for_svn(
714 653 self, autologin_user, backend_svn, csrf_token):
715 654 repo_name = backend_svn.repo_name
716 655 data = self.FORM_DATA.copy()
717 656 data['csrf_token'] = csrf_token
718 657 self.app.post(
719 658 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
720 659 settings = SettingsModel(repo=repo_name)
721 660 try:
722 661 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
723 662 ui = settings.get_ui_by_section_and_key(section, key)
724 663 assert ui is None
725 664 finally:
726 665 self._cleanup_repo_settings(settings)
727 666
728 667 @pytest.mark.skip_backends('svn')
729 668 def test_hooks_settings_are_updated(
730 669 self, autologin_user, backend, csrf_token):
731 670 repo_name = backend.repo_name
732 671 settings = SettingsModel(repo=repo_name)
733 672 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
734 673 settings.create_ui_section_value(section, '', key=key, active=True)
735 674
736 675 data = self.FORM_DATA.copy()
737 676 data['csrf_token'] = csrf_token
738 677 self.app.post(
739 678 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
740 679 try:
741 680 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
742 681 ui = settings.get_ui_by_section_and_key(section, key)
743 682 assert ui.ui_active is False
744 683 finally:
745 684 self._cleanup_repo_settings(settings)
746 685
747 686 def test_hooks_settings_are_not_updated_for_svn(
748 687 self, autologin_user, backend_svn, csrf_token):
749 688 repo_name = backend_svn.repo_name
750 689 settings = SettingsModel(repo=repo_name)
751 690 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
752 691 settings.create_ui_section_value(section, '', key=key, active=True)
753 692
754 693 data = self.FORM_DATA.copy()
755 694 data['csrf_token'] = csrf_token
756 695 self.app.post(
757 696 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
758 697 try:
759 698 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
760 699 ui = settings.get_ui_by_section_and_key(section, key)
761 700 assert ui.ui_active is True
762 701 finally:
763 702 self._cleanup_repo_settings(settings)
764 703
765 704 @pytest.mark.skip_backends('svn')
766 705 def test_pr_settings_are_created(
767 706 self, autologin_user, backend, csrf_token):
768 707 repo_name = backend.repo_name
769 708 data = self.FORM_DATA.copy()
770 709 data['csrf_token'] = csrf_token
771 710 self.app.post(
772 711 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
773 712 settings = SettingsModel(repo=repo_name)
774 713 try:
775 714 for name in VcsSettingsModel.GENERAL_SETTINGS:
776 715 setting = settings.get_setting_by_name(name)
777 716 assert setting.app_settings_value is False
778 717 finally:
779 718 self._cleanup_repo_settings(settings)
780 719
781 720 def test_pr_settings_are_not_created_for_svn(
782 721 self, autologin_user, backend_svn, csrf_token):
783 722 repo_name = backend_svn.repo_name
784 723 data = self.FORM_DATA.copy()
785 724 data['csrf_token'] = csrf_token
786 725 self.app.post(
787 726 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
788 727 settings = SettingsModel(repo=repo_name)
789 728 try:
790 729 for name in VcsSettingsModel.GENERAL_SETTINGS:
791 730 setting = settings.get_setting_by_name(name)
792 731 assert setting is None
793 732 finally:
794 733 self._cleanup_repo_settings(settings)
795 734
796 735 def test_pr_settings_creation_requires_repo_admin_permission(
797 736 self, backend, user_util, settings_util, csrf_token):
798 737 repo = backend.create_repo()
799 738 repo_name = repo.repo_name
800 739
801 740 logout_user_session(self.app, csrf_token)
802 741 session = login_user_session(
803 742 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
804 743 new_csrf_token = auth.get_csrf_token(session)
805 744
806 745 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
807 746 repo = Repository.get_by_repo_name(repo_name)
808 747 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
809 748 data = self.FORM_DATA.copy()
810 749 data['csrf_token'] = new_csrf_token
811 750 settings = SettingsModel(repo=repo_name)
812 751
813 752 try:
814 753 self.app.post(
815 754 url('repo_vcs_settings', repo_name=repo_name), data,
816 755 status=302)
817 756 finally:
818 757 self._cleanup_repo_settings(settings)
819 758
820 759 @pytest.mark.skip_backends('svn')
821 760 def test_pr_settings_are_updated(
822 761 self, autologin_user, backend, csrf_token):
823 762 repo_name = backend.repo_name
824 763 settings = SettingsModel(repo=repo_name)
825 764 for name in VcsSettingsModel.GENERAL_SETTINGS:
826 765 settings.create_or_update_setting(name, True, 'bool')
827 766
828 767 data = self.FORM_DATA.copy()
829 768 data['csrf_token'] = csrf_token
830 769 self.app.post(
831 770 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
832 771 try:
833 772 for name in VcsSettingsModel.GENERAL_SETTINGS:
834 773 setting = settings.get_setting_by_name(name)
835 774 assert setting.app_settings_value is False
836 775 finally:
837 776 self._cleanup_repo_settings(settings)
838 777
839 778 def test_pr_settings_are_not_updated_for_svn(
840 779 self, autologin_user, backend_svn, csrf_token):
841 780 repo_name = backend_svn.repo_name
842 781 settings = SettingsModel(repo=repo_name)
843 782 for name in VcsSettingsModel.GENERAL_SETTINGS:
844 783 settings.create_or_update_setting(name, True, 'bool')
845 784
846 785 data = self.FORM_DATA.copy()
847 786 data['csrf_token'] = csrf_token
848 787 self.app.post(
849 788 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
850 789 try:
851 790 for name in VcsSettingsModel.GENERAL_SETTINGS:
852 791 setting = settings.get_setting_by_name(name)
853 792 assert setting.app_settings_value is True
854 793 finally:
855 794 self._cleanup_repo_settings(settings)
856 795
857 796 def test_svn_settings_are_created(
858 797 self, autologin_user, backend_svn, csrf_token, settings_util):
859 798 repo_name = backend_svn.repo_name
860 799 data = self.FORM_DATA.copy()
861 800 data['new_svn_tag'] = 'svn-tag'
862 801 data['new_svn_branch'] = 'svn-branch'
863 802 data['csrf_token'] = csrf_token
864 803
865 804 # Create few global settings to make sure that uniqueness validators
866 805 # are not triggered
867 806 settings_util.create_rhodecode_ui(
868 807 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
869 808 settings_util.create_rhodecode_ui(
870 809 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
871 810
872 811 self.app.post(
873 812 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
874 813 settings = SettingsModel(repo=repo_name)
875 814 try:
876 815 svn_branches = settings.get_ui_by_section(
877 816 VcsSettingsModel.SVN_BRANCH_SECTION)
878 817 svn_branch_names = [b.ui_value for b in svn_branches]
879 818 svn_tags = settings.get_ui_by_section(
880 819 VcsSettingsModel.SVN_TAG_SECTION)
881 820 svn_tag_names = [b.ui_value for b in svn_tags]
882 821 assert 'svn-branch' in svn_branch_names
883 822 assert 'svn-tag' in svn_tag_names
884 823 finally:
885 824 self._cleanup_repo_settings(settings)
886 825
887 826 def test_svn_settings_are_unique(
888 827 self, autologin_user, backend_svn, csrf_token, settings_util):
889 828 repo = backend_svn.repo
890 829 repo_name = repo.repo_name
891 830 data = self.FORM_DATA.copy()
892 831 data['new_svn_tag'] = 'test_tag'
893 832 data['new_svn_branch'] = 'test_branch'
894 833 data['csrf_token'] = csrf_token
895 834 settings_util.create_repo_rhodecode_ui(
896 835 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
897 836 settings_util.create_repo_rhodecode_ui(
898 837 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
899 838
900 839 response = self.app.post(
901 840 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
902 841 response.mustcontain('Pattern already exists')
903 842
904 843 def test_svn_settings_with_empty_values_are_not_created(
905 844 self, autologin_user, backend_svn, csrf_token):
906 845 repo_name = backend_svn.repo_name
907 846 data = self.FORM_DATA.copy()
908 847 data['csrf_token'] = csrf_token
909 848 self.app.post(
910 849 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
911 850 settings = SettingsModel(repo=repo_name)
912 851 try:
913 852 svn_branches = settings.get_ui_by_section(
914 853 VcsSettingsModel.SVN_BRANCH_SECTION)
915 854 svn_tags = settings.get_ui_by_section(
916 855 VcsSettingsModel.SVN_TAG_SECTION)
917 856 assert len(svn_branches) == 0
918 857 assert len(svn_tags) == 0
919 858 finally:
920 859 self._cleanup_repo_settings(settings)
921 860
922 861 def test_svn_settings_are_shown_for_svn_repository(
923 862 self, autologin_user, backend_svn, csrf_token):
924 863 repo_name = backend_svn.repo_name
925 864 response = self.app.get(
926 865 url('repo_vcs_settings', repo_name=repo_name), status=200)
927 866 response.mustcontain('Subversion Settings')
928 867
929 868 @pytest.mark.skip_backends('svn')
930 869 def test_svn_settings_are_not_created_for_not_svn_repository(
931 870 self, autologin_user, backend, csrf_token):
932 871 repo_name = backend.repo_name
933 872 data = self.FORM_DATA.copy()
934 873 data['csrf_token'] = csrf_token
935 874 self.app.post(
936 875 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
937 876 settings = SettingsModel(repo=repo_name)
938 877 try:
939 878 svn_branches = settings.get_ui_by_section(
940 879 VcsSettingsModel.SVN_BRANCH_SECTION)
941 880 svn_tags = settings.get_ui_by_section(
942 881 VcsSettingsModel.SVN_TAG_SECTION)
943 882 assert len(svn_branches) == 0
944 883 assert len(svn_tags) == 0
945 884 finally:
946 885 self._cleanup_repo_settings(settings)
947 886
948 887 @pytest.mark.skip_backends('svn')
949 888 def test_svn_settings_are_shown_only_for_svn_repository(
950 889 self, autologin_user, backend, csrf_token):
951 890 repo_name = backend.repo_name
952 891 response = self.app.get(
953 892 url('repo_vcs_settings', repo_name=repo_name), status=200)
954 893 response.mustcontain(no='Subversion Settings')
955 894
956 895 def test_hg_settings_are_created(
957 896 self, autologin_user, backend_hg, csrf_token):
958 897 repo_name = backend_hg.repo_name
959 898 data = self.FORM_DATA.copy()
960 899 data['new_svn_tag'] = 'svn-tag'
961 900 data['new_svn_branch'] = 'svn-branch'
962 901 data['csrf_token'] = csrf_token
963 902 self.app.post(
964 903 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
965 904 settings = SettingsModel(repo=repo_name)
966 905 try:
967 906 largefiles_ui = settings.get_ui_by_section_and_key(
968 907 'extensions', 'largefiles')
969 908 assert largefiles_ui.ui_active is False
970 909 phases_ui = settings.get_ui_by_section_and_key(
971 910 'phases', 'publish')
972 911 assert str2bool(phases_ui.ui_value) is False
973 912 finally:
974 913 self._cleanup_repo_settings(settings)
975 914
976 915 def test_hg_settings_are_updated(
977 916 self, autologin_user, backend_hg, csrf_token):
978 917 repo_name = backend_hg.repo_name
979 918 settings = SettingsModel(repo=repo_name)
980 919 settings.create_ui_section_value(
981 920 'extensions', '', key='largefiles', active=True)
982 921 settings.create_ui_section_value(
983 922 'phases', '1', key='publish', active=True)
984 923
985 924 data = self.FORM_DATA.copy()
986 925 data['csrf_token'] = csrf_token
987 926 self.app.post(
988 927 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
989 928 try:
990 929 largefiles_ui = settings.get_ui_by_section_and_key(
991 930 'extensions', 'largefiles')
992 931 assert largefiles_ui.ui_active is False
993 932 phases_ui = settings.get_ui_by_section_and_key(
994 933 'phases', 'publish')
995 934 assert str2bool(phases_ui.ui_value) is False
996 935 finally:
997 936 self._cleanup_repo_settings(settings)
998 937
999 938 def test_hg_settings_are_shown_for_hg_repository(
1000 939 self, autologin_user, backend_hg, csrf_token):
1001 940 repo_name = backend_hg.repo_name
1002 941 response = self.app.get(
1003 942 url('repo_vcs_settings', repo_name=repo_name), status=200)
1004 943 response.mustcontain('Mercurial Settings')
1005 944
1006 945 @pytest.mark.skip_backends('hg')
1007 946 def test_hg_settings_are_created_only_for_hg_repository(
1008 947 self, autologin_user, backend, csrf_token):
1009 948 repo_name = backend.repo_name
1010 949 data = self.FORM_DATA.copy()
1011 950 data['csrf_token'] = csrf_token
1012 951 self.app.post(
1013 952 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1014 953 settings = SettingsModel(repo=repo_name)
1015 954 try:
1016 955 largefiles_ui = settings.get_ui_by_section_and_key(
1017 956 'extensions', 'largefiles')
1018 957 assert largefiles_ui is None
1019 958 phases_ui = settings.get_ui_by_section_and_key(
1020 959 'phases', 'publish')
1021 960 assert phases_ui is None
1022 961 finally:
1023 962 self._cleanup_repo_settings(settings)
1024 963
1025 964 @pytest.mark.skip_backends('hg')
1026 965 def test_hg_settings_are_shown_only_for_hg_repository(
1027 966 self, autologin_user, backend, csrf_token):
1028 967 repo_name = backend.repo_name
1029 968 response = self.app.get(
1030 969 url('repo_vcs_settings', repo_name=repo_name), status=200)
1031 970 response.mustcontain(no='Mercurial Settings')
1032 971
1033 972 @pytest.mark.skip_backends('hg')
1034 973 def test_hg_settings_are_updated_only_for_hg_repository(
1035 974 self, autologin_user, backend, csrf_token):
1036 975 repo_name = backend.repo_name
1037 976 settings = SettingsModel(repo=repo_name)
1038 977 settings.create_ui_section_value(
1039 978 'extensions', '', key='largefiles', active=True)
1040 979 settings.create_ui_section_value(
1041 980 'phases', '1', key='publish', active=True)
1042 981
1043 982 data = self.FORM_DATA.copy()
1044 983 data['csrf_token'] = csrf_token
1045 984 self.app.post(
1046 985 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1047 986 try:
1048 987 largefiles_ui = settings.get_ui_by_section_and_key(
1049 988 'extensions', 'largefiles')
1050 989 assert largefiles_ui.ui_active is True
1051 990 phases_ui = settings.get_ui_by_section_and_key(
1052 991 'phases', 'publish')
1053 992 assert phases_ui.ui_value == '1'
1054 993 finally:
1055 994 self._cleanup_repo_settings(settings)
1056 995
1057 996 def test_per_repo_svn_settings_are_displayed(
1058 997 self, autologin_user, backend_svn, settings_util):
1059 998 repo = backend_svn.create_repo()
1060 999 repo_name = repo.repo_name
1061 1000 branches = [
1062 1001 settings_util.create_repo_rhodecode_ui(
1063 1002 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1064 1003 'branch_{}'.format(i))
1065 1004 for i in range(10)]
1066 1005 tags = [
1067 1006 settings_util.create_repo_rhodecode_ui(
1068 1007 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1069 1008 for i in range(10)]
1070 1009
1071 1010 response = self.app.get(
1072 1011 url('repo_vcs_settings', repo_name=repo_name), status=200)
1073 1012 assert_response = AssertResponse(response)
1074 1013 for branch in branches:
1075 1014 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1076 1015 element = assert_response.get_element(css_selector)
1077 1016 assert element.value == branch.ui_value
1078 1017 for tag in tags:
1079 1018 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1080 1019 element = assert_response.get_element(css_selector)
1081 1020 assert element.value == tag.ui_value
1082 1021
1083 1022 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1084 1023 self, autologin_user, backend_svn, settings_util):
1085 1024 repo = backend_svn.create_repo()
1086 1025 repo_name = repo.repo_name
1087 1026 response = self.app.get(
1088 1027 url('repo_vcs_settings', repo_name=repo_name), status=200)
1089 1028 response.mustcontain(no='<label>Hooks:</label>')
1090 1029 response.mustcontain(no='<label>Pull Request Settings:</label>')
1091 1030
1092 1031 def test_inherit_global_settings_value_is_saved(
1093 1032 self, autologin_user, backend, csrf_token):
1094 1033 repo_name = backend.repo_name
1095 1034 data = self.FORM_DATA.copy()
1096 1035 data['csrf_token'] = csrf_token
1097 1036 data['inherit_global_settings'] = True
1098 1037 self.app.post(
1099 1038 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1100 1039
1101 1040 settings = SettingsModel(repo=repo_name)
1102 1041 vcs_settings = VcsSettingsModel(repo=repo_name)
1103 1042 try:
1104 1043 assert vcs_settings.inherit_global_settings is True
1105 1044 finally:
1106 1045 self._cleanup_repo_settings(settings)
1107 1046
1108 1047 def test_repo_cache_is_invalidated_when_settings_are_updated(
1109 1048 self, autologin_user, backend, csrf_token):
1110 1049 repo_name = backend.repo_name
1111 1050 data = self.FORM_DATA.copy()
1112 1051 data['csrf_token'] = csrf_token
1113 1052 data['inherit_global_settings'] = True
1114 1053 settings = SettingsModel(repo=repo_name)
1115 1054
1116 1055 invalidation_patcher = mock.patch(
1117 1056 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1118 1057 with invalidation_patcher as invalidation_mock:
1119 1058 self.app.post(
1120 1059 url('repo_vcs_settings', repo_name=repo_name), data,
1121 1060 status=302)
1122 1061 try:
1123 1062 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1124 1063 finally:
1125 1064 self._cleanup_repo_settings(settings)
1126 1065
1127 1066 def test_other_settings_not_saved_inherit_global_settings_is_true(
1128 1067 self, autologin_user, backend, csrf_token):
1129 1068 repo_name = backend.repo_name
1130 1069 data = self.FORM_DATA.copy()
1131 1070 data['csrf_token'] = csrf_token
1132 1071 data['inherit_global_settings'] = True
1133 1072 self.app.post(
1134 1073 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1135 1074
1136 1075 settings = SettingsModel(repo=repo_name)
1137 1076 ui_settings = (
1138 1077 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1139 1078
1140 1079 vcs_settings = []
1141 1080 try:
1142 1081 for section, key in ui_settings:
1143 1082 ui = settings.get_ui_by_section_and_key(section, key)
1144 1083 if ui:
1145 1084 vcs_settings.append(ui)
1146 1085 vcs_settings.extend(settings.get_ui_by_section(
1147 1086 VcsSettingsModel.SVN_BRANCH_SECTION))
1148 1087 vcs_settings.extend(settings.get_ui_by_section(
1149 1088 VcsSettingsModel.SVN_TAG_SECTION))
1150 1089 for name in VcsSettingsModel.GENERAL_SETTINGS:
1151 1090 setting = settings.get_setting_by_name(name)
1152 1091 if setting:
1153 1092 vcs_settings.append(setting)
1154 1093 assert vcs_settings == []
1155 1094 finally:
1156 1095 self._cleanup_repo_settings(settings)
1157 1096
1158 1097 def test_delete_svn_branch_and_tag_patterns(
1159 1098 self, autologin_user, backend_svn, settings_util, csrf_token):
1160 1099 repo = backend_svn.create_repo()
1161 1100 repo_name = repo.repo_name
1162 1101 branch = settings_util.create_repo_rhodecode_ui(
1163 1102 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1164 1103 cleanup=False)
1165 1104 tag = settings_util.create_repo_rhodecode_ui(
1166 1105 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1167 1106 data = {
1168 1107 '_method': 'delete',
1169 1108 'csrf_token': csrf_token
1170 1109 }
1171 1110 for id_ in (branch.ui_id, tag.ui_id):
1172 1111 data['delete_svn_pattern'] = id_,
1173 1112 self.app.post(
1174 1113 url('repo_vcs_settings', repo_name=repo_name), data,
1175 1114 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1176 1115 settings = VcsSettingsModel(repo=repo_name)
1177 1116 assert settings.get_repo_svn_branch_patterns() == []
1178 1117
1179 1118 def test_delete_svn_branch_requires_repo_admin_permission(
1180 1119 self, backend_svn, user_util, settings_util, csrf_token):
1181 1120 repo = backend_svn.create_repo()
1182 1121 repo_name = repo.repo_name
1183 1122
1184 1123 logout_user_session(self.app, csrf_token)
1185 1124 session = login_user_session(
1186 1125 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1187 1126 csrf_token = auth.get_csrf_token(session)
1188 1127
1189 1128 repo = Repository.get_by_repo_name(repo_name)
1190 1129 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1191 1130 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1192 1131 branch = settings_util.create_repo_rhodecode_ui(
1193 1132 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1194 1133 cleanup=False)
1195 1134 data = {
1196 1135 '_method': 'delete',
1197 1136 'csrf_token': csrf_token,
1198 1137 'delete_svn_pattern': branch.ui_id
1199 1138 }
1200 1139 self.app.post(
1201 1140 url('repo_vcs_settings', repo_name=repo_name), data,
1202 1141 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1203 1142
1204 1143 def test_delete_svn_branch_raises_400_when_not_found(
1205 1144 self, autologin_user, backend_svn, settings_util, csrf_token):
1206 1145 repo_name = backend_svn.repo_name
1207 1146 data = {
1208 1147 '_method': 'delete',
1209 1148 'delete_svn_pattern': 123,
1210 1149 'csrf_token': csrf_token
1211 1150 }
1212 1151 self.app.post(
1213 1152 url('repo_vcs_settings', repo_name=repo_name), data,
1214 1153 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1215 1154
1216 1155 def test_delete_svn_branch_raises_400_when_no_id_specified(
1217 1156 self, autologin_user, backend_svn, settings_util, csrf_token):
1218 1157 repo_name = backend_svn.repo_name
1219 1158 data = {
1220 1159 '_method': 'delete',
1221 1160 'csrf_token': csrf_token
1222 1161 }
1223 1162 self.app.post(
1224 1163 url('repo_vcs_settings', repo_name=repo_name), data,
1225 1164 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1226 1165
1227 1166 def _cleanup_repo_settings(self, settings_model):
1228 1167 cleanup = []
1229 1168 ui_settings = (
1230 1169 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1231 1170
1232 1171 for section, key in ui_settings:
1233 1172 ui = settings_model.get_ui_by_section_and_key(section, key)
1234 1173 if ui:
1235 1174 cleanup.append(ui)
1236 1175
1237 1176 cleanup.extend(settings_model.get_ui_by_section(
1238 1177 VcsSettingsModel.INHERIT_SETTINGS))
1239 1178 cleanup.extend(settings_model.get_ui_by_section(
1240 1179 VcsSettingsModel.SVN_BRANCH_SECTION))
1241 1180 cleanup.extend(settings_model.get_ui_by_section(
1242 1181 VcsSettingsModel.SVN_TAG_SECTION))
1243 1182
1244 1183 for name in VcsSettingsModel.GENERAL_SETTINGS:
1245 1184 setting = settings_model.get_setting_by_name(name)
1246 1185 if setting:
1247 1186 cleanup.append(setting)
1248 1187
1249 1188 for object_ in cleanup:
1250 1189 Session().delete(object_)
1251 1190 Session().commit()
1252 1191
1253 1192 def assert_repo_value_equals_global_value(self, response, setting):
1254 1193 assert_response = AssertResponse(response)
1255 1194 global_css_selector = '[name={}_inherited]'.format(setting)
1256 1195 repo_css_selector = '[name={}]'.format(setting)
1257 1196 repo_element = assert_response.get_element(repo_css_selector)
1258 1197 global_element = assert_response.get_element(global_css_selector)
1259 1198 assert repo_element.value == global_element.value
1260 1199
1261 1200
1262 1201 def _get_permission_for_user(user, repo):
1263 1202 perm = UserRepoToPerm.query()\
1264 1203 .filter(UserRepoToPerm.repository ==
1265 1204 Repository.get_by_repo_name(repo))\
1266 1205 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1267 1206 .all()
1268 1207 return perm
@@ -1,116 +1,116 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22 import pytest
23 23
24 24 from rhodecode.model.validation_schema import types
25 25 from rhodecode.model.validation_schema.schemas import repo_group_schema
26 26
27 27
28 28 class TestRepoGroupSchema(object):
29 29
30 30 @pytest.mark.parametrize('given, expected', [
31 31 ('my repo', 'my-repo'),
32 32 (' hello world mike ', 'hello-world-mike'),
33 33
34 34 ('//group1/group2//', 'group1/group2'),
35 35 ('//group1///group2//', 'group1/group2'),
36 36 ('///group1/group2///group3', 'group1/group2/group3'),
37 37 ('word g1/group2///group3', 'word-g1/group2/group3'),
38 38
39 39 ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'),
40 40
41 41 ('group,,,/,,,/1/2/3', 'group/1/2/3'),
42 42 ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'),
43 43 (u'grou[]p1/gro;up2///gro up3/ąć', u'group1/group2/gro-up3/ąć'),
44 44 ])
45 45 def test_deserialize_repo_name(self, app, user_admin, given, expected):
46 46 schema = repo_group_schema.RepoGroupSchema().bind()
47 47 assert schema.get('repo_group_name').deserialize(given) == expected
48 48
49 49 def test_deserialize(self, app, user_admin):
50 50 schema = repo_group_schema.RepoGroupSchema().bind(
51 51 user=user_admin
52 52 )
53 53
54 54 schema_data = schema.deserialize(dict(
55 repo_group_name='dupa',
55 repo_group_name='my_schema_group',
56 56 repo_group_owner=user_admin.username
57 57 ))
58 58
59 assert schema_data['repo_group_name'] == 'dupa'
59 assert schema_data['repo_group_name'] == u'my_schema_group'
60 60 assert schema_data['repo_group'] == {
61 61 'repo_group_id': None,
62 62 'repo_group_name': types.RootLocation,
63 'repo_group_name_without_group': 'dupa'}
63 'repo_group_name_without_group': u'my_schema_group'}
64 64
65 65 @pytest.mark.parametrize('given, err_key, expected_exc', [
66 ('xxx/dupa', 'repo_group', 'Parent repository group `xxx` does not exist'),
66 ('xxx/my_schema_group', 'repo_group', 'Parent repository group `xxx` does not exist'),
67 67 ('', 'repo_group_name', 'Name must start with a letter or number. Got ``'),
68 68 ])
69 69 def test_deserialize_with_bad_group_name(
70 70 self, app, user_admin, given, err_key, expected_exc):
71 71 schema = repo_group_schema.RepoGroupSchema().bind(
72 72 repo_type_options=['hg'],
73 73 user=user_admin
74 74 )
75 75
76 76 with pytest.raises(colander.Invalid) as excinfo:
77 77 schema.deserialize(dict(
78 78 repo_group_name=given,
79 79 repo_group_owner=user_admin.username
80 80 ))
81 81
82 82 assert excinfo.value.asdict()[err_key] == expected_exc
83 83
84 84 def test_deserialize_with_group_name(self, app, user_admin, test_repo_group):
85 85 schema = repo_group_schema.RepoGroupSchema().bind(
86 86 user=user_admin
87 87 )
88 88
89 full_name = test_repo_group.group_name + '/dupa'
89 full_name = test_repo_group.group_name + u'/my_schema_group'
90 90 schema_data = schema.deserialize(dict(
91 91 repo_group_name=full_name,
92 92 repo_group_owner=user_admin.username
93 93 ))
94 94
95 95 assert schema_data['repo_group_name'] == full_name
96 96 assert schema_data['repo_group'] == {
97 97 'repo_group_id': test_repo_group.group_id,
98 98 'repo_group_name': test_repo_group.group_name,
99 'repo_group_name_without_group': 'dupa'}
99 'repo_group_name_without_group': u'my_schema_group'}
100 100
101 101 def test_deserialize_with_group_name_regular_user_no_perms(
102 102 self, app, user_regular, test_repo_group):
103 103 schema = repo_group_schema.RepoGroupSchema().bind(
104 104 user=user_regular
105 105 )
106 106
107 full_name = test_repo_group.group_name + '/dupa'
107 full_name = test_repo_group.group_name + u'/my_schema_group'
108 108 with pytest.raises(colander.Invalid) as excinfo:
109 109 schema.deserialize(dict(
110 110 repo_group_name=full_name,
111 111 repo_group_owner=user_regular.username
112 112 ))
113 113
114 114 expected = 'Parent repository group `{}` does not exist'.format(
115 115 test_repo_group.group_name)
116 116 assert excinfo.value.asdict()['repo_group'] == expected
@@ -1,128 +1,130 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import colander
22 22 import pytest
23 23
24 24 from rhodecode.model.validation_schema import types
25 25 from rhodecode.model.validation_schema.schemas import repo_schema
26 26
27 27
28 28 class TestRepoSchema(object):
29 29
30 30 #TODO:
31 31 # test nested groups
32 32
33 33 @pytest.mark.parametrize('given, expected', [
34 34 ('my repo', 'my-repo'),
35 35 (' hello world mike ', 'hello-world-mike'),
36 36
37 37 ('//group1/group2//', 'group1/group2'),
38 38 ('//group1///group2//', 'group1/group2'),
39 39 ('///group1/group2///group3', 'group1/group2/group3'),
40 40 ('word g1/group2///group3', 'word-g1/group2/group3'),
41 41
42 42 ('grou p1/gro;,,##up2//.../group3', 'grou-p1/group2/group3'),
43 43
44 44 ('group,,,/,,,/1/2/3', 'group/1/2/3'),
45 45 ('grou[]p1/gro;up2///gro up3', 'group1/group2/gro-up3'),
46 46 (u'grou[]p1/gro;up2///gro up3/ąć', u'group1/group2/gro-up3/ąć'),
47 47 ])
48 48 def test_deserialize_repo_name(self, app, user_admin, given, expected):
49 49
50 50 schema = repo_schema.RepoSchema().bind()
51 51 assert expected == schema.get('repo_name').deserialize(given)
52 52
53 53 def test_deserialize(self, app, user_admin):
54 54 schema = repo_schema.RepoSchema().bind(
55 55 repo_type_options=['hg'],
56 56 user=user_admin
57 57 )
58 58
59 59 schema_data = schema.deserialize(dict(
60 repo_name='dupa',
60 repo_name='my_schema_repo',
61 61 repo_type='hg',
62 62 repo_owner=user_admin.username
63 63 ))
64 64
65 assert schema_data['repo_name'] == 'dupa'
65 assert schema_data['repo_name'] == u'my_schema_repo'
66 66 assert schema_data['repo_group'] == {
67 67 'repo_group_id': None,
68 68 'repo_group_name': types.RootLocation,
69 'repo_name_without_group': 'dupa'}
69 'repo_name_with_group': u'my_schema_repo',
70 'repo_name_without_group': u'my_schema_repo'}
70 71
71 72 @pytest.mark.parametrize('given, err_key, expected_exc', [
72 ('xxx/dupa','repo_group', 'Repository group `xxx` does not exist'),
73 ('xxx/my_schema_repo','repo_group', 'Repository group `xxx` does not exist'),
73 74 ('', 'repo_name', 'Name must start with a letter or number. Got ``'),
74 75 ])
75 76 def test_deserialize_with_bad_group_name(
76 77 self, app, user_admin, given, err_key, expected_exc):
77 78
78 79 schema = repo_schema.RepoSchema().bind(
79 80 repo_type_options=['hg'],
80 81 user=user_admin
81 82 )
82 83
83 84 with pytest.raises(colander.Invalid) as excinfo:
84 85 schema.deserialize(dict(
85 86 repo_name=given,
86 87 repo_type='hg',
87 88 repo_owner=user_admin.username
88 89 ))
89 90
90 91 assert excinfo.value.asdict()[err_key] == expected_exc
91 92
92 93 def test_deserialize_with_group_name(self, app, user_admin, test_repo_group):
93 94 schema = repo_schema.RepoSchema().bind(
94 95 repo_type_options=['hg'],
95 96 user=user_admin
96 97 )
97 98
98 full_name = test_repo_group.group_name + '/dupa'
99 full_name = test_repo_group.group_name + u'/my_schema_repo'
99 100 schema_data = schema.deserialize(dict(
100 101 repo_name=full_name,
101 102 repo_type='hg',
102 103 repo_owner=user_admin.username
103 104 ))
104 105
105 106 assert schema_data['repo_name'] == full_name
106 107 assert schema_data['repo_group'] == {
107 108 'repo_group_id': test_repo_group.group_id,
108 109 'repo_group_name': test_repo_group.group_name,
109 'repo_name_without_group': 'dupa'}
110 'repo_name_with_group': full_name,
111 'repo_name_without_group': u'my_schema_repo'}
110 112
111 113 def test_deserialize_with_group_name_regular_user_no_perms(
112 114 self, app, user_regular, test_repo_group):
113 115 schema = repo_schema.RepoSchema().bind(
114 116 repo_type_options=['hg'],
115 117 user=user_regular
116 118 )
117 119
118 full_name = test_repo_group.group_name + '/dupa'
120 full_name = test_repo_group.group_name + '/my_schema_repo'
119 121 with pytest.raises(colander.Invalid) as excinfo:
120 122 schema.deserialize(dict(
121 123 repo_name=full_name,
122 124 repo_type='hg',
123 125 repo_owner=user_regular.username
124 126 ))
125 127
126 128 expected = 'Repository group `{}` does not exist'.format(
127 129 test_repo_group.group_name)
128 130 assert excinfo.value.asdict()['repo_group'] == expected
@@ -1,335 +1,336 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import stat
23 23 import sys
24 24
25 25 import pytest
26 26 from mock import Mock, patch, DEFAULT
27 27
28 28 import rhodecode
29 29 from rhodecode.model import db, scm
30 from rhodecode.tests import no_newline_id_generator
30 31
31 32
32 33 def test_scm_instance_config(backend):
33 34 repo = backend.create_repo()
34 35 with patch.multiple('rhodecode.model.db.Repository',
35 36 _get_instance=DEFAULT,
36 37 _get_instance_cached=DEFAULT) as mocks:
37 38 repo.scm_instance()
38 39 mocks['_get_instance'].assert_called_with(
39 40 config=None, cache=False)
40 41
41 42 config = {'some': 'value'}
42 43 repo.scm_instance(config=config)
43 44 mocks['_get_instance'].assert_called_with(
44 45 config=config, cache=False)
45 46
46 47 with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
47 48 repo.scm_instance(config=config)
48 49 mocks['_get_instance_cached'].assert_called()
49 50
50 51
51 52 def test__get_instance_config(backend):
52 53 repo = backend.create_repo()
53 54 vcs_class = Mock()
54 55 with patch.multiple('rhodecode.lib.vcs.backends',
55 56 get_scm=DEFAULT,
56 57 get_backend=DEFAULT) as mocks:
57 58 mocks['get_scm'].return_value = backend.alias
58 59 mocks['get_backend'].return_value = vcs_class
59 60 with patch('rhodecode.model.db.Repository._config') as config_mock:
60 61 repo._get_instance()
61 62 vcs_class.assert_called_with(
62 63 repo_path=repo.repo_full_path, config=config_mock,
63 64 create=False, with_wire={'cache': True})
64 65
65 66 new_config = {'override': 'old_config'}
66 67 repo._get_instance(config=new_config)
67 68 vcs_class.assert_called_with(
68 69 repo_path=repo.repo_full_path, config=new_config, create=False,
69 70 with_wire={'cache': True})
70 71
71 72
72 73 def test_mark_for_invalidation_config(backend):
73 74 repo = backend.create_repo()
74 75 with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
75 76 scm.ScmModel().mark_for_invalidation(repo.repo_name)
76 77 _, kwargs = _mock.call_args
77 78 assert kwargs['config'].__dict__ == repo._config.__dict__
78 79
79 80
80 81 def test_mark_for_invalidation_with_delete_updates_last_commit(backend):
81 82 commits = [{'message': 'A'}, {'message': 'B'}]
82 83 repo = backend.create_repo(commits=commits)
83 84 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
84 85 assert repo.changeset_cache['revision'] == 1
85 86
86 87
87 88 def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend):
88 89 repo = backend.create_repo()
89 90 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
90 91 assert repo.changeset_cache['revision'] == -1
91 92
92 93
93 94 def test_strip_with_multiple_heads(backend_hg):
94 95 commits = [
95 96 {'message': 'A'},
96 97 {'message': 'a'},
97 98 {'message': 'b'},
98 99 {'message': 'B', 'parents': ['A']},
99 100 {'message': 'a1'},
100 101 ]
101 102 repo = backend_hg.create_repo(commits=commits)
102 103 commit_ids = backend_hg.commit_ids
103 104
104 105 model = scm.ScmModel()
105 106 model.strip(repo, commit_ids['b'], branch=None)
106 107
107 108 vcs_repo = repo.scm_instance()
108 109 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
109 110 assert len(rest_commit_ids) == 4
110 111 assert commit_ids['b'] not in rest_commit_ids
111 112
112 113
113 114 def test_strip_with_single_heads(backend_hg):
114 115 commits = [
115 116 {'message': 'A'},
116 117 {'message': 'a'},
117 118 {'message': 'b'},
118 119 ]
119 120 repo = backend_hg.create_repo(commits=commits)
120 121 commit_ids = backend_hg.commit_ids
121 122
122 123 model = scm.ScmModel()
123 124 model.strip(repo, commit_ids['b'], branch=None)
124 125
125 126 vcs_repo = repo.scm_instance()
126 127 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
127 128 assert len(rest_commit_ids) == 2
128 129 assert commit_ids['b'] not in rest_commit_ids
129 130
130 131
131 132 def test_get_nodes_returns_unicode_flat(backend_random):
132 133 repo = backend_random.repo
133 134 directories, files = scm.ScmModel().get_nodes(
134 135 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
135 136 flat=True)
136 137 assert_contains_only_unicode(directories)
137 138 assert_contains_only_unicode(files)
138 139
139 140
140 141 def test_get_nodes_returns_unicode_non_flat(backend_random):
141 142 repo = backend_random.repo
142 143 directories, files = scm.ScmModel().get_nodes(
143 144 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
144 145 flat=False)
145 146 # johbo: Checking only the names for now, since that is the critical
146 147 # part.
147 148 assert_contains_only_unicode([d['name'] for d in directories])
148 149 assert_contains_only_unicode([f['name'] for f in files])
149 150
150 151
151 152 def test_get_nodes_max_file_bytes(backend_random):
152 153 repo = backend_random.repo
153 154 max_file_bytes = 10
154 155 directories, files = scm.ScmModel().get_nodes(
155 156 repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True,
156 157 extended_info=True, flat=False)
157 158 assert any(file['content'] and len(file['content']) > max_file_bytes
158 159 for file in files)
159 160
160 161 directories, files = scm.ScmModel().get_nodes(
161 162 repo.repo_name, repo.get_commit(commit_idx=0).raw_id, content=True,
162 163 extended_info=True, flat=False, max_file_bytes=max_file_bytes)
163 164 assert all(
164 165 file['content'] is None if file['size'] > max_file_bytes else True
165 166 for file in files)
166 167
167 168
168 169 def assert_contains_only_unicode(structure):
169 170 assert structure
170 171 for value in structure:
171 172 assert isinstance(value, unicode)
172 173
173 174
174 175 @pytest.mark.backends("hg", "git")
175 176 def test_get_non_unicode_reference(backend):
176 177 model = scm.ScmModel()
177 178 non_unicode_list = ["Adını".decode("cp1254")]
178 179
179 180 def scm_instance():
180 181 return Mock(
181 182 branches=non_unicode_list, bookmarks=non_unicode_list,
182 183 tags=non_unicode_list, alias=backend.alias)
183 184
184 185 repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
185 186 choices, __ = model.get_repo_landing_revs(repo=repo)
186 187 if backend.alias == 'hg':
187 188 valid_choices = [
188 189 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
189 190 u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
190 191 else:
191 192 valid_choices = [
192 193 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
193 194 u'tag:Ad\xc4\xb1n\xc4\xb1']
194 195
195 196 assert choices == valid_choices
196 197
197 198
198 199 class TestInstallSvnHooks(object):
199 200 HOOK_FILES = ('pre-commit', 'post-commit')
200 201
201 202 def test_new_hooks_are_created(self, backend_svn):
202 203 model = scm.ScmModel()
203 204 repo = backend_svn.create_repo()
204 205 vcs_repo = repo.scm_instance()
205 206 model.install_svn_hooks(vcs_repo)
206 207
207 208 hooks_path = os.path.join(vcs_repo.path, 'hooks')
208 209 assert os.path.isdir(hooks_path)
209 210 for file_name in self.HOOK_FILES:
210 211 file_path = os.path.join(hooks_path, file_name)
211 212 self._check_hook_file_mode(file_path)
212 213 self._check_hook_file_content(file_path)
213 214
214 215 def test_rc_hooks_are_replaced(self, backend_svn):
215 216 model = scm.ScmModel()
216 217 repo = backend_svn.create_repo()
217 218 vcs_repo = repo.scm_instance()
218 219 hooks_path = os.path.join(vcs_repo.path, 'hooks')
219 220 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
220 221
221 222 for file_path in file_paths:
222 223 self._create_fake_hook(
223 224 file_path, content="RC_HOOK_VER = 'abcde'\n")
224 225
225 226 model.install_svn_hooks(vcs_repo)
226 227
227 228 for file_path in file_paths:
228 229 self._check_hook_file_content(file_path)
229 230
230 231 def test_non_rc_hooks_are_not_replaced_without_force_create(
231 232 self, backend_svn):
232 233 model = scm.ScmModel()
233 234 repo = backend_svn.create_repo()
234 235 vcs_repo = repo.scm_instance()
235 236 hooks_path = os.path.join(vcs_repo.path, 'hooks')
236 237 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
237 238 non_rc_content = "exit 0\n"
238 239
239 240 for file_path in file_paths:
240 241 self._create_fake_hook(file_path, content=non_rc_content)
241 242
242 243 model.install_svn_hooks(vcs_repo)
243 244
244 245 for file_path in file_paths:
245 246 with open(file_path, 'rt') as hook_file:
246 247 content = hook_file.read()
247 248 assert content == non_rc_content
248 249
249 250 def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
250 251 model = scm.ScmModel()
251 252 repo = backend_svn.create_repo()
252 253 vcs_repo = repo.scm_instance()
253 254 hooks_path = os.path.join(vcs_repo.path, 'hooks')
254 255 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
255 256 non_rc_content = "exit 0\n"
256 257
257 258 for file_path in file_paths:
258 259 self._create_fake_hook(file_path, content=non_rc_content)
259 260
260 261 model.install_svn_hooks(vcs_repo, force_create=True)
261 262
262 263 for file_path in file_paths:
263 264 self._check_hook_file_content(file_path)
264 265
265 266 def _check_hook_file_mode(self, file_path):
266 267 assert os.path.exists(file_path)
267 268 stat_info = os.stat(file_path)
268 269
269 270 file_mode = stat.S_IMODE(stat_info.st_mode)
270 271 expected_mode = int('755', 8)
271 272 assert expected_mode == file_mode
272 273
273 274 def _check_hook_file_content(self, file_path):
274 275 with open(file_path, 'rt') as hook_file:
275 276 content = hook_file.read()
276 277
277 278 expected_env = '#!{}'.format(sys.executable)
278 279 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
279 280 rhodecode.__version__)
280 281 assert content.strip().startswith(expected_env)
281 282 assert expected_rc_version in content
282 283
283 284 def _create_fake_hook(self, file_path, content):
284 285 with open(file_path, 'w') as hook_file:
285 286 hook_file.write(content)
286 287
287 288
288 289 class TestCheckRhodecodeHook(object):
289 290
290 291 @patch('os.path.exists', Mock(return_value=False))
291 292 def test_returns_true_when_no_hook_found(self):
292 293 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
293 294 assert result
294 295
295 296 @pytest.mark.parametrize("file_content, expected_result", [
296 297 ("RC_HOOK_VER = '3.3.3'\n", True),
297 298 ("RC_HOOK = '3.3.3'\n", False),
298 ])
299 ], ids=no_newline_id_generator)
299 300 @patch('os.path.exists', Mock(return_value=True))
300 301 def test_signatures(self, file_content, expected_result):
301 302 hook_content_patcher = patch.object(
302 303 scm, '_read_hook', return_value=file_content)
303 304 with hook_content_patcher:
304 305 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
305 306
306 307 assert result is expected_result
307 308
308 309
309 310 class TestInstallHooks(object):
310 311 def test_hooks_are_installed_for_git_repo(self, backend_git):
311 312 repo = backend_git.create_repo()
312 313 model = scm.ScmModel()
313 314 scm_repo = repo.scm_instance()
314 315 with patch.object(model, 'install_git_hook') as hooks_mock:
315 316 model.install_hooks(scm_repo, repo_type='git')
316 317 hooks_mock.assert_called_once_with(scm_repo)
317 318
318 319 def test_hooks_are_installed_for_svn_repo(self, backend_svn):
319 320 repo = backend_svn.create_repo()
320 321 scm_repo = repo.scm_instance()
321 322 model = scm.ScmModel()
322 323 with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
323 324 model.install_hooks(scm_repo, repo_type='svn')
324 325 hooks_mock.assert_called_once_with(scm_repo)
325 326
326 327 @pytest.mark.parametrize('hook_method', [
327 328 'install_svn_hooks',
328 329 'install_git_hook'])
329 330 def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
330 331 repo = backend_hg.create_repo()
331 332 scm_repo = repo.scm_instance()
332 333 model = scm.ScmModel()
333 334 with patch.object(scm.ScmModel, hook_method) as hooks_mock:
334 335 model.install_hooks(scm_repo, repo_type='hg')
335 336 assert hooks_mock.call_count == 0
@@ -1,527 +1,529 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Package for testing various lib/helper functions in rhodecode
24 24 """
25 25
26 26 import datetime
27 27 import string
28 28 import mock
29 29 import pytest
30
31 from rhodecode.tests import no_newline_id_generator
30 32 from rhodecode.tests.utils import run_test_concurrently
31 33 from rhodecode.lib.helpers import InitialsGravatar
32 34
33 35 from rhodecode.lib.utils2 import AttributeDict
34 36 from rhodecode.model.db import Repository
35 37
36 38
37 39 def _urls_for_proto(proto):
38 40 return [
39 41 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
40 42 '%s://127.0.0.1' % proto),
41 43 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
42 44 '%s://127.0.0.1' % proto),
43 45 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
44 46 '%s://127.0.0.1' % proto),
45 47 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
46 48 '%s://127.0.0.1:8080' % proto),
47 49 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
48 50 '%s://domain.org' % proto),
49 51 ('%s://user:pass@domain.org:8080' % proto,
50 52 ['%s://' % proto, 'domain.org', '8080'],
51 53 '%s://domain.org:8080' % proto),
52 54 ]
53 55
54 56 TEST_URLS = _urls_for_proto('http') + _urls_for_proto('https')
55 57
56 58
57 59 @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS)
58 60 def test_uri_filter(test_url, expected, expected_creds):
59 61 from rhodecode.lib.utils2 import uri_filter
60 62 assert uri_filter(test_url) == expected
61 63
62 64
63 65 @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS)
64 66 def test_credentials_filter(test_url, expected, expected_creds):
65 67 from rhodecode.lib.utils2 import credentials_filter
66 68 assert credentials_filter(test_url) == expected_creds
67 69
68 70
69 71 @pytest.mark.parametrize("str_bool, expected", [
70 72 ('t', True),
71 73 ('true', True),
72 74 ('y', True),
73 75 ('yes', True),
74 76 ('on', True),
75 77 ('1', True),
76 78 ('Y', True),
77 79 ('yeS', True),
78 80 ('Y', True),
79 81 ('TRUE', True),
80 82 ('T', True),
81 83 ('False', False),
82 84 ('F', False),
83 85 ('FALSE', False),
84 86 ('0', False),
85 87 ('-1', False),
86 88 ('', False)
87 89 ])
88 90 def test_str2bool(str_bool, expected):
89 91 from rhodecode.lib.utils2 import str2bool
90 92 assert str2bool(str_bool) == expected
91 93
92 94
93 95 @pytest.mark.parametrize("text, expected", reduce(lambda a1,a2:a1+a2, [
94 96 [
95 97 (pref+"", []),
96 98 (pref+"Hi there @marcink", ['marcink']),
97 99 (pref+"Hi there @marcink and @bob", ['bob', 'marcink']),
98 100 (pref+"Hi there @marcink\n", ['marcink']),
99 101 (pref+"Hi there @marcink and @bob\n", ['bob', 'marcink']),
100 102 (pref+"Hi there marcin@rhodecode.com", []),
101 103 (pref+"Hi there @john.malcovic and @bob\n", ['bob', 'john.malcovic']),
102 104 (pref+"This needs to be reviewed: (@marcink,@john)", ["john", "marcink"]),
103 105 (pref+"This needs to be reviewed: (@marcink, @john)", ["john", "marcink"]),
104 106 (pref+"This needs to be reviewed: [@marcink,@john]", ["john", "marcink"]),
105 107 (pref+"This needs to be reviewed: (@marcink @john)", ["john", "marcink"]),
106 108 (pref+"@john @mary, please review", ["john", "mary"]),
107 109 (pref+"@john,@mary, please review", ["john", "mary"]),
108 110 (pref+"Hej @123, @22john,@mary, please review", ['123', '22john', 'mary']),
109 111 (pref+"@first hi there @marcink here's my email marcin@email.com "
110 112 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three ", ['first', 'lukaszb', 'marcink', 'one', 'one_more22']),
111 113 (pref+"@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl", ['2one_more22', 'john', 'MARCIN', 'maRCiN']),
112 114 (pref+"@marian.user just do it @marco-polo and next extract @marco_polo", ['marco-polo', 'marco_polo', 'marian.user']),
113 115 (pref+"user.dot hej ! not-needed maril@domain.org", []),
114 116 (pref+"\n@marcin", ['marcin']),
115 117 ]
116 for pref in ['', '\n', 'hi !', '\t', '\n\n']]))
118 for pref in ['', '\n', 'hi !', '\t', '\n\n']]), ids=no_newline_id_generator)
117 119 def test_mention_extractor(text, expected):
118 120 from rhodecode.lib.utils2 import extract_mentioned_users
119 121 got = extract_mentioned_users(text)
120 122 assert sorted(got, key=lambda x: x.lower()) == got
121 123 assert set(expected) == set(got)
122 124
123 125 @pytest.mark.parametrize("age_args, expected, kw", [
124 126 ({}, u'just now', {}),
125 127 ({'seconds': -1}, u'1 second ago', {}),
126 128 ({'seconds': -60 * 2}, u'2 minutes ago', {}),
127 129 ({'hours': -1}, u'1 hour ago', {}),
128 130 ({'hours': -24}, u'1 day ago', {}),
129 131 ({'hours': -24 * 5}, u'5 days ago', {}),
130 132 ({'months': -1}, u'1 month ago', {}),
131 133 ({'months': -1, 'days': -2}, u'1 month and 2 days ago', {}),
132 134 ({'years': -1, 'months': -1}, u'1 year and 1 month ago', {}),
133 135 ({}, u'just now', {'short_format': True}),
134 136 ({'seconds': -1}, u'1sec ago', {'short_format': True}),
135 137 ({'seconds': -60 * 2}, u'2min ago', {'short_format': True}),
136 138 ({'hours': -1}, u'1h ago', {'short_format': True}),
137 139 ({'hours': -24}, u'1d ago', {'short_format': True}),
138 140 ({'hours': -24 * 5}, u'5d ago', {'short_format': True}),
139 141 ({'months': -1}, u'1m ago', {'short_format': True}),
140 142 ({'months': -1, 'days': -2}, u'1m, 2d ago', {'short_format': True}),
141 143 ({'years': -1, 'months': -1}, u'1y, 1m ago', {'short_format': True}),
142 144 ])
143 145 def test_age(age_args, expected, kw, pylonsapp):
144 146 from rhodecode.lib.utils2 import age
145 147 from dateutil import relativedelta
146 148 n = datetime.datetime(year=2012, month=5, day=17)
147 149 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
148 150
149 151 def translate(elem):
150 152 return elem.interpolate()
151 153
152 154 assert translate(age(n + delt(**age_args), now=n, **kw)) == expected
153 155
154 156
155 157 @pytest.mark.parametrize("age_args, expected, kw", [
156 158 ({}, u'just now', {}),
157 159 ({'seconds': 1}, u'in 1 second', {}),
158 160 ({'seconds': 60 * 2}, u'in 2 minutes', {}),
159 161 ({'hours': 1}, u'in 1 hour', {}),
160 162 ({'hours': 24}, u'in 1 day', {}),
161 163 ({'hours': 24 * 5}, u'in 5 days', {}),
162 164 ({'months': 1}, u'in 1 month', {}),
163 165 ({'months': 1, 'days': 1}, u'in 1 month and 1 day', {}),
164 166 ({'years': 1, 'months': 1}, u'in 1 year and 1 month', {}),
165 167 ({}, u'just now', {'short_format': True}),
166 168 ({'seconds': 1}, u'in 1sec', {'short_format': True}),
167 169 ({'seconds': 60 * 2}, u'in 2min', {'short_format': True}),
168 170 ({'hours': 1}, u'in 1h', {'short_format': True}),
169 171 ({'hours': 24}, u'in 1d', {'short_format': True}),
170 172 ({'hours': 24 * 5}, u'in 5d', {'short_format': True}),
171 173 ({'months': 1}, u'in 1m', {'short_format': True}),
172 174 ({'months': 1, 'days': 1}, u'in 1m, 1d', {'short_format': True}),
173 175 ({'years': 1, 'months': 1}, u'in 1y, 1m', {'short_format': True}),
174 176 ])
175 177 def test_age_in_future(age_args, expected, kw, pylonsapp):
176 178 from rhodecode.lib.utils2 import age
177 179 from dateutil import relativedelta
178 180 n = datetime.datetime(year=2012, month=5, day=17)
179 181 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
180 182
181 183 def translate(elem):
182 184 return elem.interpolate()
183 185
184 186 assert translate(age(n + delt(**age_args), now=n, **kw)) == expected
185 187
186 188
187 189 def test_tag_exctrator():
188 190 sample = (
189 191 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
190 192 "[requires] [stale] [see<>=>] [see => http://url.com]"
191 193 "[requires => url] [lang => python] [just a tag] <html_tag first='abc' attr=\"my.url?attr=&another=\"></html_tag>"
192 194 "[,d] [ => ULR ] [obsolete] [desc]]"
193 195 )
194 196 from rhodecode.lib.helpers import desc_stylize, escaped_stylize
195 197 res = desc_stylize(sample)
196 198 assert '<div class="metatag" tag="tag">tag</div>' in res
197 199 assert '<div class="metatag" tag="obsolete">obsolete</div>' in res
198 200 assert '<div class="metatag" tag="stale">stale</div>' in res
199 201 assert '<div class="metatag" tag="lang">python</div>' in res
200 202 assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
201 203 assert '<div class="metatag" tag="tag">tag</div>' in res
202 204 assert '<html_tag first=\'abc\' attr=\"my.url?attr=&another=\"></html_tag>' in res
203 205
204 206 res_encoded = escaped_stylize(sample)
205 207 assert '<div class="metatag" tag="tag">tag</div>' in res_encoded
206 208 assert '<div class="metatag" tag="obsolete">obsolete</div>' in res_encoded
207 209 assert '<div class="metatag" tag="stale">stale</div>' in res_encoded
208 210 assert '<div class="metatag" tag="lang">python</div>' in res_encoded
209 211 assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res_encoded
210 212 assert '<div class="metatag" tag="tag">tag</div>' in res_encoded
211 213 assert '&lt;html_tag first=&#39;abc&#39; attr=&#34;my.url?attr=&amp;another=&#34;&gt;&lt;/html_tag&gt;' in res_encoded
212 214
213 215
214 216 @pytest.mark.parametrize("tmpl_url, email, expected", [
215 217 ('http://test.com/{email}', 'test@foo.com', 'http://test.com/test@foo.com'),
216 218
217 219 ('http://test.com/{md5email}', 'test@foo.com', 'http://test.com/3cb7232fcc48743000cb86d0d5022bd9'),
218 220 ('http://test.com/{md5email}', 'testąć@foo.com', 'http://test.com/978debb907a3c55cd741872ab293ef30'),
219 221
220 222 ('http://testX.com/{md5email}?s={size}', 'test@foo.com', 'http://testX.com/3cb7232fcc48743000cb86d0d5022bd9?s=24'),
221 223 ('http://testX.com/{md5email}?s={size}', 'testąć@foo.com', 'http://testX.com/978debb907a3c55cd741872ab293ef30?s=24'),
222 224
223 225 ('{scheme}://{netloc}/{md5email}/{size}', 'test@foo.com', 'https://server.com/3cb7232fcc48743000cb86d0d5022bd9/24'),
224 226 ('{scheme}://{netloc}/{md5email}/{size}', 'testąć@foo.com', 'https://server.com/978debb907a3c55cd741872ab293ef30/24'),
225 227
226 228 ('http://test.com/{email}', 'testąć@foo.com', 'http://test.com/testąć@foo.com'),
227 229 ('http://test.com/{email}?size={size}', 'test@foo.com', 'http://test.com/test@foo.com?size=24'),
228 230 ('http://test.com/{email}?size={size}', 'testąć@foo.com', 'http://test.com/testąć@foo.com?size=24'),
229 231 ])
230 232 def test_gravatar_url_builder(tmpl_url, email, expected, request_stub):
231 233 from rhodecode.lib.helpers import gravatar_url
232 234
233 235 # mock pyramid.threadlocals
234 236 def fake_get_current_request():
235 237 request_stub.scheme = 'https'
236 238 request_stub.host = 'server.com'
237 239 return request_stub
238 240
239 241 # mock pylons.tmpl_context
240 242 def fake_tmpl_context(_url):
241 243 _c = AttributeDict()
242 244 _c.visual = AttributeDict()
243 245 _c.visual.use_gravatar = True
244 246 _c.visual.gravatar_url = _url
245 247
246 248 return _c
247 249
248 250 with mock.patch('rhodecode.lib.helpers.get_current_request',
249 251 fake_get_current_request):
250 252 fake = fake_tmpl_context(_url=tmpl_url)
251 253 with mock.patch('pylons.tmpl_context', fake):
252 254 grav = gravatar_url(email_address=email, size=24)
253 255 assert grav == expected
254 256
255 257
256 258 @pytest.mark.parametrize(
257 259 "email, first_name, last_name, expected_initials, expected_color", [
258 260
259 261 ('test@rhodecode.com', '', '', 'TR', '#8a994d'),
260 262 ('marcin.kuzminski@rhodecode.com', '', '', 'MK', '#6559b3'),
261 263 # special cases of email
262 264 ('john.van.dam@rhodecode.com', '', '', 'JD', '#526600'),
263 265 ('Guido.van.Rossum@rhodecode.com', '', '', 'GR', '#990052'),
264 266 ('Guido.van.Rossum@rhodecode.com', 'Guido', 'Van Rossum', 'GR', '#990052'),
265 267
266 268 ('rhodecode+Guido.van.Rossum@rhodecode.com', '', '', 'RR', '#46598c'),
267 269 ('pclouds@rhodecode.com', 'Nguyễn Thái', 'Tgọc Duy', 'ND', '#665200'),
268 270
269 271 ('john-brown@foo.com', '', '', 'JF', '#73006b'),
270 272 ('admin@rhodecode.com', 'Marcin', 'Kuzminski', 'MK', '#104036'),
271 273 # partials
272 274 ('admin@rhodecode.com', 'Marcin', '', 'MR', '#104036'), # fn+email
273 275 ('admin@rhodecode.com', '', 'Kuzminski', 'AK', '#104036'), # em+ln
274 276 # non-ascii
275 277 ('admin@rhodecode.com', 'Marcin', 'Śuzminski', 'MS', '#104036'),
276 278 ('marcin.śuzminski@rhodecode.com', '', '', 'MS', '#73000f'),
277 279
278 280 # special cases, LDAP can provide those...
279 281 ('admin@', 'Marcin', 'Śuzminski', 'MS', '#aa00ff'),
280 282 ('marcin.śuzminski', '', '', 'MS', '#402020'),
281 283 ('null', '', '', 'NL', '#8c4646'),
282 284 ])
283 285 def test_initials_gravatar_pick_of_initials_and_color_algo(
284 286 email, first_name, last_name, expected_initials, expected_color):
285 287 instance = InitialsGravatar(email, first_name, last_name)
286 288 assert instance.get_initials() == expected_initials
287 289 assert instance.str2color(email) == expected_color
288 290
289 291
290 292 def test_initials_gravatar_mapping_algo():
291 293 pos = set()
292 294 instance = InitialsGravatar('', '', '')
293 295 iterations = 0
294 296
295 297 variations = []
296 298 for letter1 in string.ascii_letters:
297 299 for letter2 in string.ascii_letters[::-1][:10]:
298 300 for letter3 in string.ascii_letters[:10]:
299 301 variations.append(
300 302 '%s@rhodecode.com' % (letter1+letter2+letter3))
301 303
302 304 max_variations = 4096
303 305 for email in variations[:max_variations]:
304 306 iterations += 1
305 307 pos.add(
306 308 instance.pick_color_bank_index(email,
307 309 instance.get_color_bank()))
308 310
309 311 # we assume that we have match all 256 possible positions,
310 312 # in reasonable amount of different email addresses
311 313 assert len(pos) == 256
312 314 assert iterations == max_variations
313 315
314 316
315 317 @pytest.mark.parametrize("tmpl, repo_name, overrides, prefix, expected", [
316 318 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'),
317 319 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/group/repo1'),
318 320 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/rc', 'http://vps1:8000/rc/group/repo1'),
319 321 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc', 'http://user@vps1:8000/rc/group/repo1'),
320 322 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc', 'http://marcink@vps1:8000/rc/group/repo1'),
321 323 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc/', 'http://user@vps1:8000/rc/group/repo1'),
322 324 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc/', 'http://marcink@vps1:8000/rc/group/repo1'),
323 325 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
324 326 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'),
325 327 ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'),
326 328 ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://vps1:8000/_23'),
327 329 ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://marcink@proxy1.server.com/group/repo1'),
328 330 ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.server.com/group/repo1'),
329 331 ('https://proxy1.server.com/{user}/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://proxy1.server.com/marcink/group/repo1'),
330 332 ])
331 333 def test_clone_url_generator(tmpl, repo_name, overrides, prefix, expected):
332 334 from rhodecode.lib.utils2 import get_clone_url
333 335 clone_url = get_clone_url(uri_tmpl=tmpl, qualifed_home_url='http://vps1:8000'+prefix,
334 336 repo_name=repo_name, repo_id=23, **overrides)
335 337 assert clone_url == expected
336 338
337 339
338 340 def _quick_url(text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
339 341 """
340 342 Changes `some text url[foo]` => `some text <a href="/">foo</a>
341 343
342 344 :param text:
343 345 """
344 346 import re
345 347 # quickly change expected url[] into a link
346 348 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
347 349
348 350 def url_func(match_obj):
349 351 _url = match_obj.groups()[0]
350 352 return tmpl % (url_ or '/some-url', _url)
351 353 return URL_PAT.sub(url_func, text)
352 354
353 355
354 356 @pytest.mark.parametrize("sample, expected", [
355 357 ("",
356 358 ""),
357 359 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
358 360 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
359 361 ("from rev 000000000000",
360 362 "from rev url[000000000000]"),
361 363 ("from rev 000000000000123123 also rev 000000000000",
362 364 "from rev url[000000000000123123] also rev url[000000000000]"),
363 365 ("this should-000 00",
364 366 "this should-000 00"),
365 367 ("longtextffffffffff rev 123123123123",
366 368 "longtextffffffffff rev url[123123123123]"),
367 369 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
368 370 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
369 371 ("ffffffffffff some text traalaa",
370 372 "url[ffffffffffff] some text traalaa"),
371 373 ("""Multi line
372 374 123123123123
373 375 some text 123123123123
374 376 sometimes !
375 377 """,
376 378 """Multi line
377 379 url[123123123123]
378 380 some text url[123123123123]
379 381 sometimes !
380 382 """)
381 ])
383 ], ids=no_newline_id_generator)
382 384 def test_urlify_commits(sample, expected):
383 385 def fake_url(self, *args, **kwargs):
384 386 return '/some-url'
385 387
386 388 expected = _quick_url(expected)
387 389
388 390 with mock.patch('pylons.url', fake_url):
389 391 from rhodecode.lib.helpers import urlify_commits
390 392 assert urlify_commits(sample, 'repo_name') == expected
391 393
392 394
393 395 @pytest.mark.parametrize("sample, expected, url_", [
394 396 ("",
395 397 "",
396 398 ""),
397 399 ("https://svn.apache.org/repos",
398 400 "url[https://svn.apache.org/repos]",
399 401 "https://svn.apache.org/repos"),
400 402 ("http://svn.apache.org/repos",
401 403 "url[http://svn.apache.org/repos]",
402 404 "http://svn.apache.org/repos"),
403 405 ("from rev a also rev http://google.com",
404 406 "from rev a also rev url[http://google.com]",
405 407 "http://google.com"),
406 408 ("""Multi line
407 409 https://foo.bar.com
408 410 some text lalala""",
409 411 """Multi line
410 412 url[https://foo.bar.com]
411 413 some text lalala""",
412 414 "https://foo.bar.com")
413 ])
415 ], ids=no_newline_id_generator)
414 416 def test_urlify_test(sample, expected, url_):
415 417 from rhodecode.lib.helpers import urlify_text
416 418 expected = _quick_url(expected, tmpl="""<a href="%s">%s</a>""", url_=url_)
417 419 assert urlify_text(sample) == expected
418 420
419 421
420 422 @pytest.mark.parametrize("test, expected", [
421 423 ("", None),
422 424 ("/_2", '2'),
423 425 ("_2", '2'),
424 426 ("/_2/", '2'),
425 427 ("_2/", '2'),
426 428
427 429 ("/_21", '21'),
428 430 ("_21", '21'),
429 431 ("/_21/", '21'),
430 432 ("_21/", '21'),
431 433
432 434 ("/_21/foobar", '21'),
433 435 ("_21/121", '21'),
434 436 ("/_21/_12", '21'),
435 437 ("_21/rc/foo", '21'),
436 438
437 439 ])
438 440 def test_get_repo_by_id(test, expected):
439 441 from rhodecode.model.repo import RepoModel
440 442 _test = RepoModel()._extract_id_from_repo_name(test)
441 443 assert _test == expected
442 444
443 445
444 446 @pytest.mark.parametrize("test_repo_name, repo_type", [
445 447 ("test_repo_1", None),
446 448 ("repo_group/foobar", None),
447 449 ("test_non_asci_ąćę", None),
448 450 (u"test_non_asci_unicode_ąćę", None),
449 451 ])
450 452 def test_invalidation_context(pylonsapp, test_repo_name, repo_type):
451 453 from beaker.cache import cache_region
452 454 from rhodecode.lib import caches
453 455 from rhodecode.model.db import CacheKey
454 456
455 457 @cache_region('long_term')
456 458 def _dummy_func(cache_key):
457 459 return 'result'
458 460
459 461 invalidator_context = CacheKey.repo_context_cache(
460 462 _dummy_func, test_repo_name, 'repo')
461 463
462 464 with invalidator_context as context:
463 465 invalidated = context.invalidate()
464 466 result = context.compute()
465 467
466 468 assert invalidated == True
467 469 assert 'result' == result
468 470 assert isinstance(context, caches.FreshRegionCache)
469 471
470 472 assert 'InvalidationContext' in repr(invalidator_context)
471 473
472 474 with invalidator_context as context:
473 475 context.invalidate()
474 476 result = context.compute()
475 477
476 478 assert 'result' == result
477 479 assert isinstance(context, caches.ActiveRegionCache)
478 480
479 481
480 482 def test_invalidation_context_exception_in_compute(pylonsapp):
481 483 from rhodecode.model.db import CacheKey
482 484 from beaker.cache import cache_region
483 485
484 486 @cache_region('long_term')
485 487 def _dummy_func(cache_key):
486 488 # this causes error since it doesn't get any params
487 489 raise Exception('ups')
488 490
489 491 invalidator_context = CacheKey.repo_context_cache(
490 492 _dummy_func, 'test_repo_2', 'repo')
491 493
492 494 with pytest.raises(Exception):
493 495 with invalidator_context as context:
494 496 context.invalidate()
495 497 context.compute()
496 498
497 499
498 500 @pytest.mark.parametrize('execution_number', range(5))
499 501 def test_cache_invalidation_race_condition(execution_number, pylonsapp):
500 502 import time
501 503 from beaker.cache import cache_region
502 504 from rhodecode.model.db import CacheKey
503 505
504 506 if CacheKey.metadata.bind.url.get_backend_name() == "mysql":
505 507 reason = (
506 508 'Fails on MariaDB due to some locking issues. Investigation'
507 509 ' needed')
508 510 pytest.xfail(reason=reason)
509 511
510 512 @run_test_concurrently(25)
511 513 def test_create_and_delete_cache_keys():
512 514 time.sleep(0.2)
513 515
514 516 @cache_region('long_term')
515 517 def _dummy_func(cache_key):
516 518 return 'result'
517 519
518 520 invalidator_context = CacheKey.repo_context_cache(
519 521 _dummy_func, 'test_repo_1', 'repo')
520 522
521 523 with invalidator_context as context:
522 524 context.invalidate()
523 525 context.compute()
524 526
525 527 CacheKey.set_invalidate('test_repo_1', delete=True)
526 528
527 529 test_create_and_delete_cache_keys()
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now