##// END OF EJS Templates
tests: Fix a helper method that checks if a repository is available on filesystem....
Martin Bornhold -
r486:e773cd0c default
parent child Browse files
Show More
@@ -1,1260 +1,1250 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
22 import urllib
21 import urllib
23
22
24 import mock
23 import mock
25 import pytest
24 import pytest
26
25
27 from rhodecode.lib import auth
26 from rhodecode.lib import auth
28 from rhodecode.lib import vcs
29 from rhodecode.lib.utils2 import safe_str, str2bool
27 from rhodecode.lib.utils2 import safe_str, str2bool
30 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
31 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
32 Permission
30 Permission
33 from rhodecode.model.meta import Session
31 from rhodecode.model.meta import Session
34 from rhodecode.model.repo import RepoModel
32 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.repo_group import RepoGroupModel
33 from rhodecode.model.repo_group import RepoGroupModel
36 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
37 from rhodecode.model.user import UserModel
35 from rhodecode.model.user import UserModel
38 from rhodecode.tests import (
36 from rhodecode.tests import (
39 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
40 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
41 TESTS_TMP_PATH, logout_user_session)
39 logout_user_session)
42 from rhodecode.tests.fixture import Fixture, error_function
40 from rhodecode.tests.fixture import Fixture, error_function
43 from rhodecode.tests.utils import AssertResponse
41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
44
42
45 fixture = Fixture()
43 fixture = Fixture()
46
44
47
45
48 @pytest.mark.usefixtures("app")
46 @pytest.mark.usefixtures("app")
49 class TestAdminRepos:
47 class TestAdminRepos:
50
48
51 def test_index(self):
49 def test_index(self):
52 self.app.get(url('repos'))
50 self.app.get(url('repos'))
53
51
54 def test_create_page_restricted(self, autologin_user, backend):
52 def test_create_page_restricted(self, autologin_user, backend):
55 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
56 response = self.app.get(url('new_repo'), status=200)
54 response = self.app.get(url('new_repo'), status=200)
57 assert_response = AssertResponse(response)
55 assert_response = AssertResponse(response)
58 element = assert_response.get_element('#repo_type')
56 element = assert_response.get_element('#repo_type')
59 assert element.text_content() == '\ngit\n'
57 assert element.text_content() == '\ngit\n'
60
58
61 def test_create_page_non_restricted(self, autologin_user, backend):
59 def test_create_page_non_restricted(self, autologin_user, backend):
62 response = self.app.get(url('new_repo'), status=200)
60 response = self.app.get(url('new_repo'), status=200)
63 assert_response = AssertResponse(response)
61 assert_response = AssertResponse(response)
64 assert_response.element_contains('#repo_type', 'git')
62 assert_response.element_contains('#repo_type', 'git')
65 assert_response.element_contains('#repo_type', 'svn')
63 assert_response.element_contains('#repo_type', 'svn')
66 assert_response.element_contains('#repo_type', 'hg')
64 assert_response.element_contains('#repo_type', 'hg')
67
65
68 @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii'])
66 @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii'])
69 def test_create(self, autologin_user, backend, suffix, csrf_token):
67 def test_create(self, autologin_user, backend, suffix, csrf_token):
70 repo_name_unicode = backend.new_repo_name(suffix=suffix)
68 repo_name_unicode = backend.new_repo_name(suffix=suffix)
71 repo_name = repo_name_unicode.encode('utf8')
69 repo_name = repo_name_unicode.encode('utf8')
72 description_unicode = u'description for newly created repo' + suffix
70 description_unicode = u'description for newly created repo' + suffix
73 description = description_unicode.encode('utf8')
71 description = description_unicode.encode('utf8')
74 self.app.post(
72 self.app.post(
75 url('repos'),
73 url('repos'),
76 fixture._get_repo_create_params(
74 fixture._get_repo_create_params(
77 repo_private=False,
75 repo_private=False,
78 repo_name=repo_name,
76 repo_name=repo_name,
79 repo_type=backend.alias,
77 repo_type=backend.alias,
80 repo_description=description,
78 repo_description=description,
81 csrf_token=csrf_token),
79 csrf_token=csrf_token),
82 status=302
80 status=302
83 )
81 )
84
82
85 self.assert_repository_is_created_correctly(
83 self.assert_repository_is_created_correctly(
86 repo_name, description, backend)
84 repo_name, description, backend)
87
85
88 def test_create_numeric(self, autologin_user, backend, csrf_token):
86 def test_create_numeric(self, autologin_user, backend, csrf_token):
89 numeric_repo = '1234'
87 numeric_repo = '1234'
90 repo_name = numeric_repo
88 repo_name = numeric_repo
91 description = 'description for newly created repo' + numeric_repo
89 description = 'description for newly created repo' + numeric_repo
92 self.app.post(
90 self.app.post(
93 url('repos'),
91 url('repos'),
94 fixture._get_repo_create_params(
92 fixture._get_repo_create_params(
95 repo_private=False,
93 repo_private=False,
96 repo_name=repo_name,
94 repo_name=repo_name,
97 repo_type=backend.alias,
95 repo_type=backend.alias,
98 repo_description=description,
96 repo_description=description,
99 csrf_token=csrf_token))
97 csrf_token=csrf_token))
100
98
101 self.assert_repository_is_created_correctly(
99 self.assert_repository_is_created_correctly(
102 repo_name, description, backend)
100 repo_name, description, backend)
103
101
104 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
105 def test_create_in_group(
103 def test_create_in_group(
106 self, autologin_user, backend, suffix, csrf_token):
104 self, autologin_user, backend, suffix, csrf_token):
107 # create GROUP
105 # create GROUP
108 group_name = 'sometest_%s' % backend.alias
106 group_name = 'sometest_%s' % backend.alias
109 gr = RepoGroupModel().create(group_name=group_name,
107 gr = RepoGroupModel().create(group_name=group_name,
110 group_description='test',
108 group_description='test',
111 owner=TEST_USER_ADMIN_LOGIN)
109 owner=TEST_USER_ADMIN_LOGIN)
112 Session().commit()
110 Session().commit()
113
111
114 repo_name = u'ingroup' + suffix
112 repo_name = u'ingroup' + suffix
115 repo_name_full = RepoGroup.url_sep().join(
113 repo_name_full = RepoGroup.url_sep().join(
116 [group_name, repo_name])
114 [group_name, repo_name])
117 description = u'description for newly created repo'
115 description = u'description for newly created repo'
118 self.app.post(
116 self.app.post(
119 url('repos'),
117 url('repos'),
120 fixture._get_repo_create_params(
118 fixture._get_repo_create_params(
121 repo_private=False,
119 repo_private=False,
122 repo_name=safe_str(repo_name),
120 repo_name=safe_str(repo_name),
123 repo_type=backend.alias,
121 repo_type=backend.alias,
124 repo_description=description,
122 repo_description=description,
125 repo_group=gr.group_id,
123 repo_group=gr.group_id,
126 csrf_token=csrf_token))
124 csrf_token=csrf_token))
127
125
128 # TODO: johbo: Cleanup work to fixture
126 # TODO: johbo: Cleanup work to fixture
129 try:
127 try:
130 self.assert_repository_is_created_correctly(
128 self.assert_repository_is_created_correctly(
131 repo_name_full, description, backend)
129 repo_name_full, description, backend)
132
130
133 new_repo = RepoModel().get_by_repo_name(repo_name_full)
131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
134 inherited_perms = UserRepoToPerm.query().filter(
132 inherited_perms = UserRepoToPerm.query().filter(
135 UserRepoToPerm.repository_id == new_repo.repo_id).all()
133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
136 assert len(inherited_perms) == 1
134 assert len(inherited_perms) == 1
137 finally:
135 finally:
138 RepoModel().delete(repo_name_full)
136 RepoModel().delete(repo_name_full)
139 RepoGroupModel().delete(group_name)
137 RepoGroupModel().delete(group_name)
140 Session().commit()
138 Session().commit()
141
139
142 def test_create_in_group_numeric(
140 def test_create_in_group_numeric(
143 self, autologin_user, backend, csrf_token):
141 self, autologin_user, backend, csrf_token):
144 # create GROUP
142 # create GROUP
145 group_name = 'sometest_%s' % backend.alias
143 group_name = 'sometest_%s' % backend.alias
146 gr = RepoGroupModel().create(group_name=group_name,
144 gr = RepoGroupModel().create(group_name=group_name,
147 group_description='test',
145 group_description='test',
148 owner=TEST_USER_ADMIN_LOGIN)
146 owner=TEST_USER_ADMIN_LOGIN)
149 Session().commit()
147 Session().commit()
150
148
151 repo_name = '12345'
149 repo_name = '12345'
152 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
153 description = 'description for newly created repo'
151 description = 'description for newly created repo'
154 self.app.post(
152 self.app.post(
155 url('repos'),
153 url('repos'),
156 fixture._get_repo_create_params(
154 fixture._get_repo_create_params(
157 repo_private=False,
155 repo_private=False,
158 repo_name=repo_name,
156 repo_name=repo_name,
159 repo_type=backend.alias,
157 repo_type=backend.alias,
160 repo_description=description,
158 repo_description=description,
161 repo_group=gr.group_id,
159 repo_group=gr.group_id,
162 csrf_token=csrf_token))
160 csrf_token=csrf_token))
163
161
164 # TODO: johbo: Cleanup work to fixture
162 # TODO: johbo: Cleanup work to fixture
165 try:
163 try:
166 self.assert_repository_is_created_correctly(
164 self.assert_repository_is_created_correctly(
167 repo_name_full, description, backend)
165 repo_name_full, description, backend)
168
166
169 new_repo = RepoModel().get_by_repo_name(repo_name_full)
167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
170 inherited_perms = UserRepoToPerm.query()\
168 inherited_perms = UserRepoToPerm.query()\
171 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
172 assert len(inherited_perms) == 1
170 assert len(inherited_perms) == 1
173 finally:
171 finally:
174 RepoModel().delete(repo_name_full)
172 RepoModel().delete(repo_name_full)
175 RepoGroupModel().delete(group_name)
173 RepoGroupModel().delete(group_name)
176 Session().commit()
174 Session().commit()
177
175
178 def test_create_in_group_without_needed_permissions(self, backend):
176 def test_create_in_group_without_needed_permissions(self, backend):
179 session = login_user_session(
177 session = login_user_session(
180 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
181 csrf_token = auth.get_csrf_token(session)
179 csrf_token = auth.get_csrf_token(session)
182 # revoke
180 # revoke
183 user_model = UserModel()
181 user_model = UserModel()
184 # disable fork and create on default user
182 # disable fork and create on default user
185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
186 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
187 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
188 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
189
187
190 # disable on regular user
188 # disable on regular user
191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
193 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
194 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
195 Session().commit()
193 Session().commit()
196
194
197 # create GROUP
195 # create GROUP
198 group_name = 'reg_sometest_%s' % backend.alias
196 group_name = 'reg_sometest_%s' % backend.alias
199 gr = RepoGroupModel().create(group_name=group_name,
197 gr = RepoGroupModel().create(group_name=group_name,
200 group_description='test',
198 group_description='test',
201 owner=TEST_USER_ADMIN_LOGIN)
199 owner=TEST_USER_ADMIN_LOGIN)
202 Session().commit()
200 Session().commit()
203
201
204 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
205 gr_allowed = RepoGroupModel().create(
203 gr_allowed = RepoGroupModel().create(
206 group_name=group_name_allowed,
204 group_name=group_name_allowed,
207 group_description='test',
205 group_description='test',
208 owner=TEST_USER_REGULAR_LOGIN)
206 owner=TEST_USER_REGULAR_LOGIN)
209 Session().commit()
207 Session().commit()
210
208
211 repo_name = 'ingroup'
209 repo_name = 'ingroup'
212 description = 'description for newly created repo'
210 description = 'description for newly created repo'
213 response = self.app.post(
211 response = self.app.post(
214 url('repos'),
212 url('repos'),
215 fixture._get_repo_create_params(
213 fixture._get_repo_create_params(
216 repo_private=False,
214 repo_private=False,
217 repo_name=repo_name,
215 repo_name=repo_name,
218 repo_type=backend.alias,
216 repo_type=backend.alias,
219 repo_description=description,
217 repo_description=description,
220 repo_group=gr.group_id,
218 repo_group=gr.group_id,
221 csrf_token=csrf_token))
219 csrf_token=csrf_token))
222
220
223 response.mustcontain('Invalid value')
221 response.mustcontain('Invalid value')
224
222
225 # user is allowed to create in this group
223 # user is allowed to create in this group
226 repo_name = 'ingroup'
224 repo_name = 'ingroup'
227 repo_name_full = RepoGroup.url_sep().join(
225 repo_name_full = RepoGroup.url_sep().join(
228 [group_name_allowed, repo_name])
226 [group_name_allowed, repo_name])
229 description = 'description for newly created repo'
227 description = 'description for newly created repo'
230 response = self.app.post(
228 response = self.app.post(
231 url('repos'),
229 url('repos'),
232 fixture._get_repo_create_params(
230 fixture._get_repo_create_params(
233 repo_private=False,
231 repo_private=False,
234 repo_name=repo_name,
232 repo_name=repo_name,
235 repo_type=backend.alias,
233 repo_type=backend.alias,
236 repo_description=description,
234 repo_description=description,
237 repo_group=gr_allowed.group_id,
235 repo_group=gr_allowed.group_id,
238 csrf_token=csrf_token))
236 csrf_token=csrf_token))
239
237
240 # TODO: johbo: Cleanup in pytest fixture
238 # TODO: johbo: Cleanup in pytest fixture
241 try:
239 try:
242 self.assert_repository_is_created_correctly(
240 self.assert_repository_is_created_correctly(
243 repo_name_full, description, backend)
241 repo_name_full, description, backend)
244
242
245 new_repo = RepoModel().get_by_repo_name(repo_name_full)
243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
246 inherited_perms = UserRepoToPerm.query().filter(
244 inherited_perms = UserRepoToPerm.query().filter(
247 UserRepoToPerm.repository_id == new_repo.repo_id).all()
245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
248 assert len(inherited_perms) == 1
246 assert len(inherited_perms) == 1
249
247
250 assert repo_on_filesystem(repo_name_full)
248 assert repo_on_filesystem(repo_name_full)
251 finally:
249 finally:
252 RepoModel().delete(repo_name_full)
250 RepoModel().delete(repo_name_full)
253 RepoGroupModel().delete(group_name)
251 RepoGroupModel().delete(group_name)
254 RepoGroupModel().delete(group_name_allowed)
252 RepoGroupModel().delete(group_name_allowed)
255 Session().commit()
253 Session().commit()
256
254
257 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
258 csrf_token):
256 csrf_token):
259 # create GROUP
257 # create GROUP
260 group_name = 'sometest_%s' % backend.alias
258 group_name = 'sometest_%s' % backend.alias
261 gr = RepoGroupModel().create(group_name=group_name,
259 gr = RepoGroupModel().create(group_name=group_name,
262 group_description='test',
260 group_description='test',
263 owner=TEST_USER_ADMIN_LOGIN)
261 owner=TEST_USER_ADMIN_LOGIN)
264 perm = Permission.get_by_key('repository.write')
262 perm = Permission.get_by_key('repository.write')
265 RepoGroupModel().grant_user_permission(
263 RepoGroupModel().grant_user_permission(
266 gr, TEST_USER_REGULAR_LOGIN, perm)
264 gr, TEST_USER_REGULAR_LOGIN, perm)
267
265
268 # add repo permissions
266 # add repo permissions
269 Session().commit()
267 Session().commit()
270
268
271 repo_name = 'ingroup_inherited_%s' % backend.alias
269 repo_name = 'ingroup_inherited_%s' % backend.alias
272 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
273 description = 'description for newly created repo'
271 description = 'description for newly created repo'
274 self.app.post(
272 self.app.post(
275 url('repos'),
273 url('repos'),
276 fixture._get_repo_create_params(
274 fixture._get_repo_create_params(
277 repo_private=False,
275 repo_private=False,
278 repo_name=repo_name,
276 repo_name=repo_name,
279 repo_type=backend.alias,
277 repo_type=backend.alias,
280 repo_description=description,
278 repo_description=description,
281 repo_group=gr.group_id,
279 repo_group=gr.group_id,
282 repo_copy_permissions=True,
280 repo_copy_permissions=True,
283 csrf_token=csrf_token))
281 csrf_token=csrf_token))
284
282
285 # TODO: johbo: Cleanup to pytest fixture
283 # TODO: johbo: Cleanup to pytest fixture
286 try:
284 try:
287 self.assert_repository_is_created_correctly(
285 self.assert_repository_is_created_correctly(
288 repo_name_full, description, backend)
286 repo_name_full, description, backend)
289 except Exception:
287 except Exception:
290 RepoGroupModel().delete(group_name)
288 RepoGroupModel().delete(group_name)
291 Session().commit()
289 Session().commit()
292 raise
290 raise
293
291
294 # check if inherited permissions are applied
292 # check if inherited permissions are applied
295 new_repo = RepoModel().get_by_repo_name(repo_name_full)
293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
296 inherited_perms = UserRepoToPerm.query().filter(
294 inherited_perms = UserRepoToPerm.query().filter(
297 UserRepoToPerm.repository_id == new_repo.repo_id).all()
295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
298 assert len(inherited_perms) == 2
296 assert len(inherited_perms) == 2
299
297
300 assert TEST_USER_REGULAR_LOGIN in [
298 assert TEST_USER_REGULAR_LOGIN in [
301 x.user.username for x in inherited_perms]
299 x.user.username for x in inherited_perms]
302 assert 'repository.write' in [
300 assert 'repository.write' in [
303 x.permission.permission_name for x in inherited_perms]
301 x.permission.permission_name for x in inherited_perms]
304
302
305 RepoModel().delete(repo_name_full)
303 RepoModel().delete(repo_name_full)
306 RepoGroupModel().delete(group_name)
304 RepoGroupModel().delete(group_name)
307 Session().commit()
305 Session().commit()
308
306
309 @pytest.mark.xfail_backends(
307 @pytest.mark.xfail_backends(
310 "git", "hg", reason="Missing reposerver support")
308 "git", "hg", reason="Missing reposerver support")
311 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
312 csrf_token):
310 csrf_token):
313 source_repo = backend.create_repo(number_of_commits=2)
311 source_repo = backend.create_repo(number_of_commits=2)
314 source_repo_name = source_repo.repo_name
312 source_repo_name = source_repo.repo_name
315 reposerver.serve(source_repo.scm_instance())
313 reposerver.serve(source_repo.scm_instance())
316
314
317 repo_name = backend.new_repo_name()
315 repo_name = backend.new_repo_name()
318 response = self.app.post(
316 response = self.app.post(
319 url('repos'),
317 url('repos'),
320 fixture._get_repo_create_params(
318 fixture._get_repo_create_params(
321 repo_private=False,
319 repo_private=False,
322 repo_name=repo_name,
320 repo_name=repo_name,
323 repo_type=backend.alias,
321 repo_type=backend.alias,
324 repo_description='',
322 repo_description='',
325 clone_uri=reposerver.url,
323 clone_uri=reposerver.url,
326 csrf_token=csrf_token),
324 csrf_token=csrf_token),
327 status=302)
325 status=302)
328
326
329 # Should be redirected to the creating page
327 # Should be redirected to the creating page
330 response.mustcontain('repo_creating')
328 response.mustcontain('repo_creating')
331
329
332 # Expecting that both repositories have same history
330 # Expecting that both repositories have same history
333 source_repo = RepoModel().get_by_repo_name(source_repo_name)
331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
334 source_vcs = source_repo.scm_instance()
332 source_vcs = source_repo.scm_instance()
335 repo = RepoModel().get_by_repo_name(repo_name)
333 repo = RepoModel().get_by_repo_name(repo_name)
336 repo_vcs = repo.scm_instance()
334 repo_vcs = repo.scm_instance()
337 assert source_vcs[0].message == repo_vcs[0].message
335 assert source_vcs[0].message == repo_vcs[0].message
338 assert source_vcs.count() == repo_vcs.count()
336 assert source_vcs.count() == repo_vcs.count()
339 assert source_vcs.commit_ids == repo_vcs.commit_ids
337 assert source_vcs.commit_ids == repo_vcs.commit_ids
340
338
341 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
342 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
343 csrf_token):
341 csrf_token):
344 repo_name = backend.new_repo_name()
342 repo_name = backend.new_repo_name()
345 description = 'description for newly created repo'
343 description = 'description for newly created repo'
346 response = self.app.post(
344 response = self.app.post(
347 url('repos'),
345 url('repos'),
348 fixture._get_repo_create_params(
346 fixture._get_repo_create_params(
349 repo_private=False,
347 repo_private=False,
350 repo_name=repo_name,
348 repo_name=repo_name,
351 repo_type=backend.alias,
349 repo_type=backend.alias,
352 repo_description=description,
350 repo_description=description,
353 clone_uri='http://repo.invalid/repo',
351 clone_uri='http://repo.invalid/repo',
354 csrf_token=csrf_token))
352 csrf_token=csrf_token))
355 response.mustcontain('invalid clone url')
353 response.mustcontain('invalid clone url')
356
354
357 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
358 def test_create_remote_repo_wrong_clone_uri_hg_svn(
356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
359 self, autologin_user, backend, csrf_token):
357 self, autologin_user, backend, csrf_token):
360 repo_name = backend.new_repo_name()
358 repo_name = backend.new_repo_name()
361 description = 'description for newly created repo'
359 description = 'description for newly created repo'
362 response = self.app.post(
360 response = self.app.post(
363 url('repos'),
361 url('repos'),
364 fixture._get_repo_create_params(
362 fixture._get_repo_create_params(
365 repo_private=False,
363 repo_private=False,
366 repo_name=repo_name,
364 repo_name=repo_name,
367 repo_type=backend.alias,
365 repo_type=backend.alias,
368 repo_description=description,
366 repo_description=description,
369 clone_uri='svn+http://svn.invalid/repo',
367 clone_uri='svn+http://svn.invalid/repo',
370 csrf_token=csrf_token))
368 csrf_token=csrf_token))
371 response.mustcontain('invalid clone url')
369 response.mustcontain('invalid clone url')
372
370
373 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
371 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
374 def test_delete(self, autologin_user, backend, suffix, csrf_token):
372 def test_delete(self, autologin_user, backend, suffix, csrf_token):
375 repo = backend.create_repo(name_suffix=suffix)
373 repo = backend.create_repo(name_suffix=suffix)
376 repo_name = repo.repo_name
374 repo_name = repo.repo_name
377
375
378 response = self.app.post(url('repo', repo_name=repo_name),
376 response = self.app.post(url('repo', repo_name=repo_name),
379 params={'_method': 'delete',
377 params={'_method': 'delete',
380 'csrf_token': csrf_token})
378 'csrf_token': csrf_token})
381 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
379 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
382 response.follow()
380 response.follow()
383
381
384 # check if repo was deleted from db
382 # check if repo was deleted from db
385 assert RepoModel().get_by_repo_name(repo_name) is None
383 assert RepoModel().get_by_repo_name(repo_name) is None
386 assert not repo_on_filesystem(repo_name)
384 assert not repo_on_filesystem(repo_name)
387
385
388 def test_show(self, autologin_user, backend):
386 def test_show(self, autologin_user, backend):
389 self.app.get(url('repo', repo_name=backend.repo_name))
387 self.app.get(url('repo', repo_name=backend.repo_name))
390
388
391 def test_edit(self, backend, autologin_user):
389 def test_edit(self, backend, autologin_user):
392 self.app.get(url('edit_repo', repo_name=backend.repo_name))
390 self.app.get(url('edit_repo', repo_name=backend.repo_name))
393
391
394 def test_edit_accessible_when_missing_requirements(
392 def test_edit_accessible_when_missing_requirements(
395 self, backend_hg, autologin_user):
393 self, backend_hg, autologin_user):
396 scm_patcher = mock.patch.object(
394 scm_patcher = mock.patch.object(
397 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
395 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
398 with scm_patcher:
396 with scm_patcher:
399 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
397 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
400
398
401 def test_set_private_flag_sets_default_to_none(
399 def test_set_private_flag_sets_default_to_none(
402 self, autologin_user, backend, csrf_token):
400 self, autologin_user, backend, csrf_token):
403 # initially repository perm should be read
401 # initially repository perm should be read
404 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
402 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
405 assert len(perm) == 1
403 assert len(perm) == 1
406 assert perm[0].permission.permission_name == 'repository.read'
404 assert perm[0].permission.permission_name == 'repository.read'
407 assert not backend.repo.private
405 assert not backend.repo.private
408
406
409 response = self.app.post(
407 response = self.app.post(
410 url('repo', repo_name=backend.repo_name),
408 url('repo', repo_name=backend.repo_name),
411 fixture._get_repo_create_params(
409 fixture._get_repo_create_params(
412 repo_private=1,
410 repo_private=1,
413 repo_name=backend.repo_name,
411 repo_name=backend.repo_name,
414 repo_type=backend.alias,
412 repo_type=backend.alias,
415 user=TEST_USER_ADMIN_LOGIN,
413 user=TEST_USER_ADMIN_LOGIN,
416 _method='put',
414 _method='put',
417 csrf_token=csrf_token))
415 csrf_token=csrf_token))
418 assert_session_flash(
416 assert_session_flash(
419 response,
417 response,
420 msg='Repository %s updated successfully' % (backend.repo_name))
418 msg='Repository %s updated successfully' % (backend.repo_name))
421 assert backend.repo.private
419 assert backend.repo.private
422
420
423 # now the repo default permission should be None
421 # now the repo default permission should be None
424 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
422 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
425 assert len(perm) == 1
423 assert len(perm) == 1
426 assert perm[0].permission.permission_name == 'repository.none'
424 assert perm[0].permission.permission_name == 'repository.none'
427
425
428 response = self.app.post(
426 response = self.app.post(
429 url('repo', repo_name=backend.repo_name),
427 url('repo', repo_name=backend.repo_name),
430 fixture._get_repo_create_params(
428 fixture._get_repo_create_params(
431 repo_private=False,
429 repo_private=False,
432 repo_name=backend.repo_name,
430 repo_name=backend.repo_name,
433 repo_type=backend.alias,
431 repo_type=backend.alias,
434 user=TEST_USER_ADMIN_LOGIN,
432 user=TEST_USER_ADMIN_LOGIN,
435 _method='put',
433 _method='put',
436 csrf_token=csrf_token))
434 csrf_token=csrf_token))
437 assert_session_flash(
435 assert_session_flash(
438 response,
436 response,
439 msg='Repository %s updated successfully' % (backend.repo_name))
437 msg='Repository %s updated successfully' % (backend.repo_name))
440 assert not backend.repo.private
438 assert not backend.repo.private
441
439
442 # we turn off private now the repo default permission should stay None
440 # we turn off private now the repo default permission should stay None
443 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
441 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
444 assert len(perm) == 1
442 assert len(perm) == 1
445 assert perm[0].permission.permission_name == 'repository.none'
443 assert perm[0].permission.permission_name == 'repository.none'
446
444
447 # update this permission back
445 # update this permission back
448 perm[0].permission = Permission.get_by_key('repository.read')
446 perm[0].permission = Permission.get_by_key('repository.read')
449 Session().add(perm[0])
447 Session().add(perm[0])
450 Session().commit()
448 Session().commit()
451
449
452 def test_default_user_cannot_access_private_repo_in_a_group(
450 def test_default_user_cannot_access_private_repo_in_a_group(
453 self, autologin_user, user_util, backend, csrf_token):
451 self, autologin_user, user_util, backend, csrf_token):
454
452
455 group = user_util.create_repo_group()
453 group = user_util.create_repo_group()
456
454
457 repo = backend.create_repo(
455 repo = backend.create_repo(
458 repo_private=True, repo_group=group, repo_copy_permissions=True)
456 repo_private=True, repo_group=group, repo_copy_permissions=True)
459
457
460 permissions = _get_permission_for_user(
458 permissions = _get_permission_for_user(
461 user='default', repo=repo.repo_name)
459 user='default', repo=repo.repo_name)
462 assert len(permissions) == 1
460 assert len(permissions) == 1
463 assert permissions[0].permission.permission_name == 'repository.none'
461 assert permissions[0].permission.permission_name == 'repository.none'
464 assert permissions[0].repository.private is True
462 assert permissions[0].repository.private is True
465
463
466 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
464 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
467 repo = backend.repo
465 repo = backend.repo
468 response = self.app.get(
466 response = self.app.get(
469 url('edit_repo_advanced', repo_name=backend.repo_name))
467 url('edit_repo_advanced', repo_name=backend.repo_name))
470 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
468 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
471 response.mustcontain(no=[opt])
469 response.mustcontain(no=[opt])
472
470
473 def test_set_fork_of_target_repo(
471 def test_set_fork_of_target_repo(
474 self, autologin_user, backend, csrf_token):
472 self, autologin_user, backend, csrf_token):
475 target_repo = 'target_%s' % backend.alias
473 target_repo = 'target_%s' % backend.alias
476 fixture.create_repo(target_repo, repo_type=backend.alias)
474 fixture.create_repo(target_repo, repo_type=backend.alias)
477 repo2 = Repository.get_by_repo_name(target_repo)
475 repo2 = Repository.get_by_repo_name(target_repo)
478 response = self.app.post(
476 response = self.app.post(
479 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
477 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
480 params={'id_fork_of': repo2.repo_id, '_method': 'put',
478 params={'id_fork_of': repo2.repo_id, '_method': 'put',
481 'csrf_token': csrf_token})
479 'csrf_token': csrf_token})
482 repo = Repository.get_by_repo_name(backend.repo_name)
480 repo = Repository.get_by_repo_name(backend.repo_name)
483 repo2 = Repository.get_by_repo_name(target_repo)
481 repo2 = Repository.get_by_repo_name(target_repo)
484 assert_session_flash(
482 assert_session_flash(
485 response,
483 response,
486 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
484 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
487
485
488 assert repo.fork == repo2
486 assert repo.fork == repo2
489 response = response.follow()
487 response = response.follow()
490 # check if given repo is selected
488 # check if given repo is selected
491
489
492 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
490 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
493 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
491 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
494
492
495 response.mustcontain(opt)
493 response.mustcontain(opt)
496
494
497 fixture.destroy_repo(target_repo, forks='detach')
495 fixture.destroy_repo(target_repo, forks='detach')
498
496
499 @pytest.mark.backends("hg", "git")
497 @pytest.mark.backends("hg", "git")
500 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
498 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
501 csrf_token):
499 csrf_token):
502 TARGET_REPO_MAP = {
500 TARGET_REPO_MAP = {
503 'git': {
501 'git': {
504 'type': 'hg',
502 'type': 'hg',
505 'repo_name': HG_REPO},
503 'repo_name': HG_REPO},
506 'hg': {
504 'hg': {
507 'type': 'git',
505 'type': 'git',
508 'repo_name': GIT_REPO},
506 'repo_name': GIT_REPO},
509 }
507 }
510 target_repo = TARGET_REPO_MAP[backend.alias]
508 target_repo = TARGET_REPO_MAP[backend.alias]
511
509
512 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
510 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
513 response = self.app.post(
511 response = self.app.post(
514 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
512 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
515 params={'id_fork_of': repo2.repo_id, '_method': 'put',
513 params={'id_fork_of': repo2.repo_id, '_method': 'put',
516 'csrf_token': csrf_token})
514 'csrf_token': csrf_token})
517 assert_session_flash(
515 assert_session_flash(
518 response,
516 response,
519 'Cannot set repository as fork of repository with other type')
517 'Cannot set repository as fork of repository with other type')
520
518
521 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
519 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
522 # mark it as None
520 # mark it as None
523 response = self.app.post(
521 response = self.app.post(
524 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
522 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
525 params={'id_fork_of': None, '_method': 'put',
523 params={'id_fork_of': None, '_method': 'put',
526 'csrf_token': csrf_token})
524 'csrf_token': csrf_token})
527 assert_session_flash(
525 assert_session_flash(
528 response,
526 response,
529 'Marked repo %s as fork of %s'
527 'Marked repo %s as fork of %s'
530 % (backend.repo_name, "Nothing"))
528 % (backend.repo_name, "Nothing"))
531 assert backend.repo.fork is None
529 assert backend.repo.fork is None
532
530
533 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
531 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
534 repo = Repository.get_by_repo_name(backend.repo_name)
532 repo = Repository.get_by_repo_name(backend.repo_name)
535 response = self.app.post(
533 response = self.app.post(
536 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
534 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
537 params={'id_fork_of': repo.repo_id, '_method': 'put',
535 params={'id_fork_of': repo.repo_id, '_method': 'put',
538 'csrf_token': csrf_token})
536 'csrf_token': csrf_token})
539 assert_session_flash(
537 assert_session_flash(
540 response, 'An error occurred during this operation')
538 response, 'An error occurred during this operation')
541
539
542 def test_create_on_top_level_without_permissions(self, backend):
540 def test_create_on_top_level_without_permissions(self, backend):
543 session = login_user_session(
541 session = login_user_session(
544 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
542 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
545 csrf_token = auth.get_csrf_token(session)
543 csrf_token = auth.get_csrf_token(session)
546
544
547 # revoke
545 # revoke
548 user_model = UserModel()
546 user_model = UserModel()
549 # disable fork and create on default user
547 # disable fork and create on default user
550 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
548 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
551 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
549 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
552 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
550 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
553 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
551 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
554
552
555 # disable on regular user
553 # disable on regular user
556 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
554 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
557 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
555 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
558 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
556 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
559 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
557 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
560 Session().commit()
558 Session().commit()
561
559
562 repo_name = backend.new_repo_name()
560 repo_name = backend.new_repo_name()
563 description = 'description for newly created repo'
561 description = 'description for newly created repo'
564 response = self.app.post(
562 response = self.app.post(
565 url('repos'),
563 url('repos'),
566 fixture._get_repo_create_params(
564 fixture._get_repo_create_params(
567 repo_private=False,
565 repo_private=False,
568 repo_name=repo_name,
566 repo_name=repo_name,
569 repo_type=backend.alias,
567 repo_type=backend.alias,
570 repo_description=description,
568 repo_description=description,
571 csrf_token=csrf_token))
569 csrf_token=csrf_token))
572
570
573 response.mustcontain(
571 response.mustcontain(
574 u"You do not have the permission to store repositories in "
572 u"You do not have the permission to store repositories in "
575 u"the root location.")
573 u"the root location.")
576
574
577 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
575 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
578 def test_create_repo_when_filesystem_op_fails(
576 def test_create_repo_when_filesystem_op_fails(
579 self, autologin_user, backend, csrf_token):
577 self, autologin_user, backend, csrf_token):
580 repo_name = backend.new_repo_name()
578 repo_name = backend.new_repo_name()
581 description = 'description for newly created repo'
579 description = 'description for newly created repo'
582
580
583 response = self.app.post(
581 response = self.app.post(
584 url('repos'),
582 url('repos'),
585 fixture._get_repo_create_params(
583 fixture._get_repo_create_params(
586 repo_private=False,
584 repo_private=False,
587 repo_name=repo_name,
585 repo_name=repo_name,
588 repo_type=backend.alias,
586 repo_type=backend.alias,
589 repo_description=description,
587 repo_description=description,
590 csrf_token=csrf_token))
588 csrf_token=csrf_token))
591
589
592 assert_session_flash(
590 assert_session_flash(
593 response, 'Error creating repository %s' % repo_name)
591 response, 'Error creating repository %s' % repo_name)
594 # repo must not be in db
592 # repo must not be in db
595 assert backend.repo is None
593 assert backend.repo is None
596 # repo must not be in filesystem !
594 # repo must not be in filesystem !
597 assert not repo_on_filesystem(repo_name)
595 assert not repo_on_filesystem(repo_name)
598
596
599 def assert_repository_is_created_correctly(
597 def assert_repository_is_created_correctly(
600 self, repo_name, description, backend):
598 self, repo_name, description, backend):
601 repo_name_utf8 = repo_name.encode('utf-8')
599 repo_name_utf8 = repo_name.encode('utf-8')
602
600
603 # run the check page that triggers the flash message
601 # run the check page that triggers the flash message
604 response = self.app.get(url('repo_check_home', repo_name=repo_name))
602 response = self.app.get(url('repo_check_home', repo_name=repo_name))
605 assert response.json == {u'result': True}
603 assert response.json == {u'result': True}
606 assert_session_flash(
604 assert_session_flash(
607 response,
605 response,
608 u'Created repository <a href="/%s">%s</a>'
606 u'Created repository <a href="/%s">%s</a>'
609 % (urllib.quote(repo_name_utf8), repo_name))
607 % (urllib.quote(repo_name_utf8), repo_name))
610
608
611 # test if the repo was created in the database
609 # test if the repo was created in the database
612 new_repo = RepoModel().get_by_repo_name(repo_name)
610 new_repo = RepoModel().get_by_repo_name(repo_name)
613
611
614 assert new_repo.repo_name == repo_name
612 assert new_repo.repo_name == repo_name
615 assert new_repo.description == description
613 assert new_repo.description == description
616
614
617 # test if the repository is visible in the list ?
615 # test if the repository is visible in the list ?
618 response = self.app.get(url('summary_home', repo_name=repo_name))
616 response = self.app.get(url('summary_home', repo_name=repo_name))
619 response.mustcontain(repo_name)
617 response.mustcontain(repo_name)
620 response.mustcontain(backend.alias)
618 response.mustcontain(backend.alias)
621
619
622 assert repo_on_filesystem(repo_name)
620 assert repo_on_filesystem(repo_name)
623
621
624
622
625 @pytest.mark.usefixtures("app")
623 @pytest.mark.usefixtures("app")
626 class TestVcsSettings(object):
624 class TestVcsSettings(object):
627 FORM_DATA = {
625 FORM_DATA = {
628 'inherit_global_settings': False,
626 'inherit_global_settings': False,
629 'hooks_changegroup_repo_size': False,
627 'hooks_changegroup_repo_size': False,
630 'hooks_changegroup_push_logger': False,
628 'hooks_changegroup_push_logger': False,
631 'hooks_outgoing_pull_logger': False,
629 'hooks_outgoing_pull_logger': False,
632 'extensions_largefiles': False,
630 'extensions_largefiles': False,
633 'phases_publish': 'false',
631 'phases_publish': 'false',
634 'rhodecode_pr_merge_enabled': False,
632 'rhodecode_pr_merge_enabled': False,
635 'rhodecode_use_outdated_comments': False,
633 'rhodecode_use_outdated_comments': False,
636 'new_svn_branch': '',
634 'new_svn_branch': '',
637 'new_svn_tag': ''
635 'new_svn_tag': ''
638 }
636 }
639
637
640 @pytest.mark.skip_backends('svn')
638 @pytest.mark.skip_backends('svn')
641 def test_global_settings_initial_values(self, autologin_user, backend):
639 def test_global_settings_initial_values(self, autologin_user, backend):
642 repo_name = backend.repo_name
640 repo_name = backend.repo_name
643 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
641 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
644
642
645 expected_settings = (
643 expected_settings = (
646 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
644 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
647 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
645 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
648 'hooks_outgoing_pull_logger'
646 'hooks_outgoing_pull_logger'
649 )
647 )
650 for setting in expected_settings:
648 for setting in expected_settings:
651 self.assert_repo_value_equals_global_value(response, setting)
649 self.assert_repo_value_equals_global_value(response, setting)
652
650
653 def test_show_settings_requires_repo_admin_permission(
651 def test_show_settings_requires_repo_admin_permission(
654 self, backend, user_util, settings_util):
652 self, backend, user_util, settings_util):
655 repo = backend.create_repo()
653 repo = backend.create_repo()
656 repo_name = repo.repo_name
654 repo_name = repo.repo_name
657 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
655 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
658 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
656 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
659 login_user_session(
657 login_user_session(
660 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
658 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
661 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
659 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
662
660
663 def test_inherit_global_settings_flag_is_true_by_default(
661 def test_inherit_global_settings_flag_is_true_by_default(
664 self, autologin_user, backend):
662 self, autologin_user, backend):
665 repo_name = backend.repo_name
663 repo_name = backend.repo_name
666 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
664 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
667
665
668 assert_response = AssertResponse(response)
666 assert_response = AssertResponse(response)
669 element = assert_response.get_element('#inherit_global_settings')
667 element = assert_response.get_element('#inherit_global_settings')
670 assert element.checked
668 assert element.checked
671
669
672 @pytest.mark.parametrize('checked_value', [True, False])
670 @pytest.mark.parametrize('checked_value', [True, False])
673 def test_inherit_global_settings_value(
671 def test_inherit_global_settings_value(
674 self, autologin_user, backend, checked_value, settings_util):
672 self, autologin_user, backend, checked_value, settings_util):
675 repo = backend.create_repo()
673 repo = backend.create_repo()
676 repo_name = repo.repo_name
674 repo_name = repo.repo_name
677 settings_util.create_repo_rhodecode_setting(
675 settings_util.create_repo_rhodecode_setting(
678 repo, 'inherit_vcs_settings', checked_value, 'bool')
676 repo, 'inherit_vcs_settings', checked_value, 'bool')
679 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
677 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
680
678
681 assert_response = AssertResponse(response)
679 assert_response = AssertResponse(response)
682 element = assert_response.get_element('#inherit_global_settings')
680 element = assert_response.get_element('#inherit_global_settings')
683 assert element.checked == checked_value
681 assert element.checked == checked_value
684
682
685 @pytest.mark.skip_backends('svn')
683 @pytest.mark.skip_backends('svn')
686 def test_hooks_settings_are_created(
684 def test_hooks_settings_are_created(
687 self, autologin_user, backend, csrf_token):
685 self, autologin_user, backend, csrf_token):
688 repo_name = backend.repo_name
686 repo_name = backend.repo_name
689 data = self.FORM_DATA.copy()
687 data = self.FORM_DATA.copy()
690 data['csrf_token'] = csrf_token
688 data['csrf_token'] = csrf_token
691 self.app.post(
689 self.app.post(
692 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
690 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
693 settings = SettingsModel(repo=repo_name)
691 settings = SettingsModel(repo=repo_name)
694 try:
692 try:
695 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
693 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
696 ui = settings.get_ui_by_section_and_key(section, key)
694 ui = settings.get_ui_by_section_and_key(section, key)
697 assert ui.ui_active is False
695 assert ui.ui_active is False
698 finally:
696 finally:
699 self._cleanup_repo_settings(settings)
697 self._cleanup_repo_settings(settings)
700
698
701 def test_hooks_settings_are_not_created_for_svn(
699 def test_hooks_settings_are_not_created_for_svn(
702 self, autologin_user, backend_svn, csrf_token):
700 self, autologin_user, backend_svn, csrf_token):
703 repo_name = backend_svn.repo_name
701 repo_name = backend_svn.repo_name
704 data = self.FORM_DATA.copy()
702 data = self.FORM_DATA.copy()
705 data['csrf_token'] = csrf_token
703 data['csrf_token'] = csrf_token
706 self.app.post(
704 self.app.post(
707 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
705 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
708 settings = SettingsModel(repo=repo_name)
706 settings = SettingsModel(repo=repo_name)
709 try:
707 try:
710 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
708 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
711 ui = settings.get_ui_by_section_and_key(section, key)
709 ui = settings.get_ui_by_section_and_key(section, key)
712 assert ui is None
710 assert ui is None
713 finally:
711 finally:
714 self._cleanup_repo_settings(settings)
712 self._cleanup_repo_settings(settings)
715
713
716 @pytest.mark.skip_backends('svn')
714 @pytest.mark.skip_backends('svn')
717 def test_hooks_settings_are_updated(
715 def test_hooks_settings_are_updated(
718 self, autologin_user, backend, csrf_token):
716 self, autologin_user, backend, csrf_token):
719 repo_name = backend.repo_name
717 repo_name = backend.repo_name
720 settings = SettingsModel(repo=repo_name)
718 settings = SettingsModel(repo=repo_name)
721 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
719 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
722 settings.create_ui_section_value(section, '', key=key, active=True)
720 settings.create_ui_section_value(section, '', key=key, active=True)
723
721
724 data = self.FORM_DATA.copy()
722 data = self.FORM_DATA.copy()
725 data['csrf_token'] = csrf_token
723 data['csrf_token'] = csrf_token
726 self.app.post(
724 self.app.post(
727 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
725 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
728 try:
726 try:
729 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
727 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
730 ui = settings.get_ui_by_section_and_key(section, key)
728 ui = settings.get_ui_by_section_and_key(section, key)
731 assert ui.ui_active is False
729 assert ui.ui_active is False
732 finally:
730 finally:
733 self._cleanup_repo_settings(settings)
731 self._cleanup_repo_settings(settings)
734
732
735 def test_hooks_settings_are_not_updated_for_svn(
733 def test_hooks_settings_are_not_updated_for_svn(
736 self, autologin_user, backend_svn, csrf_token):
734 self, autologin_user, backend_svn, csrf_token):
737 repo_name = backend_svn.repo_name
735 repo_name = backend_svn.repo_name
738 settings = SettingsModel(repo=repo_name)
736 settings = SettingsModel(repo=repo_name)
739 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
737 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
740 settings.create_ui_section_value(section, '', key=key, active=True)
738 settings.create_ui_section_value(section, '', key=key, active=True)
741
739
742 data = self.FORM_DATA.copy()
740 data = self.FORM_DATA.copy()
743 data['csrf_token'] = csrf_token
741 data['csrf_token'] = csrf_token
744 self.app.post(
742 self.app.post(
745 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
743 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
746 try:
744 try:
747 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
745 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
748 ui = settings.get_ui_by_section_and_key(section, key)
746 ui = settings.get_ui_by_section_and_key(section, key)
749 assert ui.ui_active is True
747 assert ui.ui_active is True
750 finally:
748 finally:
751 self._cleanup_repo_settings(settings)
749 self._cleanup_repo_settings(settings)
752
750
753 @pytest.mark.skip_backends('svn')
751 @pytest.mark.skip_backends('svn')
754 def test_pr_settings_are_created(
752 def test_pr_settings_are_created(
755 self, autologin_user, backend, csrf_token):
753 self, autologin_user, backend, csrf_token):
756 repo_name = backend.repo_name
754 repo_name = backend.repo_name
757 data = self.FORM_DATA.copy()
755 data = self.FORM_DATA.copy()
758 data['csrf_token'] = csrf_token
756 data['csrf_token'] = csrf_token
759 self.app.post(
757 self.app.post(
760 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
758 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
761 settings = SettingsModel(repo=repo_name)
759 settings = SettingsModel(repo=repo_name)
762 try:
760 try:
763 for name in VcsSettingsModel.GENERAL_SETTINGS:
761 for name in VcsSettingsModel.GENERAL_SETTINGS:
764 setting = settings.get_setting_by_name(name)
762 setting = settings.get_setting_by_name(name)
765 assert setting.app_settings_value is False
763 assert setting.app_settings_value is False
766 finally:
764 finally:
767 self._cleanup_repo_settings(settings)
765 self._cleanup_repo_settings(settings)
768
766
769 def test_pr_settings_are_not_created_for_svn(
767 def test_pr_settings_are_not_created_for_svn(
770 self, autologin_user, backend_svn, csrf_token):
768 self, autologin_user, backend_svn, csrf_token):
771 repo_name = backend_svn.repo_name
769 repo_name = backend_svn.repo_name
772 data = self.FORM_DATA.copy()
770 data = self.FORM_DATA.copy()
773 data['csrf_token'] = csrf_token
771 data['csrf_token'] = csrf_token
774 self.app.post(
772 self.app.post(
775 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
773 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
776 settings = SettingsModel(repo=repo_name)
774 settings = SettingsModel(repo=repo_name)
777 try:
775 try:
778 for name in VcsSettingsModel.GENERAL_SETTINGS:
776 for name in VcsSettingsModel.GENERAL_SETTINGS:
779 setting = settings.get_setting_by_name(name)
777 setting = settings.get_setting_by_name(name)
780 assert setting is None
778 assert setting is None
781 finally:
779 finally:
782 self._cleanup_repo_settings(settings)
780 self._cleanup_repo_settings(settings)
783
781
784 def test_pr_settings_creation_requires_repo_admin_permission(
782 def test_pr_settings_creation_requires_repo_admin_permission(
785 self, backend, user_util, settings_util, csrf_token):
783 self, backend, user_util, settings_util, csrf_token):
786 repo = backend.create_repo()
784 repo = backend.create_repo()
787 repo_name = repo.repo_name
785 repo_name = repo.repo_name
788 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
786 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
789
787
790 logout_user_session(self.app, csrf_token)
788 logout_user_session(self.app, csrf_token)
791 session = login_user_session(
789 session = login_user_session(
792 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
790 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
793 new_csrf_token = auth.get_csrf_token(session)
791 new_csrf_token = auth.get_csrf_token(session)
794
792
795 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
793 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
796 data = self.FORM_DATA.copy()
794 data = self.FORM_DATA.copy()
797 data['csrf_token'] = new_csrf_token
795 data['csrf_token'] = new_csrf_token
798 settings = SettingsModel(repo=repo_name)
796 settings = SettingsModel(repo=repo_name)
799
797
800 try:
798 try:
801 self.app.post(
799 self.app.post(
802 url('repo_vcs_settings', repo_name=repo_name), data,
800 url('repo_vcs_settings', repo_name=repo_name), data,
803 status=302)
801 status=302)
804 finally:
802 finally:
805 self._cleanup_repo_settings(settings)
803 self._cleanup_repo_settings(settings)
806
804
807 @pytest.mark.skip_backends('svn')
805 @pytest.mark.skip_backends('svn')
808 def test_pr_settings_are_updated(
806 def test_pr_settings_are_updated(
809 self, autologin_user, backend, csrf_token):
807 self, autologin_user, backend, csrf_token):
810 repo_name = backend.repo_name
808 repo_name = backend.repo_name
811 settings = SettingsModel(repo=repo_name)
809 settings = SettingsModel(repo=repo_name)
812 for name in VcsSettingsModel.GENERAL_SETTINGS:
810 for name in VcsSettingsModel.GENERAL_SETTINGS:
813 settings.create_or_update_setting(name, True, 'bool')
811 settings.create_or_update_setting(name, True, 'bool')
814
812
815 data = self.FORM_DATA.copy()
813 data = self.FORM_DATA.copy()
816 data['csrf_token'] = csrf_token
814 data['csrf_token'] = csrf_token
817 self.app.post(
815 self.app.post(
818 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
816 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
819 try:
817 try:
820 for name in VcsSettingsModel.GENERAL_SETTINGS:
818 for name in VcsSettingsModel.GENERAL_SETTINGS:
821 setting = settings.get_setting_by_name(name)
819 setting = settings.get_setting_by_name(name)
822 assert setting.app_settings_value is False
820 assert setting.app_settings_value is False
823 finally:
821 finally:
824 self._cleanup_repo_settings(settings)
822 self._cleanup_repo_settings(settings)
825
823
826 def test_pr_settings_are_not_updated_for_svn(
824 def test_pr_settings_are_not_updated_for_svn(
827 self, autologin_user, backend_svn, csrf_token):
825 self, autologin_user, backend_svn, csrf_token):
828 repo_name = backend_svn.repo_name
826 repo_name = backend_svn.repo_name
829 settings = SettingsModel(repo=repo_name)
827 settings = SettingsModel(repo=repo_name)
830 for name in VcsSettingsModel.GENERAL_SETTINGS:
828 for name in VcsSettingsModel.GENERAL_SETTINGS:
831 settings.create_or_update_setting(name, True, 'bool')
829 settings.create_or_update_setting(name, True, 'bool')
832
830
833 data = self.FORM_DATA.copy()
831 data = self.FORM_DATA.copy()
834 data['csrf_token'] = csrf_token
832 data['csrf_token'] = csrf_token
835 self.app.post(
833 self.app.post(
836 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
834 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
837 try:
835 try:
838 for name in VcsSettingsModel.GENERAL_SETTINGS:
836 for name in VcsSettingsModel.GENERAL_SETTINGS:
839 setting = settings.get_setting_by_name(name)
837 setting = settings.get_setting_by_name(name)
840 assert setting.app_settings_value is True
838 assert setting.app_settings_value is True
841 finally:
839 finally:
842 self._cleanup_repo_settings(settings)
840 self._cleanup_repo_settings(settings)
843
841
844 def test_svn_settings_are_created(
842 def test_svn_settings_are_created(
845 self, autologin_user, backend_svn, csrf_token, settings_util):
843 self, autologin_user, backend_svn, csrf_token, settings_util):
846 repo_name = backend_svn.repo_name
844 repo_name = backend_svn.repo_name
847 data = self.FORM_DATA.copy()
845 data = self.FORM_DATA.copy()
848 data['new_svn_tag'] = 'svn-tag'
846 data['new_svn_tag'] = 'svn-tag'
849 data['new_svn_branch'] = 'svn-branch'
847 data['new_svn_branch'] = 'svn-branch'
850 data['csrf_token'] = csrf_token
848 data['csrf_token'] = csrf_token
851
849
852 # Create few global settings to make sure that uniqueness validators
850 # Create few global settings to make sure that uniqueness validators
853 # are not triggered
851 # are not triggered
854 settings_util.create_rhodecode_ui(
852 settings_util.create_rhodecode_ui(
855 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
853 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
856 settings_util.create_rhodecode_ui(
854 settings_util.create_rhodecode_ui(
857 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
855 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
858
856
859 self.app.post(
857 self.app.post(
860 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
858 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
861 settings = SettingsModel(repo=repo_name)
859 settings = SettingsModel(repo=repo_name)
862 try:
860 try:
863 svn_branches = settings.get_ui_by_section(
861 svn_branches = settings.get_ui_by_section(
864 VcsSettingsModel.SVN_BRANCH_SECTION)
862 VcsSettingsModel.SVN_BRANCH_SECTION)
865 svn_branch_names = [b.ui_value for b in svn_branches]
863 svn_branch_names = [b.ui_value for b in svn_branches]
866 svn_tags = settings.get_ui_by_section(
864 svn_tags = settings.get_ui_by_section(
867 VcsSettingsModel.SVN_TAG_SECTION)
865 VcsSettingsModel.SVN_TAG_SECTION)
868 svn_tag_names = [b.ui_value for b in svn_tags]
866 svn_tag_names = [b.ui_value for b in svn_tags]
869 assert 'svn-branch' in svn_branch_names
867 assert 'svn-branch' in svn_branch_names
870 assert 'svn-tag' in svn_tag_names
868 assert 'svn-tag' in svn_tag_names
871 finally:
869 finally:
872 self._cleanup_repo_settings(settings)
870 self._cleanup_repo_settings(settings)
873
871
874 def test_svn_settings_are_unique(
872 def test_svn_settings_are_unique(
875 self, autologin_user, backend_svn, csrf_token, settings_util):
873 self, autologin_user, backend_svn, csrf_token, settings_util):
876 repo = backend_svn.repo
874 repo = backend_svn.repo
877 repo_name = repo.repo_name
875 repo_name = repo.repo_name
878 data = self.FORM_DATA.copy()
876 data = self.FORM_DATA.copy()
879 data['new_svn_tag'] = 'test_tag'
877 data['new_svn_tag'] = 'test_tag'
880 data['new_svn_branch'] = 'test_branch'
878 data['new_svn_branch'] = 'test_branch'
881 data['csrf_token'] = csrf_token
879 data['csrf_token'] = csrf_token
882 settings_util.create_repo_rhodecode_ui(
880 settings_util.create_repo_rhodecode_ui(
883 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
881 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
884 settings_util.create_repo_rhodecode_ui(
882 settings_util.create_repo_rhodecode_ui(
885 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
883 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
886
884
887 response = self.app.post(
885 response = self.app.post(
888 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
886 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
889 response.mustcontain('Pattern already exists')
887 response.mustcontain('Pattern already exists')
890
888
891 def test_svn_settings_with_empty_values_are_not_created(
889 def test_svn_settings_with_empty_values_are_not_created(
892 self, autologin_user, backend_svn, csrf_token):
890 self, autologin_user, backend_svn, csrf_token):
893 repo_name = backend_svn.repo_name
891 repo_name = backend_svn.repo_name
894 data = self.FORM_DATA.copy()
892 data = self.FORM_DATA.copy()
895 data['csrf_token'] = csrf_token
893 data['csrf_token'] = csrf_token
896 self.app.post(
894 self.app.post(
897 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
895 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
898 settings = SettingsModel(repo=repo_name)
896 settings = SettingsModel(repo=repo_name)
899 try:
897 try:
900 svn_branches = settings.get_ui_by_section(
898 svn_branches = settings.get_ui_by_section(
901 VcsSettingsModel.SVN_BRANCH_SECTION)
899 VcsSettingsModel.SVN_BRANCH_SECTION)
902 svn_tags = settings.get_ui_by_section(
900 svn_tags = settings.get_ui_by_section(
903 VcsSettingsModel.SVN_TAG_SECTION)
901 VcsSettingsModel.SVN_TAG_SECTION)
904 assert len(svn_branches) == 0
902 assert len(svn_branches) == 0
905 assert len(svn_tags) == 0
903 assert len(svn_tags) == 0
906 finally:
904 finally:
907 self._cleanup_repo_settings(settings)
905 self._cleanup_repo_settings(settings)
908
906
909 def test_svn_settings_are_shown_for_svn_repository(
907 def test_svn_settings_are_shown_for_svn_repository(
910 self, autologin_user, backend_svn, csrf_token):
908 self, autologin_user, backend_svn, csrf_token):
911 repo_name = backend_svn.repo_name
909 repo_name = backend_svn.repo_name
912 response = self.app.get(
910 response = self.app.get(
913 url('repo_vcs_settings', repo_name=repo_name), status=200)
911 url('repo_vcs_settings', repo_name=repo_name), status=200)
914 response.mustcontain('Subversion Settings')
912 response.mustcontain('Subversion Settings')
915
913
916 @pytest.mark.skip_backends('svn')
914 @pytest.mark.skip_backends('svn')
917 def test_svn_settings_are_not_created_for_not_svn_repository(
915 def test_svn_settings_are_not_created_for_not_svn_repository(
918 self, autologin_user, backend, csrf_token):
916 self, autologin_user, backend, csrf_token):
919 repo_name = backend.repo_name
917 repo_name = backend.repo_name
920 data = self.FORM_DATA.copy()
918 data = self.FORM_DATA.copy()
921 data['csrf_token'] = csrf_token
919 data['csrf_token'] = csrf_token
922 self.app.post(
920 self.app.post(
923 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
921 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
924 settings = SettingsModel(repo=repo_name)
922 settings = SettingsModel(repo=repo_name)
925 try:
923 try:
926 svn_branches = settings.get_ui_by_section(
924 svn_branches = settings.get_ui_by_section(
927 VcsSettingsModel.SVN_BRANCH_SECTION)
925 VcsSettingsModel.SVN_BRANCH_SECTION)
928 svn_tags = settings.get_ui_by_section(
926 svn_tags = settings.get_ui_by_section(
929 VcsSettingsModel.SVN_TAG_SECTION)
927 VcsSettingsModel.SVN_TAG_SECTION)
930 assert len(svn_branches) == 0
928 assert len(svn_branches) == 0
931 assert len(svn_tags) == 0
929 assert len(svn_tags) == 0
932 finally:
930 finally:
933 self._cleanup_repo_settings(settings)
931 self._cleanup_repo_settings(settings)
934
932
935 @pytest.mark.skip_backends('svn')
933 @pytest.mark.skip_backends('svn')
936 def test_svn_settings_are_shown_only_for_svn_repository(
934 def test_svn_settings_are_shown_only_for_svn_repository(
937 self, autologin_user, backend, csrf_token):
935 self, autologin_user, backend, csrf_token):
938 repo_name = backend.repo_name
936 repo_name = backend.repo_name
939 response = self.app.get(
937 response = self.app.get(
940 url('repo_vcs_settings', repo_name=repo_name), status=200)
938 url('repo_vcs_settings', repo_name=repo_name), status=200)
941 response.mustcontain(no='Subversion Settings')
939 response.mustcontain(no='Subversion Settings')
942
940
943 def test_hg_settings_are_created(
941 def test_hg_settings_are_created(
944 self, autologin_user, backend_hg, csrf_token):
942 self, autologin_user, backend_hg, csrf_token):
945 repo_name = backend_hg.repo_name
943 repo_name = backend_hg.repo_name
946 data = self.FORM_DATA.copy()
944 data = self.FORM_DATA.copy()
947 data['new_svn_tag'] = 'svn-tag'
945 data['new_svn_tag'] = 'svn-tag'
948 data['new_svn_branch'] = 'svn-branch'
946 data['new_svn_branch'] = 'svn-branch'
949 data['csrf_token'] = csrf_token
947 data['csrf_token'] = csrf_token
950 self.app.post(
948 self.app.post(
951 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
949 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
952 settings = SettingsModel(repo=repo_name)
950 settings = SettingsModel(repo=repo_name)
953 try:
951 try:
954 largefiles_ui = settings.get_ui_by_section_and_key(
952 largefiles_ui = settings.get_ui_by_section_and_key(
955 'extensions', 'largefiles')
953 'extensions', 'largefiles')
956 assert largefiles_ui.ui_active is False
954 assert largefiles_ui.ui_active is False
957 phases_ui = settings.get_ui_by_section_and_key(
955 phases_ui = settings.get_ui_by_section_and_key(
958 'phases', 'publish')
956 'phases', 'publish')
959 assert str2bool(phases_ui.ui_value) is False
957 assert str2bool(phases_ui.ui_value) is False
960 finally:
958 finally:
961 self._cleanup_repo_settings(settings)
959 self._cleanup_repo_settings(settings)
962
960
963 def test_hg_settings_are_updated(
961 def test_hg_settings_are_updated(
964 self, autologin_user, backend_hg, csrf_token):
962 self, autologin_user, backend_hg, csrf_token):
965 repo_name = backend_hg.repo_name
963 repo_name = backend_hg.repo_name
966 settings = SettingsModel(repo=repo_name)
964 settings = SettingsModel(repo=repo_name)
967 settings.create_ui_section_value(
965 settings.create_ui_section_value(
968 'extensions', '', key='largefiles', active=True)
966 'extensions', '', key='largefiles', active=True)
969 settings.create_ui_section_value(
967 settings.create_ui_section_value(
970 'phases', '1', key='publish', active=True)
968 'phases', '1', key='publish', active=True)
971
969
972 data = self.FORM_DATA.copy()
970 data = self.FORM_DATA.copy()
973 data['csrf_token'] = csrf_token
971 data['csrf_token'] = csrf_token
974 self.app.post(
972 self.app.post(
975 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
973 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
976 try:
974 try:
977 largefiles_ui = settings.get_ui_by_section_and_key(
975 largefiles_ui = settings.get_ui_by_section_and_key(
978 'extensions', 'largefiles')
976 'extensions', 'largefiles')
979 assert largefiles_ui.ui_active is False
977 assert largefiles_ui.ui_active is False
980 phases_ui = settings.get_ui_by_section_and_key(
978 phases_ui = settings.get_ui_by_section_and_key(
981 'phases', 'publish')
979 'phases', 'publish')
982 assert str2bool(phases_ui.ui_value) is False
980 assert str2bool(phases_ui.ui_value) is False
983 finally:
981 finally:
984 self._cleanup_repo_settings(settings)
982 self._cleanup_repo_settings(settings)
985
983
986 def test_hg_settings_are_shown_for_hg_repository(
984 def test_hg_settings_are_shown_for_hg_repository(
987 self, autologin_user, backend_hg, csrf_token):
985 self, autologin_user, backend_hg, csrf_token):
988 repo_name = backend_hg.repo_name
986 repo_name = backend_hg.repo_name
989 response = self.app.get(
987 response = self.app.get(
990 url('repo_vcs_settings', repo_name=repo_name), status=200)
988 url('repo_vcs_settings', repo_name=repo_name), status=200)
991 response.mustcontain('Mercurial Settings')
989 response.mustcontain('Mercurial Settings')
992
990
993 @pytest.mark.skip_backends('hg')
991 @pytest.mark.skip_backends('hg')
994 def test_hg_settings_are_created_only_for_hg_repository(
992 def test_hg_settings_are_created_only_for_hg_repository(
995 self, autologin_user, backend, csrf_token):
993 self, autologin_user, backend, csrf_token):
996 repo_name = backend.repo_name
994 repo_name = backend.repo_name
997 data = self.FORM_DATA.copy()
995 data = self.FORM_DATA.copy()
998 data['csrf_token'] = csrf_token
996 data['csrf_token'] = csrf_token
999 self.app.post(
997 self.app.post(
1000 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
998 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1001 settings = SettingsModel(repo=repo_name)
999 settings = SettingsModel(repo=repo_name)
1002 try:
1000 try:
1003 largefiles_ui = settings.get_ui_by_section_and_key(
1001 largefiles_ui = settings.get_ui_by_section_and_key(
1004 'extensions', 'largefiles')
1002 'extensions', 'largefiles')
1005 assert largefiles_ui is None
1003 assert largefiles_ui is None
1006 phases_ui = settings.get_ui_by_section_and_key(
1004 phases_ui = settings.get_ui_by_section_and_key(
1007 'phases', 'publish')
1005 'phases', 'publish')
1008 assert phases_ui is None
1006 assert phases_ui is None
1009 finally:
1007 finally:
1010 self._cleanup_repo_settings(settings)
1008 self._cleanup_repo_settings(settings)
1011
1009
1012 @pytest.mark.skip_backends('hg')
1010 @pytest.mark.skip_backends('hg')
1013 def test_hg_settings_are_shown_only_for_hg_repository(
1011 def test_hg_settings_are_shown_only_for_hg_repository(
1014 self, autologin_user, backend, csrf_token):
1012 self, autologin_user, backend, csrf_token):
1015 repo_name = backend.repo_name
1013 repo_name = backend.repo_name
1016 response = self.app.get(
1014 response = self.app.get(
1017 url('repo_vcs_settings', repo_name=repo_name), status=200)
1015 url('repo_vcs_settings', repo_name=repo_name), status=200)
1018 response.mustcontain(no='Mercurial Settings')
1016 response.mustcontain(no='Mercurial Settings')
1019
1017
1020 @pytest.mark.skip_backends('hg')
1018 @pytest.mark.skip_backends('hg')
1021 def test_hg_settings_are_updated_only_for_hg_repository(
1019 def test_hg_settings_are_updated_only_for_hg_repository(
1022 self, autologin_user, backend, csrf_token):
1020 self, autologin_user, backend, csrf_token):
1023 repo_name = backend.repo_name
1021 repo_name = backend.repo_name
1024 settings = SettingsModel(repo=repo_name)
1022 settings = SettingsModel(repo=repo_name)
1025 settings.create_ui_section_value(
1023 settings.create_ui_section_value(
1026 'extensions', '', key='largefiles', active=True)
1024 'extensions', '', key='largefiles', active=True)
1027 settings.create_ui_section_value(
1025 settings.create_ui_section_value(
1028 'phases', '1', key='publish', active=True)
1026 'phases', '1', key='publish', active=True)
1029
1027
1030 data = self.FORM_DATA.copy()
1028 data = self.FORM_DATA.copy()
1031 data['csrf_token'] = csrf_token
1029 data['csrf_token'] = csrf_token
1032 self.app.post(
1030 self.app.post(
1033 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1031 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1034 try:
1032 try:
1035 largefiles_ui = settings.get_ui_by_section_and_key(
1033 largefiles_ui = settings.get_ui_by_section_and_key(
1036 'extensions', 'largefiles')
1034 'extensions', 'largefiles')
1037 assert largefiles_ui.ui_active is True
1035 assert largefiles_ui.ui_active is True
1038 phases_ui = settings.get_ui_by_section_and_key(
1036 phases_ui = settings.get_ui_by_section_and_key(
1039 'phases', 'publish')
1037 'phases', 'publish')
1040 assert phases_ui.ui_value == '1'
1038 assert phases_ui.ui_value == '1'
1041 finally:
1039 finally:
1042 self._cleanup_repo_settings(settings)
1040 self._cleanup_repo_settings(settings)
1043
1041
1044 def test_per_repo_svn_settings_are_displayed(
1042 def test_per_repo_svn_settings_are_displayed(
1045 self, autologin_user, backend_svn, settings_util):
1043 self, autologin_user, backend_svn, settings_util):
1046 repo = backend_svn.create_repo()
1044 repo = backend_svn.create_repo()
1047 repo_name = repo.repo_name
1045 repo_name = repo.repo_name
1048 branches = [
1046 branches = [
1049 settings_util.create_repo_rhodecode_ui(
1047 settings_util.create_repo_rhodecode_ui(
1050 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1048 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1051 'branch_{}'.format(i))
1049 'branch_{}'.format(i))
1052 for i in range(10)]
1050 for i in range(10)]
1053 tags = [
1051 tags = [
1054 settings_util.create_repo_rhodecode_ui(
1052 settings_util.create_repo_rhodecode_ui(
1055 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1053 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1056 for i in range(10)]
1054 for i in range(10)]
1057
1055
1058 response = self.app.get(
1056 response = self.app.get(
1059 url('repo_vcs_settings', repo_name=repo_name), status=200)
1057 url('repo_vcs_settings', repo_name=repo_name), status=200)
1060 assert_response = AssertResponse(response)
1058 assert_response = AssertResponse(response)
1061 for branch in branches:
1059 for branch in branches:
1062 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1060 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1063 element = assert_response.get_element(css_selector)
1061 element = assert_response.get_element(css_selector)
1064 assert element.value == branch.ui_value
1062 assert element.value == branch.ui_value
1065 for tag in tags:
1063 for tag in tags:
1066 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1064 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1067 element = assert_response.get_element(css_selector)
1065 element = assert_response.get_element(css_selector)
1068 assert element.value == tag.ui_value
1066 assert element.value == tag.ui_value
1069
1067
1070 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1068 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1071 self, autologin_user, backend_svn, settings_util):
1069 self, autologin_user, backend_svn, settings_util):
1072 repo = backend_svn.create_repo()
1070 repo = backend_svn.create_repo()
1073 repo_name = repo.repo_name
1071 repo_name = repo.repo_name
1074 response = self.app.get(
1072 response = self.app.get(
1075 url('repo_vcs_settings', repo_name=repo_name), status=200)
1073 url('repo_vcs_settings', repo_name=repo_name), status=200)
1076 response.mustcontain(no='<label>Hooks:</label>')
1074 response.mustcontain(no='<label>Hooks:</label>')
1077 response.mustcontain(no='<label>Pull Request Settings:</label>')
1075 response.mustcontain(no='<label>Pull Request Settings:</label>')
1078
1076
1079 def test_inherit_global_settings_value_is_saved(
1077 def test_inherit_global_settings_value_is_saved(
1080 self, autologin_user, backend, csrf_token):
1078 self, autologin_user, backend, csrf_token):
1081 repo_name = backend.repo_name
1079 repo_name = backend.repo_name
1082 data = self.FORM_DATA.copy()
1080 data = self.FORM_DATA.copy()
1083 data['csrf_token'] = csrf_token
1081 data['csrf_token'] = csrf_token
1084 data['inherit_global_settings'] = True
1082 data['inherit_global_settings'] = True
1085 self.app.post(
1083 self.app.post(
1086 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1084 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1087
1085
1088 settings = SettingsModel(repo=repo_name)
1086 settings = SettingsModel(repo=repo_name)
1089 vcs_settings = VcsSettingsModel(repo=repo_name)
1087 vcs_settings = VcsSettingsModel(repo=repo_name)
1090 try:
1088 try:
1091 assert vcs_settings.inherit_global_settings is True
1089 assert vcs_settings.inherit_global_settings is True
1092 finally:
1090 finally:
1093 self._cleanup_repo_settings(settings)
1091 self._cleanup_repo_settings(settings)
1094
1092
1095 def test_repo_cache_is_invalidated_when_settings_are_updated(
1093 def test_repo_cache_is_invalidated_when_settings_are_updated(
1096 self, autologin_user, backend, csrf_token):
1094 self, autologin_user, backend, csrf_token):
1097 repo_name = backend.repo_name
1095 repo_name = backend.repo_name
1098 data = self.FORM_DATA.copy()
1096 data = self.FORM_DATA.copy()
1099 data['csrf_token'] = csrf_token
1097 data['csrf_token'] = csrf_token
1100 data['inherit_global_settings'] = True
1098 data['inherit_global_settings'] = True
1101 settings = SettingsModel(repo=repo_name)
1099 settings = SettingsModel(repo=repo_name)
1102
1100
1103 invalidation_patcher = mock.patch(
1101 invalidation_patcher = mock.patch(
1104 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1102 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1105 with invalidation_patcher as invalidation_mock:
1103 with invalidation_patcher as invalidation_mock:
1106 self.app.post(
1104 self.app.post(
1107 url('repo_vcs_settings', repo_name=repo_name), data,
1105 url('repo_vcs_settings', repo_name=repo_name), data,
1108 status=302)
1106 status=302)
1109 try:
1107 try:
1110 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1108 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1111 finally:
1109 finally:
1112 self._cleanup_repo_settings(settings)
1110 self._cleanup_repo_settings(settings)
1113
1111
1114 def test_other_settings_not_saved_inherit_global_settings_is_true(
1112 def test_other_settings_not_saved_inherit_global_settings_is_true(
1115 self, autologin_user, backend, csrf_token):
1113 self, autologin_user, backend, csrf_token):
1116 repo_name = backend.repo_name
1114 repo_name = backend.repo_name
1117 data = self.FORM_DATA.copy()
1115 data = self.FORM_DATA.copy()
1118 data['csrf_token'] = csrf_token
1116 data['csrf_token'] = csrf_token
1119 data['inherit_global_settings'] = True
1117 data['inherit_global_settings'] = True
1120 self.app.post(
1118 self.app.post(
1121 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1119 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1122
1120
1123 settings = SettingsModel(repo=repo_name)
1121 settings = SettingsModel(repo=repo_name)
1124 ui_settings = (
1122 ui_settings = (
1125 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1123 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1126
1124
1127 vcs_settings = []
1125 vcs_settings = []
1128 try:
1126 try:
1129 for section, key in ui_settings:
1127 for section, key in ui_settings:
1130 ui = settings.get_ui_by_section_and_key(section, key)
1128 ui = settings.get_ui_by_section_and_key(section, key)
1131 if ui:
1129 if ui:
1132 vcs_settings.append(ui)
1130 vcs_settings.append(ui)
1133 vcs_settings.extend(settings.get_ui_by_section(
1131 vcs_settings.extend(settings.get_ui_by_section(
1134 VcsSettingsModel.SVN_BRANCH_SECTION))
1132 VcsSettingsModel.SVN_BRANCH_SECTION))
1135 vcs_settings.extend(settings.get_ui_by_section(
1133 vcs_settings.extend(settings.get_ui_by_section(
1136 VcsSettingsModel.SVN_TAG_SECTION))
1134 VcsSettingsModel.SVN_TAG_SECTION))
1137 for name in VcsSettingsModel.GENERAL_SETTINGS:
1135 for name in VcsSettingsModel.GENERAL_SETTINGS:
1138 setting = settings.get_setting_by_name(name)
1136 setting = settings.get_setting_by_name(name)
1139 if setting:
1137 if setting:
1140 vcs_settings.append(setting)
1138 vcs_settings.append(setting)
1141 assert vcs_settings == []
1139 assert vcs_settings == []
1142 finally:
1140 finally:
1143 self._cleanup_repo_settings(settings)
1141 self._cleanup_repo_settings(settings)
1144
1142
1145 def test_delete_svn_branch_and_tag_patterns(
1143 def test_delete_svn_branch_and_tag_patterns(
1146 self, autologin_user, backend_svn, settings_util, csrf_token):
1144 self, autologin_user, backend_svn, settings_util, csrf_token):
1147 repo = backend_svn.create_repo()
1145 repo = backend_svn.create_repo()
1148 repo_name = repo.repo_name
1146 repo_name = repo.repo_name
1149 branch = settings_util.create_repo_rhodecode_ui(
1147 branch = settings_util.create_repo_rhodecode_ui(
1150 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1148 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1151 cleanup=False)
1149 cleanup=False)
1152 tag = settings_util.create_repo_rhodecode_ui(
1150 tag = settings_util.create_repo_rhodecode_ui(
1153 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1151 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1154 data = {
1152 data = {
1155 '_method': 'delete',
1153 '_method': 'delete',
1156 'csrf_token': csrf_token
1154 'csrf_token': csrf_token
1157 }
1155 }
1158 for id_ in (branch.ui_id, tag.ui_id):
1156 for id_ in (branch.ui_id, tag.ui_id):
1159 data['delete_svn_pattern'] = id_,
1157 data['delete_svn_pattern'] = id_,
1160 self.app.post(
1158 self.app.post(
1161 url('repo_vcs_settings', repo_name=repo_name), data,
1159 url('repo_vcs_settings', repo_name=repo_name), data,
1162 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1160 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1163 settings = VcsSettingsModel(repo=repo_name)
1161 settings = VcsSettingsModel(repo=repo_name)
1164 assert settings.get_repo_svn_branch_patterns() == []
1162 assert settings.get_repo_svn_branch_patterns() == []
1165
1163
1166 def test_delete_svn_branch_requires_repo_admin_permission(
1164 def test_delete_svn_branch_requires_repo_admin_permission(
1167 self, backend_svn, user_util, settings_util, csrf_token):
1165 self, backend_svn, user_util, settings_util, csrf_token):
1168 repo = backend_svn.create_repo()
1166 repo = backend_svn.create_repo()
1169 repo_name = repo.repo_name
1167 repo_name = repo.repo_name
1170 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1168 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1171 logout_user_session(self.app, csrf_token)
1169 logout_user_session(self.app, csrf_token)
1172 session = login_user_session(
1170 session = login_user_session(
1173 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1171 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1174 csrf_token = auth.get_csrf_token(session)
1172 csrf_token = auth.get_csrf_token(session)
1175 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1173 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1176 branch = settings_util.create_repo_rhodecode_ui(
1174 branch = settings_util.create_repo_rhodecode_ui(
1177 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1175 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1178 cleanup=False)
1176 cleanup=False)
1179 data = {
1177 data = {
1180 '_method': 'delete',
1178 '_method': 'delete',
1181 'csrf_token': csrf_token,
1179 'csrf_token': csrf_token,
1182 'delete_svn_pattern': branch.ui_id
1180 'delete_svn_pattern': branch.ui_id
1183 }
1181 }
1184 self.app.post(
1182 self.app.post(
1185 url('repo_vcs_settings', repo_name=repo_name), data,
1183 url('repo_vcs_settings', repo_name=repo_name), data,
1186 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1184 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1187
1185
1188 def test_delete_svn_branch_raises_400_when_not_found(
1186 def test_delete_svn_branch_raises_400_when_not_found(
1189 self, autologin_user, backend_svn, settings_util, csrf_token):
1187 self, autologin_user, backend_svn, settings_util, csrf_token):
1190 repo_name = backend_svn.repo_name
1188 repo_name = backend_svn.repo_name
1191 data = {
1189 data = {
1192 '_method': 'delete',
1190 '_method': 'delete',
1193 'delete_svn_pattern': 123,
1191 'delete_svn_pattern': 123,
1194 'csrf_token': csrf_token
1192 'csrf_token': csrf_token
1195 }
1193 }
1196 self.app.post(
1194 self.app.post(
1197 url('repo_vcs_settings', repo_name=repo_name), data,
1195 url('repo_vcs_settings', repo_name=repo_name), data,
1198 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1196 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1199
1197
1200 def test_delete_svn_branch_raises_400_when_no_id_specified(
1198 def test_delete_svn_branch_raises_400_when_no_id_specified(
1201 self, autologin_user, backend_svn, settings_util, csrf_token):
1199 self, autologin_user, backend_svn, settings_util, csrf_token):
1202 repo_name = backend_svn.repo_name
1200 repo_name = backend_svn.repo_name
1203 data = {
1201 data = {
1204 '_method': 'delete',
1202 '_method': 'delete',
1205 'csrf_token': csrf_token
1203 'csrf_token': csrf_token
1206 }
1204 }
1207 self.app.post(
1205 self.app.post(
1208 url('repo_vcs_settings', repo_name=repo_name), data,
1206 url('repo_vcs_settings', repo_name=repo_name), data,
1209 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1207 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1210
1208
1211 def _cleanup_repo_settings(self, settings_model):
1209 def _cleanup_repo_settings(self, settings_model):
1212 cleanup = []
1210 cleanup = []
1213 ui_settings = (
1211 ui_settings = (
1214 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1212 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1215
1213
1216 for section, key in ui_settings:
1214 for section, key in ui_settings:
1217 ui = settings_model.get_ui_by_section_and_key(section, key)
1215 ui = settings_model.get_ui_by_section_and_key(section, key)
1218 if ui:
1216 if ui:
1219 cleanup.append(ui)
1217 cleanup.append(ui)
1220
1218
1221 cleanup.extend(settings_model.get_ui_by_section(
1219 cleanup.extend(settings_model.get_ui_by_section(
1222 VcsSettingsModel.INHERIT_SETTINGS))
1220 VcsSettingsModel.INHERIT_SETTINGS))
1223 cleanup.extend(settings_model.get_ui_by_section(
1221 cleanup.extend(settings_model.get_ui_by_section(
1224 VcsSettingsModel.SVN_BRANCH_SECTION))
1222 VcsSettingsModel.SVN_BRANCH_SECTION))
1225 cleanup.extend(settings_model.get_ui_by_section(
1223 cleanup.extend(settings_model.get_ui_by_section(
1226 VcsSettingsModel.SVN_TAG_SECTION))
1224 VcsSettingsModel.SVN_TAG_SECTION))
1227
1225
1228 for name in VcsSettingsModel.GENERAL_SETTINGS:
1226 for name in VcsSettingsModel.GENERAL_SETTINGS:
1229 setting = settings_model.get_setting_by_name(name)
1227 setting = settings_model.get_setting_by_name(name)
1230 if setting:
1228 if setting:
1231 cleanup.append(setting)
1229 cleanup.append(setting)
1232
1230
1233 for object_ in cleanup:
1231 for object_ in cleanup:
1234 Session().delete(object_)
1232 Session().delete(object_)
1235 Session().commit()
1233 Session().commit()
1236
1234
1237 def assert_repo_value_equals_global_value(self, response, setting):
1235 def assert_repo_value_equals_global_value(self, response, setting):
1238 assert_response = AssertResponse(response)
1236 assert_response = AssertResponse(response)
1239 global_css_selector = '[name={}_inherited]'.format(setting)
1237 global_css_selector = '[name={}_inherited]'.format(setting)
1240 repo_css_selector = '[name={}]'.format(setting)
1238 repo_css_selector = '[name={}]'.format(setting)
1241 repo_element = assert_response.get_element(repo_css_selector)
1239 repo_element = assert_response.get_element(repo_css_selector)
1242 global_element = assert_response.get_element(global_css_selector)
1240 global_element = assert_response.get_element(global_css_selector)
1243 assert repo_element.value == global_element.value
1241 assert repo_element.value == global_element.value
1244
1242
1245
1243
1246 def repo_on_filesystem(repo_name):
1247 try:
1248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
1249 return True
1250 except Exception:
1251 return False
1252
1253
1254 def _get_permission_for_user(user, repo):
1244 def _get_permission_for_user(user, repo):
1255 perm = UserRepoToPerm.query()\
1245 perm = UserRepoToPerm.query()\
1256 .filter(UserRepoToPerm.repository ==
1246 .filter(UserRepoToPerm.repository ==
1257 Repository.get_by_repo_name(repo))\
1247 Repository.get_by_repo_name(repo))\
1258 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1248 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1259 .all()
1249 .all()
1260 return perm
1250 return perm
@@ -1,525 +1,515 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import re
21 import re
22 import os
23
22
24 import mock
23 import mock
25 import pytest
24 import pytest
26
25
27 from rhodecode.controllers import summary
26 from rhodecode.controllers import summary
28 from rhodecode.lib import vcs
29 from rhodecode.lib import helpers as h
27 from rhodecode.lib import helpers as h
30 from rhodecode.lib.compat import OrderedDict
28 from rhodecode.lib.compat import OrderedDict
31 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
32 from rhodecode.model.db import Repository
30 from rhodecode.model.db import Repository
33 from rhodecode.model.meta import Session
31 from rhodecode.model.meta import Session
34 from rhodecode.model.repo import RepoModel
32 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.scm import ScmModel
33 from rhodecode.model.scm import ScmModel
36 from rhodecode.tests import (
34 from rhodecode.tests import (
37 TestController, url, HG_REPO, assert_session_flash, TESTS_TMP_PATH)
35 TestController, url, HG_REPO, assert_session_flash)
38 from rhodecode.tests.fixture import Fixture
36 from rhodecode.tests.fixture import Fixture
39 from rhodecode.tests.utils import AssertResponse
37 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
40
38
41
39
42 fixture = Fixture()
40 fixture = Fixture()
43
41
44
42
45 class TestSummaryController(TestController):
43 class TestSummaryController(TestController):
46 def test_index(self, backend):
44 def test_index(self, backend):
47 self.log_user()
45 self.log_user()
48 repo_id = backend.repo.repo_id
46 repo_id = backend.repo.repo_id
49 repo_name = backend.repo_name
47 repo_name = backend.repo_name
50 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
48 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
51 return_value=False):
49 return_value=False):
52 response = self.app.get(url('summary_home', repo_name=repo_name))
50 response = self.app.get(url('summary_home', repo_name=repo_name))
53
51
54 # repo type
52 # repo type
55 response.mustcontain(
53 response.mustcontain(
56 '<i class="icon-%s">' % (backend.alias, )
54 '<i class="icon-%s">' % (backend.alias, )
57 )
55 )
58 # public/private
56 # public/private
59 response.mustcontain(
57 response.mustcontain(
60 """<i class="icon-unlock-alt">"""
58 """<i class="icon-unlock-alt">"""
61 )
59 )
62
60
63 # clone url...
61 # clone url...
64 response.mustcontain(
62 response.mustcontain(
65 'id="clone_url" readonly="readonly"'
63 'id="clone_url" readonly="readonly"'
66 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
64 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
67 response.mustcontain(
65 response.mustcontain(
68 'id="clone_url_id" readonly="readonly"'
66 'id="clone_url_id" readonly="readonly"'
69 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
67 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
70
68
71 def test_index_svn_without_proxy(self, backend_svn):
69 def test_index_svn_without_proxy(self, backend_svn):
72 self.log_user()
70 self.log_user()
73 repo_id = backend_svn.repo.repo_id
71 repo_id = backend_svn.repo.repo_id
74 repo_name = backend_svn.repo_name
72 repo_name = backend_svn.repo_name
75 response = self.app.get(url('summary_home', repo_name=repo_name))
73 response = self.app.get(url('summary_home', repo_name=repo_name))
76 # clone url...
74 # clone url...
77 response.mustcontain(
75 response.mustcontain(
78 'id="clone_url" disabled'
76 'id="clone_url" disabled'
79 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
77 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
80 response.mustcontain(
78 response.mustcontain(
81 'id="clone_url_id" disabled'
79 'id="clone_url_id" disabled'
82 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
80 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
83
81
84 def test_index_with_trailing_slash(self, autologin_user, backend):
82 def test_index_with_trailing_slash(self, autologin_user, backend):
85 repo_id = backend.repo.repo_id
83 repo_id = backend.repo.repo_id
86 repo_name = backend.repo_name
84 repo_name = backend.repo_name
87 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
85 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
88 return_value=False):
86 return_value=False):
89 response = self.app.get(
87 response = self.app.get(
90 url('summary_home', repo_name=repo_name) + '/',
88 url('summary_home', repo_name=repo_name) + '/',
91 status=200)
89 status=200)
92
90
93 # clone url...
91 # clone url...
94 response.mustcontain(
92 response.mustcontain(
95 'id="clone_url" readonly="readonly"'
93 'id="clone_url" readonly="readonly"'
96 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
94 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
97 response.mustcontain(
95 response.mustcontain(
98 'id="clone_url_id" readonly="readonly"'
96 'id="clone_url_id" readonly="readonly"'
99 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
97 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
100
98
101 def test_index_by_id(self, backend):
99 def test_index_by_id(self, backend):
102 self.log_user()
100 self.log_user()
103 repo_id = backend.repo.repo_id
101 repo_id = backend.repo.repo_id
104 response = self.app.get(url(
102 response = self.app.get(url(
105 'summary_home', repo_name='_%s' % (repo_id,)))
103 'summary_home', repo_name='_%s' % (repo_id,)))
106
104
107 # repo type
105 # repo type
108 response.mustcontain(
106 response.mustcontain(
109 '<i class="icon-%s">' % (backend.alias, )
107 '<i class="icon-%s">' % (backend.alias, )
110 )
108 )
111 # public/private
109 # public/private
112 response.mustcontain(
110 response.mustcontain(
113 """<i class="icon-unlock-alt">"""
111 """<i class="icon-unlock-alt">"""
114 )
112 )
115
113
116 def test_index_by_repo_having_id_path_in_name_hg(self):
114 def test_index_by_repo_having_id_path_in_name_hg(self):
117 self.log_user()
115 self.log_user()
118 fixture.create_repo(name='repo_1')
116 fixture.create_repo(name='repo_1')
119 response = self.app.get(url('summary_home', repo_name='repo_1'))
117 response = self.app.get(url('summary_home', repo_name='repo_1'))
120
118
121 try:
119 try:
122 response.mustcontain("repo_1")
120 response.mustcontain("repo_1")
123 finally:
121 finally:
124 RepoModel().delete(Repository.get_by_repo_name('repo_1'))
122 RepoModel().delete(Repository.get_by_repo_name('repo_1'))
125 Session().commit()
123 Session().commit()
126
124
127 def test_index_with_anonymous_access_disabled(self):
125 def test_index_with_anonymous_access_disabled(self):
128 with fixture.anon_access(False):
126 with fixture.anon_access(False):
129 response = self.app.get(url('summary_home', repo_name=HG_REPO),
127 response = self.app.get(url('summary_home', repo_name=HG_REPO),
130 status=302)
128 status=302)
131 assert 'login' in response.location
129 assert 'login' in response.location
132
130
133 def _enable_stats(self, repo):
131 def _enable_stats(self, repo):
134 r = Repository.get_by_repo_name(repo)
132 r = Repository.get_by_repo_name(repo)
135 r.enable_statistics = True
133 r.enable_statistics = True
136 Session().add(r)
134 Session().add(r)
137 Session().commit()
135 Session().commit()
138
136
139 expected_trending = {
137 expected_trending = {
140 'hg': {
138 'hg': {
141 "py": {"count": 68, "desc": ["Python"]},
139 "py": {"count": 68, "desc": ["Python"]},
142 "rst": {"count": 16, "desc": ["Rst"]},
140 "rst": {"count": 16, "desc": ["Rst"]},
143 "css": {"count": 2, "desc": ["Css"]},
141 "css": {"count": 2, "desc": ["Css"]},
144 "sh": {"count": 2, "desc": ["Bash"]},
142 "sh": {"count": 2, "desc": ["Bash"]},
145 "bat": {"count": 1, "desc": ["Batch"]},
143 "bat": {"count": 1, "desc": ["Batch"]},
146 "cfg": {"count": 1, "desc": ["Ini"]},
144 "cfg": {"count": 1, "desc": ["Ini"]},
147 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
145 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
148 "ini": {"count": 1, "desc": ["Ini"]},
146 "ini": {"count": 1, "desc": ["Ini"]},
149 "js": {"count": 1, "desc": ["Javascript"]},
147 "js": {"count": 1, "desc": ["Javascript"]},
150 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
148 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
151 },
149 },
152 'git': {
150 'git': {
153 "py": {"count": 68, "desc": ["Python"]},
151 "py": {"count": 68, "desc": ["Python"]},
154 "rst": {"count": 16, "desc": ["Rst"]},
152 "rst": {"count": 16, "desc": ["Rst"]},
155 "css": {"count": 2, "desc": ["Css"]},
153 "css": {"count": 2, "desc": ["Css"]},
156 "sh": {"count": 2, "desc": ["Bash"]},
154 "sh": {"count": 2, "desc": ["Bash"]},
157 "bat": {"count": 1, "desc": ["Batch"]},
155 "bat": {"count": 1, "desc": ["Batch"]},
158 "cfg": {"count": 1, "desc": ["Ini"]},
156 "cfg": {"count": 1, "desc": ["Ini"]},
159 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
157 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
160 "ini": {"count": 1, "desc": ["Ini"]},
158 "ini": {"count": 1, "desc": ["Ini"]},
161 "js": {"count": 1, "desc": ["Javascript"]},
159 "js": {"count": 1, "desc": ["Javascript"]},
162 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
160 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
163 },
161 },
164 'svn': {
162 'svn': {
165 "py": {"count": 75, "desc": ["Python"]},
163 "py": {"count": 75, "desc": ["Python"]},
166 "rst": {"count": 16, "desc": ["Rst"]},
164 "rst": {"count": 16, "desc": ["Rst"]},
167 "html": {"count": 11, "desc": ["EvoqueHtml", "Html"]},
165 "html": {"count": 11, "desc": ["EvoqueHtml", "Html"]},
168 "css": {"count": 2, "desc": ["Css"]},
166 "css": {"count": 2, "desc": ["Css"]},
169 "bat": {"count": 1, "desc": ["Batch"]},
167 "bat": {"count": 1, "desc": ["Batch"]},
170 "cfg": {"count": 1, "desc": ["Ini"]},
168 "cfg": {"count": 1, "desc": ["Ini"]},
171 "ini": {"count": 1, "desc": ["Ini"]},
169 "ini": {"count": 1, "desc": ["Ini"]},
172 "js": {"count": 1, "desc": ["Javascript"]},
170 "js": {"count": 1, "desc": ["Javascript"]},
173 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]},
171 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]},
174 "sh": {"count": 1, "desc": ["Bash"]}
172 "sh": {"count": 1, "desc": ["Bash"]}
175 },
173 },
176 }
174 }
177
175
178 def test_repo_stats(self, backend, xhr_header):
176 def test_repo_stats(self, backend, xhr_header):
179 self.log_user()
177 self.log_user()
180 response = self.app.get(
178 response = self.app.get(
181 url('repo_stats',
179 url('repo_stats',
182 repo_name=backend.repo_name, commit_id='tip'),
180 repo_name=backend.repo_name, commit_id='tip'),
183 extra_environ=xhr_header,
181 extra_environ=xhr_header,
184 status=200)
182 status=200)
185 assert re.match(r'6[\d\.]+ KiB', response.json['size'])
183 assert re.match(r'6[\d\.]+ KiB', response.json['size'])
186
184
187 def test_repo_stats_code_stats_enabled(self, backend, xhr_header):
185 def test_repo_stats_code_stats_enabled(self, backend, xhr_header):
188 self.log_user()
186 self.log_user()
189 repo_name = backend.repo_name
187 repo_name = backend.repo_name
190
188
191 # codes stats
189 # codes stats
192 self._enable_stats(repo_name)
190 self._enable_stats(repo_name)
193 ScmModel().mark_for_invalidation(repo_name)
191 ScmModel().mark_for_invalidation(repo_name)
194
192
195 response = self.app.get(
193 response = self.app.get(
196 url('repo_stats',
194 url('repo_stats',
197 repo_name=backend.repo_name, commit_id='tip'),
195 repo_name=backend.repo_name, commit_id='tip'),
198 extra_environ=xhr_header,
196 extra_environ=xhr_header,
199 status=200)
197 status=200)
200
198
201 expected_data = self.expected_trending[backend.alias]
199 expected_data = self.expected_trending[backend.alias]
202 returned_stats = response.json['code_stats']
200 returned_stats = response.json['code_stats']
203 for k, v in expected_data.items():
201 for k, v in expected_data.items():
204 assert v == returned_stats[k]
202 assert v == returned_stats[k]
205
203
206 def test_repo_refs_data(self, backend):
204 def test_repo_refs_data(self, backend):
207 response = self.app.get(
205 response = self.app.get(
208 url('repo_refs_data', repo_name=backend.repo_name),
206 url('repo_refs_data', repo_name=backend.repo_name),
209 status=200)
207 status=200)
210
208
211 # Ensure that there is the correct amount of items in the result
209 # Ensure that there is the correct amount of items in the result
212 repo = backend.repo.scm_instance()
210 repo = backend.repo.scm_instance()
213 data = response.json['results']
211 data = response.json['results']
214 items = sum(len(section['children']) for section in data)
212 items = sum(len(section['children']) for section in data)
215 repo_refs = len(repo.branches) + len(repo.tags) + len(repo.bookmarks)
213 repo_refs = len(repo.branches) + len(repo.tags) + len(repo.bookmarks)
216 assert items == repo_refs
214 assert items == repo_refs
217
215
218 def test_index_shows_missing_requirements_message(
216 def test_index_shows_missing_requirements_message(
219 self, backend, autologin_user):
217 self, backend, autologin_user):
220 repo_name = backend.repo_name
218 repo_name = backend.repo_name
221 scm_patcher = mock.patch.object(
219 scm_patcher = mock.patch.object(
222 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
220 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
223
221
224 with scm_patcher:
222 with scm_patcher:
225 response = self.app.get(url('summary_home', repo_name=repo_name))
223 response = self.app.get(url('summary_home', repo_name=repo_name))
226 assert_response = AssertResponse(response)
224 assert_response = AssertResponse(response)
227 assert_response.element_contains(
225 assert_response.element_contains(
228 '.main .alert-warning strong', 'Missing requirements')
226 '.main .alert-warning strong', 'Missing requirements')
229 assert_response.element_contains(
227 assert_response.element_contains(
230 '.main .alert-warning',
228 '.main .alert-warning',
231 'These commits cannot be displayed, because this repository'
229 'These commits cannot be displayed, because this repository'
232 ' uses the Mercurial largefiles extension, which was not enabled.')
230 ' uses the Mercurial largefiles extension, which was not enabled.')
233
231
234 def test_missing_requirements_page_does_not_contains_switch_to(
232 def test_missing_requirements_page_does_not_contains_switch_to(
235 self, backend):
233 self, backend):
236 self.log_user()
234 self.log_user()
237 repo_name = backend.repo_name
235 repo_name = backend.repo_name
238 scm_patcher = mock.patch.object(
236 scm_patcher = mock.patch.object(
239 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
237 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
240
238
241 with scm_patcher:
239 with scm_patcher:
242 response = self.app.get(url('summary_home', repo_name=repo_name))
240 response = self.app.get(url('summary_home', repo_name=repo_name))
243 response.mustcontain(no='Switch To')
241 response.mustcontain(no='Switch To')
244
242
245
243
246 @pytest.mark.usefixtures('pylonsapp')
244 @pytest.mark.usefixtures('pylonsapp')
247 class TestSwitcherReferenceData:
245 class TestSwitcherReferenceData:
248
246
249 def test_creates_reference_urls_based_on_name(self):
247 def test_creates_reference_urls_based_on_name(self):
250 references = {
248 references = {
251 'name': 'commit_id',
249 'name': 'commit_id',
252 }
250 }
253 controller = summary.SummaryController()
251 controller = summary.SummaryController()
254 is_svn = False
252 is_svn = False
255 result = controller._switcher_reference_data(
253 result = controller._switcher_reference_data(
256 'repo_name', references, is_svn)
254 'repo_name', references, is_svn)
257 expected_url = h.url(
255 expected_url = h.url(
258 'files_home', repo_name='repo_name', revision='name',
256 'files_home', repo_name='repo_name', revision='name',
259 at='name')
257 at='name')
260 assert result[0]['files_url'] == expected_url
258 assert result[0]['files_url'] == expected_url
261
259
262 def test_urls_contain_commit_id_if_slash_in_name(self):
260 def test_urls_contain_commit_id_if_slash_in_name(self):
263 references = {
261 references = {
264 'name/with/slash': 'commit_id',
262 'name/with/slash': 'commit_id',
265 }
263 }
266 controller = summary.SummaryController()
264 controller = summary.SummaryController()
267 is_svn = False
265 is_svn = False
268 result = controller._switcher_reference_data(
266 result = controller._switcher_reference_data(
269 'repo_name', references, is_svn)
267 'repo_name', references, is_svn)
270 expected_url = h.url(
268 expected_url = h.url(
271 'files_home', repo_name='repo_name', revision='commit_id',
269 'files_home', repo_name='repo_name', revision='commit_id',
272 at='name/with/slash')
270 at='name/with/slash')
273 assert result[0]['files_url'] == expected_url
271 assert result[0]['files_url'] == expected_url
274
272
275 def test_adds_reference_to_path_for_svn(self):
273 def test_adds_reference_to_path_for_svn(self):
276 references = {
274 references = {
277 'name/with/slash': 'commit_id',
275 'name/with/slash': 'commit_id',
278 }
276 }
279 controller = summary.SummaryController()
277 controller = summary.SummaryController()
280 is_svn = True
278 is_svn = True
281 result = controller._switcher_reference_data(
279 result = controller._switcher_reference_data(
282 'repo_name', references, is_svn)
280 'repo_name', references, is_svn)
283 expected_url = h.url(
281 expected_url = h.url(
284 'files_home', repo_name='repo_name', f_path='name/with/slash',
282 'files_home', repo_name='repo_name', f_path='name/with/slash',
285 revision='commit_id', at='name/with/slash')
283 revision='commit_id', at='name/with/slash')
286 assert result[0]['files_url'] == expected_url
284 assert result[0]['files_url'] == expected_url
287
285
288
286
289 @pytest.mark.usefixtures('pylonsapp')
287 @pytest.mark.usefixtures('pylonsapp')
290 class TestCreateReferenceData:
288 class TestCreateReferenceData:
291
289
292 @pytest.fixture
290 @pytest.fixture
293 def example_refs(self):
291 def example_refs(self):
294 section_1_refs = OrderedDict((('a', 'a_id'), ('b', 'b_id')))
292 section_1_refs = OrderedDict((('a', 'a_id'), ('b', 'b_id')))
295 example_refs = [
293 example_refs = [
296 ('section_1', section_1_refs, 't1'),
294 ('section_1', section_1_refs, 't1'),
297 ('section_2', {'c': 'c_id'}, 't2'),
295 ('section_2', {'c': 'c_id'}, 't2'),
298 ]
296 ]
299 return example_refs
297 return example_refs
300
298
301 def test_generates_refs_based_on_commit_ids(self, example_refs):
299 def test_generates_refs_based_on_commit_ids(self, example_refs):
302 repo = mock.Mock()
300 repo = mock.Mock()
303 repo.name = 'test-repo'
301 repo.name = 'test-repo'
304 repo.alias = 'git'
302 repo.alias = 'git'
305 full_repo_name = 'pytest-repo-group/' + repo.name
303 full_repo_name = 'pytest-repo-group/' + repo.name
306 controller = summary.SummaryController()
304 controller = summary.SummaryController()
307
305
308 result = controller._create_reference_data(
306 result = controller._create_reference_data(
309 repo, full_repo_name, example_refs)
307 repo, full_repo_name, example_refs)
310
308
311 expected_files_url = '/{}/files/'.format(full_repo_name)
309 expected_files_url = '/{}/files/'.format(full_repo_name)
312 expected_result = [
310 expected_result = [
313 {
311 {
314 'children': [
312 'children': [
315 {
313 {
316 'id': 'a', 'raw_id': 'a_id', 'text': 'a', 'type': 't1',
314 'id': 'a', 'raw_id': 'a_id', 'text': 'a', 'type': 't1',
317 'files_url': expected_files_url + 'a/?at=a',
315 'files_url': expected_files_url + 'a/?at=a',
318 },
316 },
319 {
317 {
320 'id': 'b', 'raw_id': 'b_id', 'text': 'b', 'type': 't1',
318 'id': 'b', 'raw_id': 'b_id', 'text': 'b', 'type': 't1',
321 'files_url': expected_files_url + 'b/?at=b',
319 'files_url': expected_files_url + 'b/?at=b',
322 }
320 }
323 ],
321 ],
324 'text': 'section_1'
322 'text': 'section_1'
325 },
323 },
326 {
324 {
327 'children': [
325 'children': [
328 {
326 {
329 'id': 'c', 'raw_id': 'c_id', 'text': 'c', 'type': 't2',
327 'id': 'c', 'raw_id': 'c_id', 'text': 'c', 'type': 't2',
330 'files_url': expected_files_url + 'c/?at=c',
328 'files_url': expected_files_url + 'c/?at=c',
331 }
329 }
332 ],
330 ],
333 'text': 'section_2'
331 'text': 'section_2'
334 }]
332 }]
335 assert result == expected_result
333 assert result == expected_result
336
334
337 def test_generates_refs_with_path_for_svn(self, example_refs):
335 def test_generates_refs_with_path_for_svn(self, example_refs):
338 repo = mock.Mock()
336 repo = mock.Mock()
339 repo.name = 'test-repo'
337 repo.name = 'test-repo'
340 repo.alias = 'svn'
338 repo.alias = 'svn'
341 full_repo_name = 'pytest-repo-group/' + repo.name
339 full_repo_name = 'pytest-repo-group/' + repo.name
342 controller = summary.SummaryController()
340 controller = summary.SummaryController()
343 result = controller._create_reference_data(
341 result = controller._create_reference_data(
344 repo, full_repo_name, example_refs)
342 repo, full_repo_name, example_refs)
345
343
346 expected_files_url = '/{}/files/'.format(full_repo_name)
344 expected_files_url = '/{}/files/'.format(full_repo_name)
347 expected_result = [
345 expected_result = [
348 {
346 {
349 'children': [
347 'children': [
350 {
348 {
351 'id': 'a@a_id', 'raw_id': 'a_id',
349 'id': 'a@a_id', 'raw_id': 'a_id',
352 'text': 'a', 'type': 't1',
350 'text': 'a', 'type': 't1',
353 'files_url': expected_files_url + 'a_id/a?at=a',
351 'files_url': expected_files_url + 'a_id/a?at=a',
354 },
352 },
355 {
353 {
356 'id': 'b@b_id', 'raw_id': 'b_id',
354 'id': 'b@b_id', 'raw_id': 'b_id',
357 'text': 'b', 'type': 't1',
355 'text': 'b', 'type': 't1',
358 'files_url': expected_files_url + 'b_id/b?at=b',
356 'files_url': expected_files_url + 'b_id/b?at=b',
359 }
357 }
360 ],
358 ],
361 'text': 'section_1'
359 'text': 'section_1'
362 },
360 },
363 {
361 {
364 'children': [
362 'children': [
365 {
363 {
366 'id': 'c@c_id', 'raw_id': 'c_id',
364 'id': 'c@c_id', 'raw_id': 'c_id',
367 'text': 'c', 'type': 't2',
365 'text': 'c', 'type': 't2',
368 'files_url': expected_files_url + 'c_id/c?at=c',
366 'files_url': expected_files_url + 'c_id/c?at=c',
369 }
367 }
370 ],
368 ],
371 'text': 'section_2'
369 'text': 'section_2'
372 }
370 }
373 ]
371 ]
374 assert result == expected_result
372 assert result == expected_result
375
373
376
374
377 @pytest.mark.usefixtures("app")
375 @pytest.mark.usefixtures("app")
378 class TestRepoLocation:
376 class TestRepoLocation:
379
377
380 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
378 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
381 def test_manual_delete(self, autologin_user, backend, suffix, csrf_token):
379 def test_manual_delete(self, autologin_user, backend, suffix, csrf_token):
382 repo = backend.create_repo(name_suffix=suffix)
380 repo = backend.create_repo(name_suffix=suffix)
383 repo_name = repo.repo_name
381 repo_name = repo.repo_name
384
382
385 # delete from file system
383 # delete from file system
386 RepoModel()._delete_filesystem_repo(repo)
384 RepoModel()._delete_filesystem_repo(repo)
387
385
388 # test if the repo is still in the database
386 # test if the repo is still in the database
389 new_repo = RepoModel().get_by_repo_name(repo_name)
387 new_repo = RepoModel().get_by_repo_name(repo_name)
390 assert new_repo.repo_name == repo_name
388 assert new_repo.repo_name == repo_name
391
389
392 # check if repo is not in the filesystem
390 # check if repo is not in the filesystem
393 assert not repo_on_filesystem(repo_name)
391 assert not repo_on_filesystem(repo_name)
394 self.assert_repo_not_found_redirect(repo_name)
392 self.assert_repo_not_found_redirect(repo_name)
395
393
396 def assert_repo_not_found_redirect(self, repo_name):
394 def assert_repo_not_found_redirect(self, repo_name):
397 # run the check page that triggers the other flash message
395 # run the check page that triggers the other flash message
398 response = self.app.get(url('repo_check_home', repo_name=repo_name))
396 response = self.app.get(url('repo_check_home', repo_name=repo_name))
399 assert_session_flash(
397 assert_session_flash(
400 response, 'The repository at %s cannot be located.' % repo_name)
398 response, 'The repository at %s cannot be located.' % repo_name)
401
399
402
400
403 def repo_on_filesystem(repo_name):
404 try:
405 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
406 return True
407 except Exception:
408 return False
409
410
411 class TestCreateFilesUrl(object):
401 class TestCreateFilesUrl(object):
412 def test_creates_non_svn_url(self):
402 def test_creates_non_svn_url(self):
413 controller = summary.SummaryController()
403 controller = summary.SummaryController()
414 repo = mock.Mock()
404 repo = mock.Mock()
415 repo.name = 'abcde'
405 repo.name = 'abcde'
416 full_repo_name = 'test-repo-group/' + repo.name
406 full_repo_name = 'test-repo-group/' + repo.name
417 ref_name = 'branch1'
407 ref_name = 'branch1'
418 raw_id = 'deadbeef0123456789'
408 raw_id = 'deadbeef0123456789'
419 is_svn = False
409 is_svn = False
420
410
421 with mock.patch.object(summary.h, 'url') as url_mock:
411 with mock.patch.object(summary.h, 'url') as url_mock:
422 result = controller._create_files_url(
412 result = controller._create_files_url(
423 repo, full_repo_name, ref_name, raw_id, is_svn)
413 repo, full_repo_name, ref_name, raw_id, is_svn)
424 url_mock.assert_called_once_with(
414 url_mock.assert_called_once_with(
425 'files_home', repo_name=full_repo_name, f_path='',
415 'files_home', repo_name=full_repo_name, f_path='',
426 revision=ref_name, at=ref_name)
416 revision=ref_name, at=ref_name)
427 assert result == url_mock.return_value
417 assert result == url_mock.return_value
428
418
429 def test_creates_svn_url(self):
419 def test_creates_svn_url(self):
430 controller = summary.SummaryController()
420 controller = summary.SummaryController()
431 repo = mock.Mock()
421 repo = mock.Mock()
432 repo.name = 'abcde'
422 repo.name = 'abcde'
433 full_repo_name = 'test-repo-group/' + repo.name
423 full_repo_name = 'test-repo-group/' + repo.name
434 ref_name = 'branch1'
424 ref_name = 'branch1'
435 raw_id = 'deadbeef0123456789'
425 raw_id = 'deadbeef0123456789'
436 is_svn = True
426 is_svn = True
437
427
438 with mock.patch.object(summary.h, 'url') as url_mock:
428 with mock.patch.object(summary.h, 'url') as url_mock:
439 result = controller._create_files_url(
429 result = controller._create_files_url(
440 repo, full_repo_name, ref_name, raw_id, is_svn)
430 repo, full_repo_name, ref_name, raw_id, is_svn)
441 url_mock.assert_called_once_with(
431 url_mock.assert_called_once_with(
442 'files_home', repo_name=full_repo_name, f_path=ref_name,
432 'files_home', repo_name=full_repo_name, f_path=ref_name,
443 revision=raw_id, at=ref_name)
433 revision=raw_id, at=ref_name)
444 assert result == url_mock.return_value
434 assert result == url_mock.return_value
445
435
446 def test_name_has_slashes(self):
436 def test_name_has_slashes(self):
447 controller = summary.SummaryController()
437 controller = summary.SummaryController()
448 repo = mock.Mock()
438 repo = mock.Mock()
449 repo.name = 'abcde'
439 repo.name = 'abcde'
450 full_repo_name = 'test-repo-group/' + repo.name
440 full_repo_name = 'test-repo-group/' + repo.name
451 ref_name = 'branch1/branch2'
441 ref_name = 'branch1/branch2'
452 raw_id = 'deadbeef0123456789'
442 raw_id = 'deadbeef0123456789'
453 is_svn = False
443 is_svn = False
454
444
455 with mock.patch.object(summary.h, 'url') as url_mock:
445 with mock.patch.object(summary.h, 'url') as url_mock:
456 result = controller._create_files_url(
446 result = controller._create_files_url(
457 repo, full_repo_name, ref_name, raw_id, is_svn)
447 repo, full_repo_name, ref_name, raw_id, is_svn)
458 url_mock.assert_called_once_with(
448 url_mock.assert_called_once_with(
459 'files_home', repo_name=full_repo_name, f_path='', revision=raw_id,
449 'files_home', repo_name=full_repo_name, f_path='', revision=raw_id,
460 at=ref_name)
450 at=ref_name)
461 assert result == url_mock.return_value
451 assert result == url_mock.return_value
462
452
463
453
464 class TestReferenceItems(object):
454 class TestReferenceItems(object):
465 repo = mock.Mock()
455 repo = mock.Mock()
466 repo.name = 'pytest-repo'
456 repo.name = 'pytest-repo'
467 repo_full_name = 'pytest-repo-group/' + repo.name
457 repo_full_name = 'pytest-repo-group/' + repo.name
468 ref_type = 'branch'
458 ref_type = 'branch'
469 fake_url = '/abcde/'
459 fake_url = '/abcde/'
470
460
471 @staticmethod
461 @staticmethod
472 def _format_function(name, id_):
462 def _format_function(name, id_):
473 return 'format_function_{}_{}'.format(name, id_)
463 return 'format_function_{}_{}'.format(name, id_)
474
464
475 def test_creates_required_amount_of_items(self):
465 def test_creates_required_amount_of_items(self):
476 amount = 100
466 amount = 100
477 refs = {
467 refs = {
478 'ref{}'.format(i): '{0:040d}'.format(i)
468 'ref{}'.format(i): '{0:040d}'.format(i)
479 for i in range(amount)
469 for i in range(amount)
480 }
470 }
481
471
482 controller = summary.SummaryController()
472 controller = summary.SummaryController()
483
473
484 url_patcher = mock.patch.object(
474 url_patcher = mock.patch.object(
485 controller, '_create_files_url')
475 controller, '_create_files_url')
486 svn_patcher = mock.patch.object(
476 svn_patcher = mock.patch.object(
487 summary.h, 'is_svn', return_value=False)
477 summary.h, 'is_svn', return_value=False)
488
478
489 with url_patcher as url_mock, svn_patcher:
479 with url_patcher as url_mock, svn_patcher:
490 result = controller._create_reference_items(
480 result = controller._create_reference_items(
491 self.repo, self.repo_full_name, refs, self.ref_type,
481 self.repo, self.repo_full_name, refs, self.ref_type,
492 self._format_function)
482 self._format_function)
493 assert len(result) == amount
483 assert len(result) == amount
494 assert url_mock.call_count == amount
484 assert url_mock.call_count == amount
495
485
496 def test_single_item_details(self):
486 def test_single_item_details(self):
497 ref_name = 'ref1'
487 ref_name = 'ref1'
498 ref_id = 'deadbeef'
488 ref_id = 'deadbeef'
499 refs = {
489 refs = {
500 ref_name: ref_id
490 ref_name: ref_id
501 }
491 }
502
492
503 controller = summary.SummaryController()
493 controller = summary.SummaryController()
504 url_patcher = mock.patch.object(
494 url_patcher = mock.patch.object(
505 controller, '_create_files_url', return_value=self.fake_url)
495 controller, '_create_files_url', return_value=self.fake_url)
506 svn_patcher = mock.patch.object(
496 svn_patcher = mock.patch.object(
507 summary.h, 'is_svn', return_value=False)
497 summary.h, 'is_svn', return_value=False)
508
498
509 with url_patcher as url_mock, svn_patcher:
499 with url_patcher as url_mock, svn_patcher:
510 result = controller._create_reference_items(
500 result = controller._create_reference_items(
511 self.repo, self.repo_full_name, refs, self.ref_type,
501 self.repo, self.repo_full_name, refs, self.ref_type,
512 self._format_function)
502 self._format_function)
513
503
514 url_mock.assert_called_once_with(
504 url_mock.assert_called_once_with(
515 self.repo, self.repo_full_name, ref_name, ref_id, False)
505 self.repo, self.repo_full_name, ref_name, ref_id, False)
516 expected_result = [
506 expected_result = [
517 {
507 {
518 'text': ref_name,
508 'text': ref_name,
519 'id': self._format_function(ref_name, ref_id),
509 'id': self._format_function(ref_name, ref_id),
520 'raw_id': ref_id,
510 'raw_id': ref_id,
521 'type': self.ref_type,
511 'type': self.ref_type,
522 'files_url': self.fake_url
512 'files_url': self.fake_url
523 }
513 }
524 ]
514 ]
525 assert result == expected_result
515 assert result == expected_result
@@ -1,285 +1,293 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import threading
21 import threading
22 import time
22 import time
23 import logging
23 import logging
24 import os.path
24 import os.path
25 import subprocess
25 import subprocess
26 import urllib2
26 import urllib2
27 from urlparse import urlparse, parse_qsl
27 from urlparse import urlparse, parse_qsl
28 from urllib import unquote_plus
28 from urllib import unquote_plus
29
29
30 import pytest
30 import pytest
31 import rc_testdata
31 import rc_testdata
32 from lxml.html import fromstring, tostring
32 from lxml.html import fromstring, tostring
33 from lxml.cssselect import CSSSelector
33 from lxml.cssselect import CSSSelector
34
34
35 from rhodecode.model.db import User
35 from rhodecode.model.db import User
36 from rhodecode.model.meta import Session
36 from rhodecode.model.meta import Session
37 from rhodecode.model.scm import ScmModel
37 from rhodecode.model.scm import ScmModel
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39
39
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 def set_anonymous_access(enabled):
44 def set_anonymous_access(enabled):
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 user = User.get_default_user()
46 user = User.get_default_user()
47 user.active = enabled
47 user.active = enabled
48 Session().add(user)
48 Session().add(user)
49 Session().commit()
49 Session().commit()
50 log.info('anonymous access is now: %s', enabled)
50 log.info('anonymous access is now: %s', enabled)
51 assert enabled == User.get_default_user().active, (
51 assert enabled == User.get_default_user().active, (
52 'Cannot set anonymous access')
52 'Cannot set anonymous access')
53
53
54
54
55 def check_xfail_backends(node, backend_alias):
55 def check_xfail_backends(node, backend_alias):
56 # Using "xfail_backends" here intentionally, since this marks work
56 # Using "xfail_backends" here intentionally, since this marks work
57 # which is "to be done" soon.
57 # which is "to be done" soon.
58 skip_marker = node.get_marker('xfail_backends')
58 skip_marker = node.get_marker('xfail_backends')
59 if skip_marker and backend_alias in skip_marker.args:
59 if skip_marker and backend_alias in skip_marker.args:
60 msg = "Support for backend %s to be developed." % (backend_alias, )
60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 msg = skip_marker.kwargs.get('reason', msg)
61 msg = skip_marker.kwargs.get('reason', msg)
62 pytest.xfail(msg)
62 pytest.xfail(msg)
63
63
64
64
65 def check_skip_backends(node, backend_alias):
65 def check_skip_backends(node, backend_alias):
66 # Using "skip_backends" here intentionally, since this marks work which is
66 # Using "skip_backends" here intentionally, since this marks work which is
67 # not supported.
67 # not supported.
68 skip_marker = node.get_marker('skip_backends')
68 skip_marker = node.get_marker('skip_backends')
69 if skip_marker and backend_alias in skip_marker.args:
69 if skip_marker and backend_alias in skip_marker.args:
70 msg = "Feature not supported for backend %s." % (backend_alias, )
70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 msg = skip_marker.kwargs.get('reason', msg)
71 msg = skip_marker.kwargs.get('reason', msg)
72 pytest.skip(msg)
72 pytest.skip(msg)
73
73
74
74
75 def extract_git_repo_from_dump(dump_name, repo_name):
75 def extract_git_repo_from_dump(dump_name, repo_name):
76 """Create git repo `repo_name` from dump `dump_name`."""
76 """Create git repo `repo_name` from dump `dump_name`."""
77 repos_path = ScmModel().repos_path
77 repos_path = ScmModel().repos_path
78 target_path = os.path.join(repos_path, repo_name)
78 target_path = os.path.join(repos_path, repo_name)
79 rc_testdata.extract_git_dump(dump_name, target_path)
79 rc_testdata.extract_git_dump(dump_name, target_path)
80 return target_path
80 return target_path
81
81
82
82
83 def extract_hg_repo_from_dump(dump_name, repo_name):
83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 """Create hg repo `repo_name` from dump `dump_name`."""
84 """Create hg repo `repo_name` from dump `dump_name`."""
85 repos_path = ScmModel().repos_path
85 repos_path = ScmModel().repos_path
86 target_path = os.path.join(repos_path, repo_name)
86 target_path = os.path.join(repos_path, repo_name)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 return target_path
88 return target_path
89
89
90
90
91 def extract_svn_repo_from_dump(dump_name, repo_name):
91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 """Create a svn repo `repo_name` from dump `dump_name`."""
92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 repos_path = ScmModel().repos_path
93 repos_path = ScmModel().repos_path
94 target_path = os.path.join(repos_path, repo_name)
94 target_path = os.path.join(repos_path, repo_name)
95 SubversionRepository(target_path, create=True)
95 SubversionRepository(target_path, create=True)
96 _load_svn_dump_into_repo(dump_name, target_path)
96 _load_svn_dump_into_repo(dump_name, target_path)
97 return target_path
97 return target_path
98
98
99
99
100 def assert_message_in_log(log_records, message, levelno, module):
100 def assert_message_in_log(log_records, message, levelno, module):
101 messages = [
101 messages = [
102 r.message for r in log_records
102 r.message for r in log_records
103 if r.module == module and r.levelno == levelno
103 if r.module == module and r.levelno == levelno
104 ]
104 ]
105 assert message in messages
105 assert message in messages
106
106
107
107
108 def _load_svn_dump_into_repo(dump_name, repo_path):
108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 """
109 """
110 Utility to populate a svn repository with a named dump
110 Utility to populate a svn repository with a named dump
111
111
112 Currently the dumps are in rc_testdata. They might later on be
112 Currently the dumps are in rc_testdata. They might later on be
113 integrated with the main repository once they stabilize more.
113 integrated with the main repository once they stabilize more.
114 """
114 """
115 dump = rc_testdata.load_svn_dump(dump_name)
115 dump = rc_testdata.load_svn_dump(dump_name)
116 load_dump = subprocess.Popen(
116 load_dump = subprocess.Popen(
117 ['svnadmin', 'load', repo_path],
117 ['svnadmin', 'load', repo_path],
118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
119 stderr=subprocess.PIPE)
119 stderr=subprocess.PIPE)
120 out, err = load_dump.communicate(dump)
120 out, err = load_dump.communicate(dump)
121 if load_dump.returncode != 0:
121 if load_dump.returncode != 0:
122 log.error("Output of load_dump command: %s", out)
122 log.error("Output of load_dump command: %s", out)
123 log.error("Error output of load_dump command: %s", err)
123 log.error("Error output of load_dump command: %s", err)
124 raise Exception(
124 raise Exception(
125 'Failed to load dump "%s" into repository at path "%s".'
125 'Failed to load dump "%s" into repository at path "%s".'
126 % (dump_name, repo_path))
126 % (dump_name, repo_path))
127
127
128
128
129 class AssertResponse(object):
129 class AssertResponse(object):
130 """
130 """
131 Utility that helps to assert things about a given HTML response.
131 Utility that helps to assert things about a given HTML response.
132 """
132 """
133
133
134 def __init__(self, response):
134 def __init__(self, response):
135 self.response = response
135 self.response = response
136
136
137 def one_element_exists(self, css_selector):
137 def one_element_exists(self, css_selector):
138 self.get_element(css_selector)
138 self.get_element(css_selector)
139
139
140 def no_element_exists(self, css_selector):
140 def no_element_exists(self, css_selector):
141 assert not self._get_elements(css_selector)
141 assert not self._get_elements(css_selector)
142
142
143 def element_equals_to(self, css_selector, expected_content):
143 def element_equals_to(self, css_selector, expected_content):
144 element = self.get_element(css_selector)
144 element = self.get_element(css_selector)
145 element_text = self._element_to_string(element)
145 element_text = self._element_to_string(element)
146 assert expected_content in element_text
146 assert expected_content in element_text
147
147
148 def element_contains(self, css_selector, expected_content):
148 def element_contains(self, css_selector, expected_content):
149 element = self.get_element(css_selector)
149 element = self.get_element(css_selector)
150 assert expected_content in element.text_content()
150 assert expected_content in element.text_content()
151
151
152 def contains_one_link(self, link_text, href):
152 def contains_one_link(self, link_text, href):
153 doc = fromstring(self.response.body)
153 doc = fromstring(self.response.body)
154 sel = CSSSelector('a[href]')
154 sel = CSSSelector('a[href]')
155 elements = [
155 elements = [
156 e for e in sel(doc) if e.text_content().strip() == link_text]
156 e for e in sel(doc) if e.text_content().strip() == link_text]
157 assert len(elements) == 1, "Did not find link or found multiple links"
157 assert len(elements) == 1, "Did not find link or found multiple links"
158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159
159
160 def contains_one_anchor(self, anchor_id):
160 def contains_one_anchor(self, anchor_id):
161 doc = fromstring(self.response.body)
161 doc = fromstring(self.response.body)
162 sel = CSSSelector('#' + anchor_id)
162 sel = CSSSelector('#' + anchor_id)
163 elements = sel(doc)
163 elements = sel(doc)
164 assert len(elements) == 1
164 assert len(elements) == 1
165
165
166 def _ensure_url_equal(self, found, expected):
166 def _ensure_url_equal(self, found, expected):
167 assert _Url(found) == _Url(expected)
167 assert _Url(found) == _Url(expected)
168
168
169 def get_element(self, css_selector):
169 def get_element(self, css_selector):
170 elements = self._get_elements(css_selector)
170 elements = self._get_elements(css_selector)
171 assert len(elements) == 1
171 assert len(elements) == 1
172 return elements[0]
172 return elements[0]
173
173
174 def get_elements(self, css_selector):
174 def get_elements(self, css_selector):
175 return self._get_elements(css_selector)
175 return self._get_elements(css_selector)
176
176
177 def _get_elements(self, css_selector):
177 def _get_elements(self, css_selector):
178 doc = fromstring(self.response.body)
178 doc = fromstring(self.response.body)
179 sel = CSSSelector(css_selector)
179 sel = CSSSelector(css_selector)
180 elements = sel(doc)
180 elements = sel(doc)
181 return elements
181 return elements
182
182
183 def _element_to_string(self, element):
183 def _element_to_string(self, element):
184 return tostring(element)
184 return tostring(element)
185
185
186
186
187 class _Url(object):
187 class _Url(object):
188 """
188 """
189 A url object that can be compared with other url orbjects
189 A url object that can be compared with other url orbjects
190 without regard to the vagaries of encoding, escaping, and ordering
190 without regard to the vagaries of encoding, escaping, and ordering
191 of parameters in query strings.
191 of parameters in query strings.
192
192
193 Inspired by
193 Inspired by
194 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
194 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
195 """
195 """
196
196
197 def __init__(self, url):
197 def __init__(self, url):
198 parts = urlparse(url)
198 parts = urlparse(url)
199 _query = frozenset(parse_qsl(parts.query))
199 _query = frozenset(parse_qsl(parts.query))
200 _path = unquote_plus(parts.path)
200 _path = unquote_plus(parts.path)
201 parts = parts._replace(query=_query, path=_path)
201 parts = parts._replace(query=_query, path=_path)
202 self.parts = parts
202 self.parts = parts
203
203
204 def __eq__(self, other):
204 def __eq__(self, other):
205 return self.parts == other.parts
205 return self.parts == other.parts
206
206
207 def __hash__(self):
207 def __hash__(self):
208 return hash(self.parts)
208 return hash(self.parts)
209
209
210
210
211 def run_test_concurrently(times, raise_catched_exc=True):
211 def run_test_concurrently(times, raise_catched_exc=True):
212 """
212 """
213 Add this decorator to small pieces of code that you want to test
213 Add this decorator to small pieces of code that you want to test
214 concurrently
214 concurrently
215
215
216 ex:
216 ex:
217
217
218 @test_concurrently(25)
218 @test_concurrently(25)
219 def my_test_function():
219 def my_test_function():
220 ...
220 ...
221 """
221 """
222 def test_concurrently_decorator(test_func):
222 def test_concurrently_decorator(test_func):
223 def wrapper(*args, **kwargs):
223 def wrapper(*args, **kwargs):
224 exceptions = []
224 exceptions = []
225
225
226 def call_test_func():
226 def call_test_func():
227 try:
227 try:
228 test_func(*args, **kwargs)
228 test_func(*args, **kwargs)
229 except Exception, e:
229 except Exception, e:
230 exceptions.append(e)
230 exceptions.append(e)
231 if raise_catched_exc:
231 if raise_catched_exc:
232 raise
232 raise
233 threads = []
233 threads = []
234 for i in range(times):
234 for i in range(times):
235 threads.append(threading.Thread(target=call_test_func))
235 threads.append(threading.Thread(target=call_test_func))
236 for t in threads:
236 for t in threads:
237 t.start()
237 t.start()
238 for t in threads:
238 for t in threads:
239 t.join()
239 t.join()
240 if exceptions:
240 if exceptions:
241 raise Exception(
241 raise Exception(
242 'test_concurrently intercepted %s exceptions: %s' % (
242 'test_concurrently intercepted %s exceptions: %s' % (
243 len(exceptions), exceptions))
243 len(exceptions), exceptions))
244 return wrapper
244 return wrapper
245 return test_concurrently_decorator
245 return test_concurrently_decorator
246
246
247
247
248 def wait_for_url(url, timeout=10):
248 def wait_for_url(url, timeout=10):
249 """
249 """
250 Wait until URL becomes reachable.
250 Wait until URL becomes reachable.
251
251
252 It polls the URL until the timeout is reached or it became reachable.
252 It polls the URL until the timeout is reached or it became reachable.
253 If will call to `py.test.fail` in case the URL is not reachable.
253 If will call to `py.test.fail` in case the URL is not reachable.
254 """
254 """
255 timeout = time.time() + timeout
255 timeout = time.time() + timeout
256 last = 0
256 last = 0
257 wait = 0.1
257 wait = 0.1
258
258
259 while (timeout > last):
259 while (timeout > last):
260 last = time.time()
260 last = time.time()
261 if is_url_reachable(url):
261 if is_url_reachable(url):
262 break
262 break
263 elif ((last + wait) > time.time()):
263 elif ((last + wait) > time.time()):
264 # Go to sleep because not enough time has passed since last check.
264 # Go to sleep because not enough time has passed since last check.
265 time.sleep(wait)
265 time.sleep(wait)
266 else:
266 else:
267 pytest.fail("Timeout while waiting for URL {}".format(url))
267 pytest.fail("Timeout while waiting for URL {}".format(url))
268
268
269
269
270 def is_url_reachable(url):
270 def is_url_reachable(url):
271 try:
271 try:
272 urllib2.urlopen(url)
272 urllib2.urlopen(url)
273 except urllib2.URLError:
273 except urllib2.URLError:
274 return False
274 return False
275 return True
275 return True
276
276
277
277
278 def get_session_from_response(response):
278 def get_session_from_response(response):
279 """
279 """
280 This returns the session from a response object. Pylons has some magic
280 This returns the session from a response object. Pylons has some magic
281 to make the session available as `response.session`. But pyramid
281 to make the session available as `response.session`. But pyramid
282 doesn't expose it.
282 doesn't expose it.
283 """
283 """
284 # TODO: Try to look up the session key also.
284 # TODO: Try to look up the session key also.
285 return response.request.environ['beaker.session']
285 return response.request.environ['beaker.session']
286
287
288 def repo_on_filesystem(repo_name):
289 from rhodecode.lib import vcs
290 from rhodecode.tests import TESTS_TMP_PATH
291 repo = vcs.get_vcs_instance(
292 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
293 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now