##// END OF EJS Templates
tests: made few tests handle database transactions better.
marcink -
r2786:35caee67 default
parent child Browse files
Show More
@@ -1,82 +1,82 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import pytest
21 import pytest
22
22
23 from rhodecode.tests import TestController
23 from rhodecode.tests import TestController
24 from rhodecode.tests.fixture import Fixture
24 from rhodecode.tests.fixture import Fixture
25
25
26 fixture = Fixture()
26 fixture = Fixture()
27
27
28
28
29 def route_path(name, params=None, **kwargs):
29 def route_path(name, params=None, **kwargs):
30 import urllib
30 import urllib
31 from rhodecode.apps._base import ADMIN_PREFIX
31 from rhodecode.apps._base import ADMIN_PREFIX
32
32
33 base_url = {
33 base_url = {
34 'admin_home': ADMIN_PREFIX,
34 'admin_home': ADMIN_PREFIX,
35 'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
35 'pullrequest_show': '/{repo_name}/pull-request/{pull_request_id}',
36 'pull_requests_global': ADMIN_PREFIX + '/pull-request/{pull_request_id}',
36 'pull_requests_global': ADMIN_PREFIX + '/pull-request/{pull_request_id}',
37 'pull_requests_global_0': ADMIN_PREFIX + '/pull_requests/{pull_request_id}',
37 'pull_requests_global_0': ADMIN_PREFIX + '/pull_requests/{pull_request_id}',
38 'pull_requests_global_1': ADMIN_PREFIX + '/pull-requests/{pull_request_id}',
38 'pull_requests_global_1': ADMIN_PREFIX + '/pull-requests/{pull_request_id}',
39
39
40 }[name].format(**kwargs)
40 }[name].format(**kwargs)
41
41
42 if params:
42 if params:
43 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
43 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
44 return base_url
44 return base_url
45
45
46
46
47 class TestAdminMainView(TestController):
47 class TestAdminMainView(TestController):
48
48
49 def test_redirect_admin_home(self):
49 def test_redirect_admin_home(self):
50 self.log_user()
50 self.log_user()
51 response = self.app.get(route_path('admin_home'), status=302)
51 response = self.app.get(route_path('admin_home'), status=302)
52 assert response.location.endswith('/audit_logs')
52 assert response.location.endswith('/audit_logs')
53
53
54 def test_redirect_pull_request_view(self, view):
54 def test_redirect_pull_request_view(self, view):
55 self.log_user()
55 self.log_user()
56 self.app.get(
56 self.app.get(
57 route_path(view, pull_request_id='xxxx'),
57 route_path(view, pull_request_id='xxxx'),
58 status=404)
58 status=404)
59
59
60 @pytest.mark.backends("git", "hg")
60 @pytest.mark.backends("git", "hg")
61 @pytest.mark.parametrize('view', [
61 @pytest.mark.parametrize('view', [
62 'pull_requests_global',
62 'pull_requests_global',
63 'pull_requests_global_0',
63 'pull_requests_global_0',
64 'pull_requests_global_1',
64 'pull_requests_global_1',
65 ])
65 ])
66 def test_redirect_pull_request_view(self, view, pr_util):
66 def test_redirect_pull_request_view(self, view, pr_util):
67 self.log_user()
67 self.log_user()
68 pull_request = pr_util.create_pull_request()
68 pull_request = pr_util.create_pull_request()
69 pull_request_id = pull_request.pull_request_id
69 pull_request_id = pull_request.pull_request_id
70 repo_name = pull_request.target_repo.repo_name
70
71
71 response = self.app.get(
72 response = self.app.get(
72 route_path(view, pull_request_id=pull_request_id),
73 route_path(view, pull_request_id=pull_request_id),
73 status=302)
74 status=302)
74 assert response.location.endswith(
75 assert response.location.endswith(
75 'pull-request/{}'.format(pull_request_id))
76 'pull-request/{}'.format(pull_request_id))
76
77
77 repo_name = pull_request.target_repo.repo_name
78 redirect_url = route_path(
78 redirect_url = route_path(
79 'pullrequest_show', repo_name=repo_name,
79 'pullrequest_show', repo_name=repo_name,
80 pull_request_id=pull_request.pull_request_id)
80 pull_request_id=pull_request_id)
81
81
82 assert redirect_url in response.location
82 assert redirect_url in response.location
@@ -1,509 +1,512 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import urllib
21 import urllib
22
22
23 import mock
23 import mock
24 import pytest
24 import pytest
25
25
26 from rhodecode.apps._base import ADMIN_PREFIX
26 from rhodecode.apps._base import ADMIN_PREFIX
27 from rhodecode.lib import auth
27 from rhodecode.lib import auth
28 from rhodecode.lib.utils2 import safe_str
28 from rhodecode.lib.utils2 import safe_str
29 from rhodecode.lib import helpers as h
29 from rhodecode.lib import helpers as h
30 from rhodecode.model.db import (
30 from rhodecode.model.db import (
31 Repository, RepoGroup, UserRepoToPerm, User, Permission)
31 Repository, RepoGroup, UserRepoToPerm, User, Permission)
32 from rhodecode.model.meta import Session
32 from rhodecode.model.meta import Session
33 from rhodecode.model.repo import RepoModel
33 from rhodecode.model.repo import RepoModel
34 from rhodecode.model.repo_group import RepoGroupModel
34 from rhodecode.model.repo_group import RepoGroupModel
35 from rhodecode.model.user import UserModel
35 from rhodecode.model.user import UserModel
36 from rhodecode.tests import (
36 from rhodecode.tests import (
37 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
37 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
39 from rhodecode.tests.fixture import Fixture, error_function
39 from rhodecode.tests.fixture import Fixture, error_function
40 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
40 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
41
41
42 fixture = Fixture()
42 fixture = Fixture()
43
43
44
44
45 def route_path(name, params=None, **kwargs):
45 def route_path(name, params=None, **kwargs):
46 import urllib
46 import urllib
47
47
48 base_url = {
48 base_url = {
49 'repos': ADMIN_PREFIX + '/repos',
49 'repos': ADMIN_PREFIX + '/repos',
50 'repo_new': ADMIN_PREFIX + '/repos/new',
50 'repo_new': ADMIN_PREFIX + '/repos/new',
51 'repo_create': ADMIN_PREFIX + '/repos/create',
51 'repo_create': ADMIN_PREFIX + '/repos/create',
52
52
53 'repo_creating_check': '/{repo_name}/repo_creating_check',
53 'repo_creating_check': '/{repo_name}/repo_creating_check',
54 }[name].format(**kwargs)
54 }[name].format(**kwargs)
55
55
56 if params:
56 if params:
57 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
57 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
58 return base_url
58 return base_url
59
59
60
60
61 def _get_permission_for_user(user, repo):
61 def _get_permission_for_user(user, repo):
62 perm = UserRepoToPerm.query()\
62 perm = UserRepoToPerm.query()\
63 .filter(UserRepoToPerm.repository ==
63 .filter(UserRepoToPerm.repository ==
64 Repository.get_by_repo_name(repo))\
64 Repository.get_by_repo_name(repo))\
65 .filter(UserRepoToPerm.user == User.get_by_username(user))\
65 .filter(UserRepoToPerm.user == User.get_by_username(user))\
66 .all()
66 .all()
67 return perm
67 return perm
68
68
69
69
70 @pytest.mark.usefixtures("app")
70 @pytest.mark.usefixtures("app")
71 class TestAdminRepos(object):
71 class TestAdminRepos(object):
72
72
73 def test_repo_list(self, autologin_user, user_util):
73 def test_repo_list(self, autologin_user, user_util):
74 repo = user_util.create_repo()
74 repo = user_util.create_repo()
75 repo_name = repo.repo_name
75 response = self.app.get(
76 response = self.app.get(
76 route_path('repos'), status=200)
77 route_path('repos'), status=200)
77
78
78 response.mustcontain(repo.repo_name)
79 response.mustcontain(repo_name)
79
80
80 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
81 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
81 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
82 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
82 response = self.app.get(route_path('repo_new'), status=200)
83 response = self.app.get(route_path('repo_new'), status=200)
83 assert_response = AssertResponse(response)
84 assert_response = AssertResponse(response)
84 element = assert_response.get_element('#repo_type')
85 element = assert_response.get_element('#repo_type')
85 assert element.text_content() == '\ngit\n'
86 assert element.text_content() == '\ngit\n'
86
87
87 def test_create_page_non_restricted_backends(self, autologin_user, backend):
88 def test_create_page_non_restricted_backends(self, autologin_user, backend):
88 response = self.app.get(route_path('repo_new'), status=200)
89 response = self.app.get(route_path('repo_new'), status=200)
89 assert_response = AssertResponse(response)
90 assert_response = AssertResponse(response)
90 assert_response.element_contains('#repo_type', 'git')
91 assert_response.element_contains('#repo_type', 'git')
91 assert_response.element_contains('#repo_type', 'svn')
92 assert_response.element_contains('#repo_type', 'svn')
92 assert_response.element_contains('#repo_type', 'hg')
93 assert_response.element_contains('#repo_type', 'hg')
93
94
94 @pytest.mark.parametrize(
95 @pytest.mark.parametrize(
95 "suffix", [u'', u'xxa'], ids=['', 'non-ascii'])
96 "suffix", [u'', u'xxa'], ids=['', 'non-ascii'])
96 def test_create(self, autologin_user, backend, suffix, csrf_token):
97 def test_create(self, autologin_user, backend, suffix, csrf_token):
97 repo_name_unicode = backend.new_repo_name(suffix=suffix)
98 repo_name_unicode = backend.new_repo_name(suffix=suffix)
98 repo_name = repo_name_unicode.encode('utf8')
99 repo_name = repo_name_unicode.encode('utf8')
99 description_unicode = u'description for newly created repo' + suffix
100 description_unicode = u'description for newly created repo' + suffix
100 description = description_unicode.encode('utf8')
101 description = description_unicode.encode('utf8')
101 response = self.app.post(
102 response = self.app.post(
102 route_path('repo_create'),
103 route_path('repo_create'),
103 fixture._get_repo_create_params(
104 fixture._get_repo_create_params(
104 repo_private=False,
105 repo_private=False,
105 repo_name=repo_name,
106 repo_name=repo_name,
106 repo_type=backend.alias,
107 repo_type=backend.alias,
107 repo_description=description,
108 repo_description=description,
108 csrf_token=csrf_token),
109 csrf_token=csrf_token),
109 status=302)
110 status=302)
110
111
111 self.assert_repository_is_created_correctly(
112 self.assert_repository_is_created_correctly(
112 repo_name, description, backend)
113 repo_name, description, backend)
113
114
114 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
115 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
115 numeric_repo = '1234'
116 numeric_repo = '1234'
116 repo_name = numeric_repo
117 repo_name = numeric_repo
117 description = 'description for newly created repo' + numeric_repo
118 description = 'description for newly created repo' + numeric_repo
118 self.app.post(
119 self.app.post(
119 route_path('repo_create'),
120 route_path('repo_create'),
120 fixture._get_repo_create_params(
121 fixture._get_repo_create_params(
121 repo_private=False,
122 repo_private=False,
122 repo_name=repo_name,
123 repo_name=repo_name,
123 repo_type=backend.alias,
124 repo_type=backend.alias,
124 repo_description=description,
125 repo_description=description,
125 csrf_token=csrf_token))
126 csrf_token=csrf_token))
126
127
127 self.assert_repository_is_created_correctly(
128 self.assert_repository_is_created_correctly(
128 repo_name, description, backend)
129 repo_name, description, backend)
129
130
130 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
131 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
131 def test_create_in_group(
132 def test_create_in_group(
132 self, autologin_user, backend, suffix, csrf_token):
133 self, autologin_user, backend, suffix, csrf_token):
133 # create GROUP
134 # create GROUP
134 group_name = 'sometest_%s' % backend.alias
135 group_name = 'sometest_%s' % backend.alias
135 gr = RepoGroupModel().create(group_name=group_name,
136 gr = RepoGroupModel().create(group_name=group_name,
136 group_description='test',
137 group_description='test',
137 owner=TEST_USER_ADMIN_LOGIN)
138 owner=TEST_USER_ADMIN_LOGIN)
138 Session().commit()
139 Session().commit()
139
140
140 repo_name = u'ingroup' + suffix
141 repo_name = u'ingroup' + suffix
141 repo_name_full = RepoGroup.url_sep().join(
142 repo_name_full = RepoGroup.url_sep().join(
142 [group_name, repo_name])
143 [group_name, repo_name])
143 description = u'description for newly created repo'
144 description = u'description for newly created repo'
144 self.app.post(
145 self.app.post(
145 route_path('repo_create'),
146 route_path('repo_create'),
146 fixture._get_repo_create_params(
147 fixture._get_repo_create_params(
147 repo_private=False,
148 repo_private=False,
148 repo_name=safe_str(repo_name),
149 repo_name=safe_str(repo_name),
149 repo_type=backend.alias,
150 repo_type=backend.alias,
150 repo_description=description,
151 repo_description=description,
151 repo_group=gr.group_id,
152 repo_group=gr.group_id,
152 csrf_token=csrf_token))
153 csrf_token=csrf_token))
153
154
154 # TODO: johbo: Cleanup work to fixture
155 # TODO: johbo: Cleanup work to fixture
155 try:
156 try:
156 self.assert_repository_is_created_correctly(
157 self.assert_repository_is_created_correctly(
157 repo_name_full, description, backend)
158 repo_name_full, description, backend)
158
159
159 new_repo = RepoModel().get_by_repo_name(repo_name_full)
160 new_repo = RepoModel().get_by_repo_name(repo_name_full)
160 inherited_perms = UserRepoToPerm.query().filter(
161 inherited_perms = UserRepoToPerm.query().filter(
161 UserRepoToPerm.repository_id == new_repo.repo_id).all()
162 UserRepoToPerm.repository_id == new_repo.repo_id).all()
162 assert len(inherited_perms) == 1
163 assert len(inherited_perms) == 1
163 finally:
164 finally:
164 RepoModel().delete(repo_name_full)
165 RepoModel().delete(repo_name_full)
165 RepoGroupModel().delete(group_name)
166 RepoGroupModel().delete(group_name)
166 Session().commit()
167 Session().commit()
167
168
168 def test_create_in_group_numeric_name(
169 def test_create_in_group_numeric_name(
169 self, autologin_user, backend, csrf_token):
170 self, autologin_user, backend, csrf_token):
170 # create GROUP
171 # create GROUP
171 group_name = 'sometest_%s' % backend.alias
172 group_name = 'sometest_%s' % backend.alias
172 gr = RepoGroupModel().create(group_name=group_name,
173 gr = RepoGroupModel().create(group_name=group_name,
173 group_description='test',
174 group_description='test',
174 owner=TEST_USER_ADMIN_LOGIN)
175 owner=TEST_USER_ADMIN_LOGIN)
175 Session().commit()
176 Session().commit()
176
177
177 repo_name = '12345'
178 repo_name = '12345'
178 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
179 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
179 description = 'description for newly created repo'
180 description = 'description for newly created repo'
180 self.app.post(
181 self.app.post(
181 route_path('repo_create'),
182 route_path('repo_create'),
182 fixture._get_repo_create_params(
183 fixture._get_repo_create_params(
183 repo_private=False,
184 repo_private=False,
184 repo_name=repo_name,
185 repo_name=repo_name,
185 repo_type=backend.alias,
186 repo_type=backend.alias,
186 repo_description=description,
187 repo_description=description,
187 repo_group=gr.group_id,
188 repo_group=gr.group_id,
188 csrf_token=csrf_token))
189 csrf_token=csrf_token))
189
190
190 # TODO: johbo: Cleanup work to fixture
191 # TODO: johbo: Cleanup work to fixture
191 try:
192 try:
192 self.assert_repository_is_created_correctly(
193 self.assert_repository_is_created_correctly(
193 repo_name_full, description, backend)
194 repo_name_full, description, backend)
194
195
195 new_repo = RepoModel().get_by_repo_name(repo_name_full)
196 new_repo = RepoModel().get_by_repo_name(repo_name_full)
196 inherited_perms = UserRepoToPerm.query()\
197 inherited_perms = UserRepoToPerm.query()\
197 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
198 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
198 assert len(inherited_perms) == 1
199 assert len(inherited_perms) == 1
199 finally:
200 finally:
200 RepoModel().delete(repo_name_full)
201 RepoModel().delete(repo_name_full)
201 RepoGroupModel().delete(group_name)
202 RepoGroupModel().delete(group_name)
202 Session().commit()
203 Session().commit()
203
204
204 def test_create_in_group_without_needed_permissions(self, backend):
205 def test_create_in_group_without_needed_permissions(self, backend):
205 session = login_user_session(
206 session = login_user_session(
206 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
207 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
207 csrf_token = auth.get_csrf_token(session)
208 csrf_token = auth.get_csrf_token(session)
208 # revoke
209 # revoke
209 user_model = UserModel()
210 user_model = UserModel()
210 # disable fork and create on default user
211 # disable fork and create on default user
211 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
212 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
212 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
213 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
213 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
214 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
214 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
215 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
215
216
216 # disable on regular user
217 # disable on regular user
217 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
218 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
218 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
219 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
219 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
220 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
220 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
221 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
221 Session().commit()
222 Session().commit()
222
223
223 # create GROUP
224 # create GROUP
224 group_name = 'reg_sometest_%s' % backend.alias
225 group_name = 'reg_sometest_%s' % backend.alias
225 gr = RepoGroupModel().create(group_name=group_name,
226 gr = RepoGroupModel().create(group_name=group_name,
226 group_description='test',
227 group_description='test',
227 owner=TEST_USER_ADMIN_LOGIN)
228 owner=TEST_USER_ADMIN_LOGIN)
228 Session().commit()
229 Session().commit()
230 repo_group_id = gr.group_id
229
231
230 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
232 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
231 gr_allowed = RepoGroupModel().create(
233 gr_allowed = RepoGroupModel().create(
232 group_name=group_name_allowed,
234 group_name=group_name_allowed,
233 group_description='test',
235 group_description='test',
234 owner=TEST_USER_REGULAR_LOGIN)
236 owner=TEST_USER_REGULAR_LOGIN)
237 allowed_repo_group_id = gr_allowed.group_id
235 Session().commit()
238 Session().commit()
236
239
237 repo_name = 'ingroup'
240 repo_name = 'ingroup'
238 description = 'description for newly created repo'
241 description = 'description for newly created repo'
239 response = self.app.post(
242 response = self.app.post(
240 route_path('repo_create'),
243 route_path('repo_create'),
241 fixture._get_repo_create_params(
244 fixture._get_repo_create_params(
242 repo_private=False,
245 repo_private=False,
243 repo_name=repo_name,
246 repo_name=repo_name,
244 repo_type=backend.alias,
247 repo_type=backend.alias,
245 repo_description=description,
248 repo_description=description,
246 repo_group=gr.group_id,
249 repo_group=repo_group_id,
247 csrf_token=csrf_token))
250 csrf_token=csrf_token))
248
251
249 response.mustcontain('Invalid value')
252 response.mustcontain('Invalid value')
250
253
251 # user is allowed to create in this group
254 # user is allowed to create in this group
252 repo_name = 'ingroup'
255 repo_name = 'ingroup'
253 repo_name_full = RepoGroup.url_sep().join(
256 repo_name_full = RepoGroup.url_sep().join(
254 [group_name_allowed, repo_name])
257 [group_name_allowed, repo_name])
255 description = 'description for newly created repo'
258 description = 'description for newly created repo'
256 response = self.app.post(
259 response = self.app.post(
257 route_path('repo_create'),
260 route_path('repo_create'),
258 fixture._get_repo_create_params(
261 fixture._get_repo_create_params(
259 repo_private=False,
262 repo_private=False,
260 repo_name=repo_name,
263 repo_name=repo_name,
261 repo_type=backend.alias,
264 repo_type=backend.alias,
262 repo_description=description,
265 repo_description=description,
263 repo_group=gr_allowed.group_id,
266 repo_group=allowed_repo_group_id,
264 csrf_token=csrf_token))
267 csrf_token=csrf_token))
265
268
266 # TODO: johbo: Cleanup in pytest fixture
269 # TODO: johbo: Cleanup in pytest fixture
267 try:
270 try:
268 self.assert_repository_is_created_correctly(
271 self.assert_repository_is_created_correctly(
269 repo_name_full, description, backend)
272 repo_name_full, description, backend)
270
273
271 new_repo = RepoModel().get_by_repo_name(repo_name_full)
274 new_repo = RepoModel().get_by_repo_name(repo_name_full)
272 inherited_perms = UserRepoToPerm.query().filter(
275 inherited_perms = UserRepoToPerm.query().filter(
273 UserRepoToPerm.repository_id == new_repo.repo_id).all()
276 UserRepoToPerm.repository_id == new_repo.repo_id).all()
274 assert len(inherited_perms) == 1
277 assert len(inherited_perms) == 1
275
278
276 assert repo_on_filesystem(repo_name_full)
279 assert repo_on_filesystem(repo_name_full)
277 finally:
280 finally:
278 RepoModel().delete(repo_name_full)
281 RepoModel().delete(repo_name_full)
279 RepoGroupModel().delete(group_name)
282 RepoGroupModel().delete(group_name)
280 RepoGroupModel().delete(group_name_allowed)
283 RepoGroupModel().delete(group_name_allowed)
281 Session().commit()
284 Session().commit()
282
285
283 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
286 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
284 csrf_token):
287 csrf_token):
285 # create GROUP
288 # create GROUP
286 group_name = 'sometest_%s' % backend.alias
289 group_name = 'sometest_%s' % backend.alias
287 gr = RepoGroupModel().create(group_name=group_name,
290 gr = RepoGroupModel().create(group_name=group_name,
288 group_description='test',
291 group_description='test',
289 owner=TEST_USER_ADMIN_LOGIN)
292 owner=TEST_USER_ADMIN_LOGIN)
290 perm = Permission.get_by_key('repository.write')
293 perm = Permission.get_by_key('repository.write')
291 RepoGroupModel().grant_user_permission(
294 RepoGroupModel().grant_user_permission(
292 gr, TEST_USER_REGULAR_LOGIN, perm)
295 gr, TEST_USER_REGULAR_LOGIN, perm)
293
296
294 # add repo permissions
297 # add repo permissions
295 Session().commit()
298 Session().commit()
296
299 repo_group_id = gr.group_id
297 repo_name = 'ingroup_inherited_%s' % backend.alias
300 repo_name = 'ingroup_inherited_%s' % backend.alias
298 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
301 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
299 description = 'description for newly created repo'
302 description = 'description for newly created repo'
300 self.app.post(
303 self.app.post(
301 route_path('repo_create'),
304 route_path('repo_create'),
302 fixture._get_repo_create_params(
305 fixture._get_repo_create_params(
303 repo_private=False,
306 repo_private=False,
304 repo_name=repo_name,
307 repo_name=repo_name,
305 repo_type=backend.alias,
308 repo_type=backend.alias,
306 repo_description=description,
309 repo_description=description,
307 repo_group=gr.group_id,
310 repo_group=repo_group_id,
308 repo_copy_permissions=True,
311 repo_copy_permissions=True,
309 csrf_token=csrf_token))
312 csrf_token=csrf_token))
310
313
311 # TODO: johbo: Cleanup to pytest fixture
314 # TODO: johbo: Cleanup to pytest fixture
312 try:
315 try:
313 self.assert_repository_is_created_correctly(
316 self.assert_repository_is_created_correctly(
314 repo_name_full, description, backend)
317 repo_name_full, description, backend)
315 except Exception:
318 except Exception:
316 RepoGroupModel().delete(group_name)
319 RepoGroupModel().delete(group_name)
317 Session().commit()
320 Session().commit()
318 raise
321 raise
319
322
320 # check if inherited permissions are applied
323 # check if inherited permissions are applied
321 new_repo = RepoModel().get_by_repo_name(repo_name_full)
324 new_repo = RepoModel().get_by_repo_name(repo_name_full)
322 inherited_perms = UserRepoToPerm.query().filter(
325 inherited_perms = UserRepoToPerm.query().filter(
323 UserRepoToPerm.repository_id == new_repo.repo_id).all()
326 UserRepoToPerm.repository_id == new_repo.repo_id).all()
324 assert len(inherited_perms) == 2
327 assert len(inherited_perms) == 2
325
328
326 assert TEST_USER_REGULAR_LOGIN in [
329 assert TEST_USER_REGULAR_LOGIN in [
327 x.user.username for x in inherited_perms]
330 x.user.username for x in inherited_perms]
328 assert 'repository.write' in [
331 assert 'repository.write' in [
329 x.permission.permission_name for x in inherited_perms]
332 x.permission.permission_name for x in inherited_perms]
330
333
331 RepoModel().delete(repo_name_full)
334 RepoModel().delete(repo_name_full)
332 RepoGroupModel().delete(group_name)
335 RepoGroupModel().delete(group_name)
333 Session().commit()
336 Session().commit()
334
337
335 @pytest.mark.xfail_backends(
338 @pytest.mark.xfail_backends(
336 "git", "hg", reason="Missing reposerver support")
339 "git", "hg", reason="Missing reposerver support")
337 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
340 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
338 csrf_token):
341 csrf_token):
339 source_repo = backend.create_repo(number_of_commits=2)
342 source_repo = backend.create_repo(number_of_commits=2)
340 source_repo_name = source_repo.repo_name
343 source_repo_name = source_repo.repo_name
341 reposerver.serve(source_repo.scm_instance())
344 reposerver.serve(source_repo.scm_instance())
342
345
343 repo_name = backend.new_repo_name()
346 repo_name = backend.new_repo_name()
344 response = self.app.post(
347 response = self.app.post(
345 route_path('repo_create'),
348 route_path('repo_create'),
346 fixture._get_repo_create_params(
349 fixture._get_repo_create_params(
347 repo_private=False,
350 repo_private=False,
348 repo_name=repo_name,
351 repo_name=repo_name,
349 repo_type=backend.alias,
352 repo_type=backend.alias,
350 repo_description='',
353 repo_description='',
351 clone_uri=reposerver.url,
354 clone_uri=reposerver.url,
352 csrf_token=csrf_token),
355 csrf_token=csrf_token),
353 status=302)
356 status=302)
354
357
355 # Should be redirected to the creating page
358 # Should be redirected to the creating page
356 response.mustcontain('repo_creating')
359 response.mustcontain('repo_creating')
357
360
358 # Expecting that both repositories have same history
361 # Expecting that both repositories have same history
359 source_repo = RepoModel().get_by_repo_name(source_repo_name)
362 source_repo = RepoModel().get_by_repo_name(source_repo_name)
360 source_vcs = source_repo.scm_instance()
363 source_vcs = source_repo.scm_instance()
361 repo = RepoModel().get_by_repo_name(repo_name)
364 repo = RepoModel().get_by_repo_name(repo_name)
362 repo_vcs = repo.scm_instance()
365 repo_vcs = repo.scm_instance()
363 assert source_vcs[0].message == repo_vcs[0].message
366 assert source_vcs[0].message == repo_vcs[0].message
364 assert source_vcs.count() == repo_vcs.count()
367 assert source_vcs.count() == repo_vcs.count()
365 assert source_vcs.commit_ids == repo_vcs.commit_ids
368 assert source_vcs.commit_ids == repo_vcs.commit_ids
366
369
367 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
370 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
368 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
371 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
369 csrf_token):
372 csrf_token):
370 repo_name = backend.new_repo_name()
373 repo_name = backend.new_repo_name()
371 description = 'description for newly created repo'
374 description = 'description for newly created repo'
372 response = self.app.post(
375 response = self.app.post(
373 route_path('repo_create'),
376 route_path('repo_create'),
374 fixture._get_repo_create_params(
377 fixture._get_repo_create_params(
375 repo_private=False,
378 repo_private=False,
376 repo_name=repo_name,
379 repo_name=repo_name,
377 repo_type=backend.alias,
380 repo_type=backend.alias,
378 repo_description=description,
381 repo_description=description,
379 clone_uri='http://repo.invalid/repo',
382 clone_uri='http://repo.invalid/repo',
380 csrf_token=csrf_token))
383 csrf_token=csrf_token))
381 response.mustcontain('invalid clone url')
384 response.mustcontain('invalid clone url')
382
385
383 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
386 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
384 def test_create_remote_repo_wrong_clone_uri_hg_svn(
387 def test_create_remote_repo_wrong_clone_uri_hg_svn(
385 self, autologin_user, backend, csrf_token):
388 self, autologin_user, backend, csrf_token):
386 repo_name = backend.new_repo_name()
389 repo_name = backend.new_repo_name()
387 description = 'description for newly created repo'
390 description = 'description for newly created repo'
388 response = self.app.post(
391 response = self.app.post(
389 route_path('repo_create'),
392 route_path('repo_create'),
390 fixture._get_repo_create_params(
393 fixture._get_repo_create_params(
391 repo_private=False,
394 repo_private=False,
392 repo_name=repo_name,
395 repo_name=repo_name,
393 repo_type=backend.alias,
396 repo_type=backend.alias,
394 repo_description=description,
397 repo_description=description,
395 clone_uri='svn+http://svn.invalid/repo',
398 clone_uri='svn+http://svn.invalid/repo',
396 csrf_token=csrf_token))
399 csrf_token=csrf_token))
397 response.mustcontain('invalid clone url')
400 response.mustcontain('invalid clone url')
398
401
399 def test_create_with_git_suffix(
402 def test_create_with_git_suffix(
400 self, autologin_user, backend, csrf_token):
403 self, autologin_user, backend, csrf_token):
401 repo_name = backend.new_repo_name() + ".git"
404 repo_name = backend.new_repo_name() + ".git"
402 description = 'description for newly created repo'
405 description = 'description for newly created repo'
403 response = self.app.post(
406 response = self.app.post(
404 route_path('repo_create'),
407 route_path('repo_create'),
405 fixture._get_repo_create_params(
408 fixture._get_repo_create_params(
406 repo_private=False,
409 repo_private=False,
407 repo_name=repo_name,
410 repo_name=repo_name,
408 repo_type=backend.alias,
411 repo_type=backend.alias,
409 repo_description=description,
412 repo_description=description,
410 csrf_token=csrf_token))
413 csrf_token=csrf_token))
411 response.mustcontain('Repository name cannot end with .git')
414 response.mustcontain('Repository name cannot end with .git')
412
415
413 def test_default_user_cannot_access_private_repo_in_a_group(
416 def test_default_user_cannot_access_private_repo_in_a_group(
414 self, autologin_user, user_util, backend):
417 self, autologin_user, user_util, backend):
415
418
416 group = user_util.create_repo_group()
419 group = user_util.create_repo_group()
417
420
418 repo = backend.create_repo(
421 repo = backend.create_repo(
419 repo_private=True, repo_group=group, repo_copy_permissions=True)
422 repo_private=True, repo_group=group, repo_copy_permissions=True)
420
423
421 permissions = _get_permission_for_user(
424 permissions = _get_permission_for_user(
422 user='default', repo=repo.repo_name)
425 user='default', repo=repo.repo_name)
423 assert len(permissions) == 1
426 assert len(permissions) == 1
424 assert permissions[0].permission.permission_name == 'repository.none'
427 assert permissions[0].permission.permission_name == 'repository.none'
425 assert permissions[0].repository.private is True
428 assert permissions[0].repository.private is True
426
429
427 def test_create_on_top_level_without_permissions(self, backend):
430 def test_create_on_top_level_without_permissions(self, backend):
428 session = login_user_session(
431 session = login_user_session(
429 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
432 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
430 csrf_token = auth.get_csrf_token(session)
433 csrf_token = auth.get_csrf_token(session)
431
434
432 # revoke
435 # revoke
433 user_model = UserModel()
436 user_model = UserModel()
434 # disable fork and create on default user
437 # disable fork and create on default user
435 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
438 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
436 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
439 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
437 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
440 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
438 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
441 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
439
442
440 # disable on regular user
443 # disable on regular user
441 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
444 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
442 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
445 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
443 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
446 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
444 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
447 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
445 Session().commit()
448 Session().commit()
446
449
447 repo_name = backend.new_repo_name()
450 repo_name = backend.new_repo_name()
448 description = 'description for newly created repo'
451 description = 'description for newly created repo'
449 response = self.app.post(
452 response = self.app.post(
450 route_path('repo_create'),
453 route_path('repo_create'),
451 fixture._get_repo_create_params(
454 fixture._get_repo_create_params(
452 repo_private=False,
455 repo_private=False,
453 repo_name=repo_name,
456 repo_name=repo_name,
454 repo_type=backend.alias,
457 repo_type=backend.alias,
455 repo_description=description,
458 repo_description=description,
456 csrf_token=csrf_token))
459 csrf_token=csrf_token))
457
460
458 response.mustcontain(
461 response.mustcontain(
459 u"You do not have the permission to store repositories in "
462 u"You do not have the permission to store repositories in "
460 u"the root location.")
463 u"the root location.")
461
464
462 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
465 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
463 def test_create_repo_when_filesystem_op_fails(
466 def test_create_repo_when_filesystem_op_fails(
464 self, autologin_user, backend, csrf_token):
467 self, autologin_user, backend, csrf_token):
465 repo_name = backend.new_repo_name()
468 repo_name = backend.new_repo_name()
466 description = 'description for newly created repo'
469 description = 'description for newly created repo'
467
470
468 response = self.app.post(
471 response = self.app.post(
469 route_path('repo_create'),
472 route_path('repo_create'),
470 fixture._get_repo_create_params(
473 fixture._get_repo_create_params(
471 repo_private=False,
474 repo_private=False,
472 repo_name=repo_name,
475 repo_name=repo_name,
473 repo_type=backend.alias,
476 repo_type=backend.alias,
474 repo_description=description,
477 repo_description=description,
475 csrf_token=csrf_token))
478 csrf_token=csrf_token))
476
479
477 assert_session_flash(
480 assert_session_flash(
478 response, 'Error creating repository %s' % repo_name)
481 response, 'Error creating repository %s' % repo_name)
479 # repo must not be in db
482 # repo must not be in db
480 assert backend.repo is None
483 assert backend.repo is None
481 # repo must not be in filesystem !
484 # repo must not be in filesystem !
482 assert not repo_on_filesystem(repo_name)
485 assert not repo_on_filesystem(repo_name)
483
486
484 def assert_repository_is_created_correctly(
487 def assert_repository_is_created_correctly(
485 self, repo_name, description, backend):
488 self, repo_name, description, backend):
486 repo_name_utf8 = safe_str(repo_name)
489 repo_name_utf8 = safe_str(repo_name)
487
490
488 # run the check page that triggers the flash message
491 # run the check page that triggers the flash message
489 response = self.app.get(
492 response = self.app.get(
490 route_path('repo_creating_check', repo_name=safe_str(repo_name)))
493 route_path('repo_creating_check', repo_name=safe_str(repo_name)))
491 assert response.json == {u'result': True}
494 assert response.json == {u'result': True}
492
495
493 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
496 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
494 urllib.quote(repo_name_utf8), repo_name)
497 urllib.quote(repo_name_utf8), repo_name)
495 assert_session_flash(response, flash_msg)
498 assert_session_flash(response, flash_msg)
496
499
497 # test if the repo was created in the database
500 # test if the repo was created in the database
498 new_repo = RepoModel().get_by_repo_name(repo_name)
501 new_repo = RepoModel().get_by_repo_name(repo_name)
499
502
500 assert new_repo.repo_name == repo_name
503 assert new_repo.repo_name == repo_name
501 assert new_repo.description == description
504 assert new_repo.description == description
502
505
503 # test if the repository is visible in the list ?
506 # test if the repository is visible in the list ?
504 response = self.app.get(
507 response = self.app.get(
505 h.route_path('repo_summary', repo_name=safe_str(repo_name)))
508 h.route_path('repo_summary', repo_name=safe_str(repo_name)))
506 response.mustcontain(repo_name)
509 response.mustcontain(repo_name)
507 response.mustcontain(backend.alias)
510 response.mustcontain(backend.alias)
508
511
509 assert repo_on_filesystem(repo_name)
512 assert repo_on_filesystem(repo_name)
@@ -1,170 +1,170 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import pytest
21 import pytest
22
22
23 from rhodecode.model.db import UserGroup, User
23 from rhodecode.model.db import UserGroup, User
24 from rhodecode.model.meta import Session
24 from rhodecode.model.meta import Session
25
25
26 from rhodecode.tests import (
26 from rhodecode.tests import (
27 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
27 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
28 from rhodecode.tests.fixture import Fixture
28 from rhodecode.tests.fixture import Fixture
29
29
30 fixture = Fixture()
30 fixture = Fixture()
31
31
32
32
33 def route_path(name, params=None, **kwargs):
33 def route_path(name, params=None, **kwargs):
34 import urllib
34 import urllib
35 from rhodecode.apps._base import ADMIN_PREFIX
35 from rhodecode.apps._base import ADMIN_PREFIX
36
36
37 base_url = {
37 base_url = {
38 'user_groups': ADMIN_PREFIX + '/user_groups',
38 'user_groups': ADMIN_PREFIX + '/user_groups',
39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
39 'user_groups_data': ADMIN_PREFIX + '/user_groups_data',
40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
40 'user_group_members_data': ADMIN_PREFIX + '/user_groups/{user_group_id}/members',
41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
41 'user_groups_new': ADMIN_PREFIX + '/user_groups/new',
42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
42 'user_groups_create': ADMIN_PREFIX + '/user_groups/create',
43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
43 'edit_user_group': ADMIN_PREFIX + '/user_groups/{user_group_id}/edit',
44 }[name].format(**kwargs)
44 }[name].format(**kwargs)
45
45
46 if params:
46 if params:
47 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
47 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
48 return base_url
48 return base_url
49
49
50
50
51 class TestAdminUserGroupsView(TestController):
51 class TestAdminUserGroupsView(TestController):
52
52
53 def test_show_users(self):
53 def test_show_users(self):
54 self.log_user()
54 self.log_user()
55 self.app.get(route_path('user_groups'))
55 self.app.get(route_path('user_groups'))
56
56
57 def test_show_user_groups_data(self, xhr_header):
57 def test_show_user_groups_data(self, xhr_header):
58 self.log_user()
58 self.log_user()
59 response = self.app.get(route_path(
59 response = self.app.get(route_path(
60 'user_groups_data'), extra_environ=xhr_header)
60 'user_groups_data'), extra_environ=xhr_header)
61
61
62 all_user_groups = UserGroup.query().count()
62 all_user_groups = UserGroup.query().count()
63 assert response.json['recordsTotal'] == all_user_groups
63 assert response.json['recordsTotal'] == all_user_groups
64
64
65 def test_show_user_groups_data_filtered(self, xhr_header):
65 def test_show_user_groups_data_filtered(self, xhr_header):
66 self.log_user()
66 self.log_user()
67 response = self.app.get(route_path(
67 response = self.app.get(route_path(
68 'user_groups_data', params={'search[value]': 'empty_search'}),
68 'user_groups_data', params={'search[value]': 'empty_search'}),
69 extra_environ=xhr_header)
69 extra_environ=xhr_header)
70
70
71 all_user_groups = UserGroup.query().count()
71 all_user_groups = UserGroup.query().count()
72 assert response.json['recordsTotal'] == all_user_groups
72 assert response.json['recordsTotal'] == all_user_groups
73 assert response.json['recordsFiltered'] == 0
73 assert response.json['recordsFiltered'] == 0
74
74
75 def test_usergroup_escape(self, user_util, xhr_header):
75 def test_usergroup_escape(self, user_util, xhr_header):
76 self.log_user()
76 self.log_user()
77
77
78 xss_img = '<img src="/image1" onload="alert(\'Hello, World!\');">'
78 xss_img = '<img src="/image1" onload="alert(\'Hello, World!\');">'
79 user = user_util.create_user()
79 user = user_util.create_user()
80 user.name = xss_img
80 user.name = xss_img
81 user.lastname = xss_img
81 user.lastname = xss_img
82 Session().add(user)
82 Session().add(user)
83 Session().commit()
83 Session().commit()
84
84
85 user_group = user_util.create_user_group()
85 user_group = user_util.create_user_group()
86
86
87 user_group.users_group_name = xss_img
87 user_group.users_group_name = xss_img
88 user_group.user_group_description = '<strong onload="alert();">DESC</strong>'
88 user_group.user_group_description = '<strong onload="alert();">DESC</strong>'
89
89
90 response = self.app.get(
90 response = self.app.get(
91 route_path('user_groups_data'), extra_environ=xhr_header)
91 route_path('user_groups_data'), extra_environ=xhr_header)
92
92
93 response.mustcontain(
93 response.mustcontain(
94 '&lt;strong onload=&#34;alert();&#34;&gt;DESC&lt;/strong&gt;')
94 '&lt;strong onload=&#34;alert();&#34;&gt;DESC&lt;/strong&gt;')
95 response.mustcontain(
95 response.mustcontain(
96 '&lt;img src=&#34;/image1&#34; onload=&#34;'
96 '&lt;img src=&#34;/image1&#34; onload=&#34;'
97 'alert(&#39;Hello, World!&#39;);&#34;&gt;')
97 'alert(&#39;Hello, World!&#39;);&#34;&gt;')
98
98
99 def test_edit_user_group_autocomplete_empty_members(self, xhr_header, user_util):
99 def test_edit_user_group_autocomplete_empty_members(self, xhr_header, user_util):
100 self.log_user()
100 self.log_user()
101 ug = user_util.create_user_group()
101 ug = user_util.create_user_group()
102 response = self.app.get(
102 response = self.app.get(
103 route_path('user_group_members_data', user_group_id=ug.users_group_id),
103 route_path('user_group_members_data', user_group_id=ug.users_group_id),
104 extra_environ=xhr_header)
104 extra_environ=xhr_header)
105
105
106 assert response.json == {'members': []}
106 assert response.json == {'members': []}
107
107
108 def test_edit_user_group_autocomplete_members(self, xhr_header, user_util):
108 def test_edit_user_group_autocomplete_members(self, xhr_header, user_util):
109 self.log_user()
109 self.log_user()
110 members = [u.user_id for u in User.get_all()]
110 members = [u.user_id for u in User.get_all()]
111 ug = user_util.create_user_group(members=members)
111 ug = user_util.create_user_group(members=members)
112 response = self.app.get(
112 response = self.app.get(
113 route_path('user_group_members_data',
113 route_path('user_group_members_data',
114 user_group_id=ug.users_group_id),
114 user_group_id=ug.users_group_id),
115 extra_environ=xhr_header)
115 extra_environ=xhr_header)
116
116
117 assert len(response.json['members']) == len(members)
117 assert len(response.json['members']) == len(members)
118
118
119 def test_creation_page(self):
119 def test_creation_page(self):
120 self.log_user()
120 self.log_user()
121 self.app.get(route_path('user_groups_new'), status=200)
121 self.app.get(route_path('user_groups_new'), status=200)
122
122
123 def test_create(self):
123 def test_create(self):
124 from rhodecode.lib import helpers as h
124 from rhodecode.lib import helpers as h
125
125
126 self.log_user()
126 self.log_user()
127 users_group_name = 'test_user_group'
127 users_group_name = 'test_user_group'
128 response = self.app.post(route_path('user_groups_create'), {
128 response = self.app.post(route_path('user_groups_create'), {
129 'users_group_name': users_group_name,
129 'users_group_name': users_group_name,
130 'user_group_description': 'DESC',
130 'user_group_description': 'DESC',
131 'active': True,
131 'active': True,
132 'csrf_token': self.csrf_token})
132 'csrf_token': self.csrf_token})
133
133
134 user_group_id = UserGroup.get_by_group_name(
134 user_group_id = UserGroup.get_by_group_name(
135 users_group_name).users_group_id
135 users_group_name).users_group_id
136
136
137 user_group_link = h.link_to(
137 user_group_link = h.link_to(
138 users_group_name,
138 users_group_name,
139 route_path('edit_user_group', user_group_id=user_group_id))
139 route_path('edit_user_group', user_group_id=user_group_id))
140
140
141 assert_session_flash(
141 assert_session_flash(
142 response,
142 response,
143 'Created user group %s' % user_group_link)
143 'Created user group %s' % user_group_link)
144
144
145 fixture.destroy_user_group(users_group_name)
145 fixture.destroy_user_group(users_group_name)
146
146
147 def test_create_with_empty_name(self):
147 def test_create_with_empty_name(self):
148 self.log_user()
148 self.log_user()
149
149
150 response = self.app.post(route_path('user_groups_create'), {
150 response = self.app.post(route_path('user_groups_create'), {
151 'users_group_name': '',
151 'users_group_name': '',
152 'user_group_description': 'DESC',
152 'user_group_description': 'DESC',
153 'active': True,
153 'active': True,
154 'csrf_token': self.csrf_token}, status=200)
154 'csrf_token': self.csrf_token}, status=200)
155
155
156 response.mustcontain('Please enter a value')
156 response.mustcontain('Please enter a value')
157
157
158 def test_create_duplicate(self, user_util):
158 def test_create_duplicate(self, user_util):
159 self.log_user()
159 self.log_user()
160
160
161 user_group = user_util.create_user_group()
161 user_group = user_util.create_user_group()
162 duplicate_name = user_group.users_group_name
162 duplicate_name = user_group.users_group_name
163 response = self.app.post(route_path('user_groups_create'), {
163 response = self.app.post(route_path('user_groups_create'), {
164 'users_group_name': duplicate_name,
164 'users_group_name': duplicate_name,
165 'user_group_description': 'DESC',
165 'user_group_description': 'DESC',
166 'active': True,
166 'active': True,
167 'csrf_token': self.csrf_token}, status=200)
167 'csrf_token': self.csrf_token}, status=200)
168
168
169 response.mustcontain(
169 response.mustcontain(
170 'User group `{}` already exists'.format(user_group.users_group_name))
170 'User group `{}` already exists'.format(duplicate_name))
@@ -1,781 +1,783 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import pytest
21 import pytest
22 from sqlalchemy.orm.exc import NoResultFound
22 from sqlalchemy.orm.exc import NoResultFound
23
23
24 from rhodecode.lib import auth
24 from rhodecode.lib import auth
25 from rhodecode.lib import helpers as h
25 from rhodecode.lib import helpers as h
26 from rhodecode.model.db import User, UserApiKeys, UserEmailMap, Repository
26 from rhodecode.model.db import User, UserApiKeys, UserEmailMap, Repository
27 from rhodecode.model.meta import Session
27 from rhodecode.model.meta import Session
28 from rhodecode.model.user import UserModel
28 from rhodecode.model.user import UserModel
29
29
30 from rhodecode.tests import (
30 from rhodecode.tests import (
31 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
31 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
32 from rhodecode.tests.fixture import Fixture
32 from rhodecode.tests.fixture import Fixture
33
33
34 fixture = Fixture()
34 fixture = Fixture()
35
35
36
36
37 def route_path(name, params=None, **kwargs):
37 def route_path(name, params=None, **kwargs):
38 import urllib
38 import urllib
39 from rhodecode.apps._base import ADMIN_PREFIX
39 from rhodecode.apps._base import ADMIN_PREFIX
40
40
41 base_url = {
41 base_url = {
42 'users':
42 'users':
43 ADMIN_PREFIX + '/users',
43 ADMIN_PREFIX + '/users',
44 'users_data':
44 'users_data':
45 ADMIN_PREFIX + '/users_data',
45 ADMIN_PREFIX + '/users_data',
46 'users_create':
46 'users_create':
47 ADMIN_PREFIX + '/users/create',
47 ADMIN_PREFIX + '/users/create',
48 'users_new':
48 'users_new':
49 ADMIN_PREFIX + '/users/new',
49 ADMIN_PREFIX + '/users/new',
50 'user_edit':
50 'user_edit':
51 ADMIN_PREFIX + '/users/{user_id}/edit',
51 ADMIN_PREFIX + '/users/{user_id}/edit',
52 'user_edit_advanced':
52 'user_edit_advanced':
53 ADMIN_PREFIX + '/users/{user_id}/edit/advanced',
53 ADMIN_PREFIX + '/users/{user_id}/edit/advanced',
54 'user_edit_global_perms':
54 'user_edit_global_perms':
55 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions',
55 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions',
56 'user_edit_global_perms_update':
56 'user_edit_global_perms_update':
57 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions/update',
57 ADMIN_PREFIX + '/users/{user_id}/edit/global_permissions/update',
58 'user_update':
58 'user_update':
59 ADMIN_PREFIX + '/users/{user_id}/update',
59 ADMIN_PREFIX + '/users/{user_id}/update',
60 'user_delete':
60 'user_delete':
61 ADMIN_PREFIX + '/users/{user_id}/delete',
61 ADMIN_PREFIX + '/users/{user_id}/delete',
62 'user_force_password_reset':
62 'user_force_password_reset':
63 ADMIN_PREFIX + '/users/{user_id}/password_reset',
63 ADMIN_PREFIX + '/users/{user_id}/password_reset',
64 'user_create_personal_repo_group':
64 'user_create_personal_repo_group':
65 ADMIN_PREFIX + '/users/{user_id}/create_repo_group',
65 ADMIN_PREFIX + '/users/{user_id}/create_repo_group',
66
66
67 'edit_user_auth_tokens':
67 'edit_user_auth_tokens':
68 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens',
68 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens',
69 'edit_user_auth_tokens_add':
69 'edit_user_auth_tokens_add':
70 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/new',
70 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/new',
71 'edit_user_auth_tokens_delete':
71 'edit_user_auth_tokens_delete':
72 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/delete',
72 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/delete',
73
73
74 'edit_user_emails':
74 'edit_user_emails':
75 ADMIN_PREFIX + '/users/{user_id}/edit/emails',
75 ADMIN_PREFIX + '/users/{user_id}/edit/emails',
76 'edit_user_emails_add':
76 'edit_user_emails_add':
77 ADMIN_PREFIX + '/users/{user_id}/edit/emails/new',
77 ADMIN_PREFIX + '/users/{user_id}/edit/emails/new',
78 'edit_user_emails_delete':
78 'edit_user_emails_delete':
79 ADMIN_PREFIX + '/users/{user_id}/edit/emails/delete',
79 ADMIN_PREFIX + '/users/{user_id}/edit/emails/delete',
80
80
81 'edit_user_ips':
81 'edit_user_ips':
82 ADMIN_PREFIX + '/users/{user_id}/edit/ips',
82 ADMIN_PREFIX + '/users/{user_id}/edit/ips',
83 'edit_user_ips_add':
83 'edit_user_ips_add':
84 ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
84 ADMIN_PREFIX + '/users/{user_id}/edit/ips/new',
85 'edit_user_ips_delete':
85 'edit_user_ips_delete':
86 ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
86 ADMIN_PREFIX + '/users/{user_id}/edit/ips/delete',
87
87
88 'edit_user_perms_summary':
88 'edit_user_perms_summary':
89 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary',
89 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary',
90 'edit_user_perms_summary_json':
90 'edit_user_perms_summary_json':
91 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary/json',
91 ADMIN_PREFIX + '/users/{user_id}/edit/permissions_summary/json',
92
92
93 'edit_user_audit_logs':
93 'edit_user_audit_logs':
94 ADMIN_PREFIX + '/users/{user_id}/edit/audit',
94 ADMIN_PREFIX + '/users/{user_id}/edit/audit',
95
95
96 }[name].format(**kwargs)
96 }[name].format(**kwargs)
97
97
98 if params:
98 if params:
99 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
99 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
100 return base_url
100 return base_url
101
101
102
102
103 class TestAdminUsersView(TestController):
103 class TestAdminUsersView(TestController):
104
104
105 def test_show_users(self):
105 def test_show_users(self):
106 self.log_user()
106 self.log_user()
107 self.app.get(route_path('users'))
107 self.app.get(route_path('users'))
108
108
109 def test_show_users_data(self, xhr_header):
109 def test_show_users_data(self, xhr_header):
110 self.log_user()
110 self.log_user()
111 response = self.app.get(route_path(
111 response = self.app.get(route_path(
112 'users_data'), extra_environ=xhr_header)
112 'users_data'), extra_environ=xhr_header)
113
113
114 all_users = User.query().filter(
114 all_users = User.query().filter(
115 User.username != User.DEFAULT_USER).count()
115 User.username != User.DEFAULT_USER).count()
116 assert response.json['recordsTotal'] == all_users
116 assert response.json['recordsTotal'] == all_users
117
117
118 def test_show_users_data_filtered(self, xhr_header):
118 def test_show_users_data_filtered(self, xhr_header):
119 self.log_user()
119 self.log_user()
120 response = self.app.get(route_path(
120 response = self.app.get(route_path(
121 'users_data', params={'search[value]': 'empty_search'}),
121 'users_data', params={'search[value]': 'empty_search'}),
122 extra_environ=xhr_header)
122 extra_environ=xhr_header)
123
123
124 all_users = User.query().filter(
124 all_users = User.query().filter(
125 User.username != User.DEFAULT_USER).count()
125 User.username != User.DEFAULT_USER).count()
126 assert response.json['recordsTotal'] == all_users
126 assert response.json['recordsTotal'] == all_users
127 assert response.json['recordsFiltered'] == 0
127 assert response.json['recordsFiltered'] == 0
128
128
129 def test_auth_tokens_default_user(self):
129 def test_auth_tokens_default_user(self):
130 self.log_user()
130 self.log_user()
131 user = User.get_default_user()
131 user = User.get_default_user()
132 response = self.app.get(
132 response = self.app.get(
133 route_path('edit_user_auth_tokens', user_id=user.user_id),
133 route_path('edit_user_auth_tokens', user_id=user.user_id),
134 status=302)
134 status=302)
135
135
136 def test_auth_tokens(self):
136 def test_auth_tokens(self):
137 self.log_user()
137 self.log_user()
138
138
139 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
139 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
140 user_id = user.user_id
141 auth_tokens = user.auth_tokens
140 response = self.app.get(
142 response = self.app.get(
141 route_path('edit_user_auth_tokens', user_id=user.user_id))
143 route_path('edit_user_auth_tokens', user_id=user_id))
142 for token in user.auth_tokens:
144 for token in auth_tokens:
143 response.mustcontain(token)
145 response.mustcontain(token)
144 response.mustcontain('never')
146 response.mustcontain('never')
145
147
146 @pytest.mark.parametrize("desc, lifetime", [
148 @pytest.mark.parametrize("desc, lifetime", [
147 ('forever', -1),
149 ('forever', -1),
148 ('5mins', 60*5),
150 ('5mins', 60*5),
149 ('30days', 60*60*24*30),
151 ('30days', 60*60*24*30),
150 ])
152 ])
151 def test_add_auth_token(self, desc, lifetime, user_util):
153 def test_add_auth_token(self, desc, lifetime, user_util):
152 self.log_user()
154 self.log_user()
153 user = user_util.create_user()
155 user = user_util.create_user()
154 user_id = user.user_id
156 user_id = user.user_id
155
157
156 response = self.app.post(
158 response = self.app.post(
157 route_path('edit_user_auth_tokens_add', user_id=user_id),
159 route_path('edit_user_auth_tokens_add', user_id=user_id),
158 {'description': desc, 'lifetime': lifetime,
160 {'description': desc, 'lifetime': lifetime,
159 'csrf_token': self.csrf_token})
161 'csrf_token': self.csrf_token})
160 assert_session_flash(response, 'Auth token successfully created')
162 assert_session_flash(response, 'Auth token successfully created')
161
163
162 response = response.follow()
164 response = response.follow()
163 user = User.get(user_id)
165 user = User.get(user_id)
164 for auth_token in user.auth_tokens:
166 for auth_token in user.auth_tokens:
165 response.mustcontain(auth_token)
167 response.mustcontain(auth_token)
166
168
167 def test_delete_auth_token(self, user_util):
169 def test_delete_auth_token(self, user_util):
168 self.log_user()
170 self.log_user()
169 user = user_util.create_user()
171 user = user_util.create_user()
170 user_id = user.user_id
172 user_id = user.user_id
171 keys = user.auth_tokens
173 keys = user.auth_tokens
172 assert 2 == len(keys)
174 assert 2 == len(keys)
173
175
174 response = self.app.post(
176 response = self.app.post(
175 route_path('edit_user_auth_tokens_add', user_id=user_id),
177 route_path('edit_user_auth_tokens_add', user_id=user_id),
176 {'description': 'desc', 'lifetime': -1,
178 {'description': 'desc', 'lifetime': -1,
177 'csrf_token': self.csrf_token})
179 'csrf_token': self.csrf_token})
178 assert_session_flash(response, 'Auth token successfully created')
180 assert_session_flash(response, 'Auth token successfully created')
179 response.follow()
181 response.follow()
180
182
181 # now delete our key
183 # now delete our key
182 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
184 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
183 assert 3 == len(keys)
185 assert 3 == len(keys)
184
186
185 response = self.app.post(
187 response = self.app.post(
186 route_path('edit_user_auth_tokens_delete', user_id=user_id),
188 route_path('edit_user_auth_tokens_delete', user_id=user_id),
187 {'del_auth_token': keys[0].user_api_key_id,
189 {'del_auth_token': keys[0].user_api_key_id,
188 'csrf_token': self.csrf_token})
190 'csrf_token': self.csrf_token})
189
191
190 assert_session_flash(response, 'Auth token successfully deleted')
192 assert_session_flash(response, 'Auth token successfully deleted')
191 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
193 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
192 assert 2 == len(keys)
194 assert 2 == len(keys)
193
195
194 def test_ips(self):
196 def test_ips(self):
195 self.log_user()
197 self.log_user()
196 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
198 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
197 response = self.app.get(route_path('edit_user_ips', user_id=user.user_id))
199 response = self.app.get(route_path('edit_user_ips', user_id=user.user_id))
198 response.mustcontain('All IP addresses are allowed')
200 response.mustcontain('All IP addresses are allowed')
199
201
200 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
202 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
201 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
203 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
202 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
204 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
203 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
205 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
204 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
206 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
205 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
207 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
206 ('127_bad_ip', 'foobar', 'foobar', True),
208 ('127_bad_ip', 'foobar', 'foobar', True),
207 ])
209 ])
208 def test_ips_add(self, user_util, test_name, ip, ip_range, failure):
210 def test_ips_add(self, user_util, test_name, ip, ip_range, failure):
209 self.log_user()
211 self.log_user()
210 user = user_util.create_user(username=test_name)
212 user = user_util.create_user(username=test_name)
211 user_id = user.user_id
213 user_id = user.user_id
212
214
213 response = self.app.post(
215 response = self.app.post(
214 route_path('edit_user_ips_add', user_id=user_id),
216 route_path('edit_user_ips_add', user_id=user_id),
215 params={'new_ip': ip, 'csrf_token': self.csrf_token})
217 params={'new_ip': ip, 'csrf_token': self.csrf_token})
216
218
217 if failure:
219 if failure:
218 assert_session_flash(
220 assert_session_flash(
219 response, 'Please enter a valid IPv4 or IpV6 address')
221 response, 'Please enter a valid IPv4 or IpV6 address')
220 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
222 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
221
223
222 response.mustcontain(no=[ip])
224 response.mustcontain(no=[ip])
223 response.mustcontain(no=[ip_range])
225 response.mustcontain(no=[ip_range])
224
226
225 else:
227 else:
226 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
228 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
227 response.mustcontain(ip)
229 response.mustcontain(ip)
228 response.mustcontain(ip_range)
230 response.mustcontain(ip_range)
229
231
230 def test_ips_delete(self, user_util):
232 def test_ips_delete(self, user_util):
231 self.log_user()
233 self.log_user()
232 user = user_util.create_user()
234 user = user_util.create_user()
233 user_id = user.user_id
235 user_id = user.user_id
234 ip = '127.0.0.1/32'
236 ip = '127.0.0.1/32'
235 ip_range = '127.0.0.1 - 127.0.0.1'
237 ip_range = '127.0.0.1 - 127.0.0.1'
236 new_ip = UserModel().add_extra_ip(user_id, ip)
238 new_ip = UserModel().add_extra_ip(user_id, ip)
237 Session().commit()
239 Session().commit()
238 new_ip_id = new_ip.ip_id
240 new_ip_id = new_ip.ip_id
239
241
240 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
242 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
241 response.mustcontain(ip)
243 response.mustcontain(ip)
242 response.mustcontain(ip_range)
244 response.mustcontain(ip_range)
243
245
244 self.app.post(
246 self.app.post(
245 route_path('edit_user_ips_delete', user_id=user_id),
247 route_path('edit_user_ips_delete', user_id=user_id),
246 params={'del_ip_id': new_ip_id, 'csrf_token': self.csrf_token})
248 params={'del_ip_id': new_ip_id, 'csrf_token': self.csrf_token})
247
249
248 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
250 response = self.app.get(route_path('edit_user_ips', user_id=user_id))
249 response.mustcontain('All IP addresses are allowed')
251 response.mustcontain('All IP addresses are allowed')
250 response.mustcontain(no=[ip])
252 response.mustcontain(no=[ip])
251 response.mustcontain(no=[ip_range])
253 response.mustcontain(no=[ip_range])
252
254
253 def test_emails(self):
255 def test_emails(self):
254 self.log_user()
256 self.log_user()
255 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
257 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
256 response = self.app.get(
258 response = self.app.get(
257 route_path('edit_user_emails', user_id=user.user_id))
259 route_path('edit_user_emails', user_id=user.user_id))
258 response.mustcontain('No additional emails specified')
260 response.mustcontain('No additional emails specified')
259
261
260 def test_emails_add(self, user_util):
262 def test_emails_add(self, user_util):
261 self.log_user()
263 self.log_user()
262 user = user_util.create_user()
264 user = user_util.create_user()
263 user_id = user.user_id
265 user_id = user.user_id
264
266
265 self.app.post(
267 self.app.post(
266 route_path('edit_user_emails_add', user_id=user_id),
268 route_path('edit_user_emails_add', user_id=user_id),
267 params={'new_email': 'example@rhodecode.com',
269 params={'new_email': 'example@rhodecode.com',
268 'csrf_token': self.csrf_token})
270 'csrf_token': self.csrf_token})
269
271
270 response = self.app.get(
272 response = self.app.get(
271 route_path('edit_user_emails', user_id=user_id))
273 route_path('edit_user_emails', user_id=user_id))
272 response.mustcontain('example@rhodecode.com')
274 response.mustcontain('example@rhodecode.com')
273
275
274 def test_emails_add_existing_email(self, user_util, user_regular):
276 def test_emails_add_existing_email(self, user_util, user_regular):
275 existing_email = user_regular.email
277 existing_email = user_regular.email
276
278
277 self.log_user()
279 self.log_user()
278 user = user_util.create_user()
280 user = user_util.create_user()
279 user_id = user.user_id
281 user_id = user.user_id
280
282
281 response = self.app.post(
283 response = self.app.post(
282 route_path('edit_user_emails_add', user_id=user_id),
284 route_path('edit_user_emails_add', user_id=user_id),
283 params={'new_email': existing_email,
285 params={'new_email': existing_email,
284 'csrf_token': self.csrf_token})
286 'csrf_token': self.csrf_token})
285 assert_session_flash(
287 assert_session_flash(
286 response, 'This e-mail address is already taken')
288 response, 'This e-mail address is already taken')
287
289
288 response = self.app.get(
290 response = self.app.get(
289 route_path('edit_user_emails', user_id=user_id))
291 route_path('edit_user_emails', user_id=user_id))
290 response.mustcontain(no=[existing_email])
292 response.mustcontain(no=[existing_email])
291
293
292 def test_emails_delete(self, user_util):
294 def test_emails_delete(self, user_util):
293 self.log_user()
295 self.log_user()
294 user = user_util.create_user()
296 user = user_util.create_user()
295 user_id = user.user_id
297 user_id = user.user_id
296
298
297 self.app.post(
299 self.app.post(
298 route_path('edit_user_emails_add', user_id=user_id),
300 route_path('edit_user_emails_add', user_id=user_id),
299 params={'new_email': 'example@rhodecode.com',
301 params={'new_email': 'example@rhodecode.com',
300 'csrf_token': self.csrf_token})
302 'csrf_token': self.csrf_token})
301
303
302 response = self.app.get(
304 response = self.app.get(
303 route_path('edit_user_emails', user_id=user_id))
305 route_path('edit_user_emails', user_id=user_id))
304 response.mustcontain('example@rhodecode.com')
306 response.mustcontain('example@rhodecode.com')
305
307
306 user_email = UserEmailMap.query()\
308 user_email = UserEmailMap.query()\
307 .filter(UserEmailMap.email == 'example@rhodecode.com') \
309 .filter(UserEmailMap.email == 'example@rhodecode.com') \
308 .filter(UserEmailMap.user_id == user_id)\
310 .filter(UserEmailMap.user_id == user_id)\
309 .one()
311 .one()
310
312
311 del_email_id = user_email.email_id
313 del_email_id = user_email.email_id
312 self.app.post(
314 self.app.post(
313 route_path('edit_user_emails_delete', user_id=user_id),
315 route_path('edit_user_emails_delete', user_id=user_id),
314 params={'del_email_id': del_email_id,
316 params={'del_email_id': del_email_id,
315 'csrf_token': self.csrf_token})
317 'csrf_token': self.csrf_token})
316
318
317 response = self.app.get(
319 response = self.app.get(
318 route_path('edit_user_emails', user_id=user_id))
320 route_path('edit_user_emails', user_id=user_id))
319 response.mustcontain(no=['example@rhodecode.com'])
321 response.mustcontain(no=['example@rhodecode.com'])
320
322
321
323
322 def test_create(self, request, xhr_header):
324 def test_create(self, request, xhr_header):
323 self.log_user()
325 self.log_user()
324 username = 'newtestuser'
326 username = 'newtestuser'
325 password = 'test12'
327 password = 'test12'
326 password_confirmation = password
328 password_confirmation = password
327 name = 'name'
329 name = 'name'
328 lastname = 'lastname'
330 lastname = 'lastname'
329 email = 'mail@mail.com'
331 email = 'mail@mail.com'
330
332
331 self.app.get(route_path('users_new'))
333 self.app.get(route_path('users_new'))
332
334
333 response = self.app.post(route_path('users_create'), params={
335 response = self.app.post(route_path('users_create'), params={
334 'username': username,
336 'username': username,
335 'password': password,
337 'password': password,
336 'password_confirmation': password_confirmation,
338 'password_confirmation': password_confirmation,
337 'firstname': name,
339 'firstname': name,
338 'active': True,
340 'active': True,
339 'lastname': lastname,
341 'lastname': lastname,
340 'extern_name': 'rhodecode',
342 'extern_name': 'rhodecode',
341 'extern_type': 'rhodecode',
343 'extern_type': 'rhodecode',
342 'email': email,
344 'email': email,
343 'csrf_token': self.csrf_token,
345 'csrf_token': self.csrf_token,
344 })
346 })
345 user_link = h.link_to(
347 user_link = h.link_to(
346 username,
348 username,
347 route_path(
349 route_path(
348 'user_edit', user_id=User.get_by_username(username).user_id))
350 'user_edit', user_id=User.get_by_username(username).user_id))
349 assert_session_flash(response, 'Created user %s' % (user_link,))
351 assert_session_flash(response, 'Created user %s' % (user_link,))
350
352
351 @request.addfinalizer
353 @request.addfinalizer
352 def cleanup():
354 def cleanup():
353 fixture.destroy_user(username)
355 fixture.destroy_user(username)
354 Session().commit()
356 Session().commit()
355
357
356 new_user = User.query().filter(User.username == username).one()
358 new_user = User.query().filter(User.username == username).one()
357
359
358 assert new_user.username == username
360 assert new_user.username == username
359 assert auth.check_password(password, new_user.password)
361 assert auth.check_password(password, new_user.password)
360 assert new_user.name == name
362 assert new_user.name == name
361 assert new_user.lastname == lastname
363 assert new_user.lastname == lastname
362 assert new_user.email == email
364 assert new_user.email == email
363
365
364 response = self.app.get(route_path('users_data'),
366 response = self.app.get(route_path('users_data'),
365 extra_environ=xhr_header)
367 extra_environ=xhr_header)
366 response.mustcontain(username)
368 response.mustcontain(username)
367
369
368 def test_create_err(self):
370 def test_create_err(self):
369 self.log_user()
371 self.log_user()
370 username = 'new_user'
372 username = 'new_user'
371 password = ''
373 password = ''
372 name = 'name'
374 name = 'name'
373 lastname = 'lastname'
375 lastname = 'lastname'
374 email = 'errmail.com'
376 email = 'errmail.com'
375
377
376 self.app.get(route_path('users_new'))
378 self.app.get(route_path('users_new'))
377
379
378 response = self.app.post(route_path('users_create'), params={
380 response = self.app.post(route_path('users_create'), params={
379 'username': username,
381 'username': username,
380 'password': password,
382 'password': password,
381 'name': name,
383 'name': name,
382 'active': False,
384 'active': False,
383 'lastname': lastname,
385 'lastname': lastname,
384 'email': email,
386 'email': email,
385 'csrf_token': self.csrf_token,
387 'csrf_token': self.csrf_token,
386 })
388 })
387
389
388 msg = u'Username "%(username)s" is forbidden'
390 msg = u'Username "%(username)s" is forbidden'
389 msg = h.html_escape(msg % {'username': 'new_user'})
391 msg = h.html_escape(msg % {'username': 'new_user'})
390 response.mustcontain('<span class="error-message">%s</span>' % msg)
392 response.mustcontain('<span class="error-message">%s</span>' % msg)
391 response.mustcontain(
393 response.mustcontain(
392 '<span class="error-message">Please enter a value</span>')
394 '<span class="error-message">Please enter a value</span>')
393 response.mustcontain(
395 response.mustcontain(
394 '<span class="error-message">An email address must contain a'
396 '<span class="error-message">An email address must contain a'
395 ' single @</span>')
397 ' single @</span>')
396
398
397 def get_user():
399 def get_user():
398 Session().query(User).filter(User.username == username).one()
400 Session().query(User).filter(User.username == username).one()
399
401
400 with pytest.raises(NoResultFound):
402 with pytest.raises(NoResultFound):
401 get_user()
403 get_user()
402
404
403 def test_new(self):
405 def test_new(self):
404 self.log_user()
406 self.log_user()
405 self.app.get(route_path('users_new'))
407 self.app.get(route_path('users_new'))
406
408
407 @pytest.mark.parametrize("name, attrs", [
409 @pytest.mark.parametrize("name, attrs", [
408 ('firstname', {'firstname': 'new_username'}),
410 ('firstname', {'firstname': 'new_username'}),
409 ('lastname', {'lastname': 'new_username'}),
411 ('lastname', {'lastname': 'new_username'}),
410 ('admin', {'admin': True}),
412 ('admin', {'admin': True}),
411 ('admin', {'admin': False}),
413 ('admin', {'admin': False}),
412 ('extern_type', {'extern_type': 'ldap'}),
414 ('extern_type', {'extern_type': 'ldap'}),
413 ('extern_type', {'extern_type': None}),
415 ('extern_type', {'extern_type': None}),
414 ('extern_name', {'extern_name': 'test'}),
416 ('extern_name', {'extern_name': 'test'}),
415 ('extern_name', {'extern_name': None}),
417 ('extern_name', {'extern_name': None}),
416 ('active', {'active': False}),
418 ('active', {'active': False}),
417 ('active', {'active': True}),
419 ('active', {'active': True}),
418 ('email', {'email': 'some@email.com'}),
420 ('email', {'email': 'some@email.com'}),
419 ('language', {'language': 'de'}),
421 ('language', {'language': 'de'}),
420 ('language', {'language': 'en'}),
422 ('language', {'language': 'en'}),
421 # ('new_password', {'new_password': 'foobar123',
423 # ('new_password', {'new_password': 'foobar123',
422 # 'password_confirmation': 'foobar123'})
424 # 'password_confirmation': 'foobar123'})
423 ])
425 ])
424 def test_update(self, name, attrs, user_util):
426 def test_update(self, name, attrs, user_util):
425 self.log_user()
427 self.log_user()
426 usr = user_util.create_user(
428 usr = user_util.create_user(
427 password='qweqwe',
429 password='qweqwe',
428 email='testme@rhodecode.org',
430 email='testme@rhodecode.org',
429 extern_type='rhodecode',
431 extern_type='rhodecode',
430 extern_name='xxx',
432 extern_name='xxx',
431 )
433 )
432 user_id = usr.user_id
434 user_id = usr.user_id
433 Session().commit()
435 Session().commit()
434
436
435 params = usr.get_api_data()
437 params = usr.get_api_data()
436 cur_lang = params['language'] or 'en'
438 cur_lang = params['language'] or 'en'
437 params.update({
439 params.update({
438 'password_confirmation': '',
440 'password_confirmation': '',
439 'new_password': '',
441 'new_password': '',
440 'language': cur_lang,
442 'language': cur_lang,
441 'csrf_token': self.csrf_token,
443 'csrf_token': self.csrf_token,
442 })
444 })
443 params.update({'new_password': ''})
445 params.update({'new_password': ''})
444 params.update(attrs)
446 params.update(attrs)
445 if name == 'email':
447 if name == 'email':
446 params['emails'] = [attrs['email']]
448 params['emails'] = [attrs['email']]
447 elif name == 'extern_type':
449 elif name == 'extern_type':
448 # cannot update this via form, expected value is original one
450 # cannot update this via form, expected value is original one
449 params['extern_type'] = "rhodecode"
451 params['extern_type'] = "rhodecode"
450 elif name == 'extern_name':
452 elif name == 'extern_name':
451 # cannot update this via form, expected value is original one
453 # cannot update this via form, expected value is original one
452 params['extern_name'] = 'xxx'
454 params['extern_name'] = 'xxx'
453 # special case since this user is not
455 # special case since this user is not
454 # logged in yet his data is not filled
456 # logged in yet his data is not filled
455 # so we use creation data
457 # so we use creation data
456
458
457 response = self.app.post(
459 response = self.app.post(
458 route_path('user_update', user_id=usr.user_id), params)
460 route_path('user_update', user_id=usr.user_id), params)
459 assert response.status_int == 302
461 assert response.status_int == 302
460 assert_session_flash(response, 'User updated successfully')
462 assert_session_flash(response, 'User updated successfully')
461
463
462 updated_user = User.get(user_id)
464 updated_user = User.get(user_id)
463 updated_params = updated_user.get_api_data()
465 updated_params = updated_user.get_api_data()
464 updated_params.update({'password_confirmation': ''})
466 updated_params.update({'password_confirmation': ''})
465 updated_params.update({'new_password': ''})
467 updated_params.update({'new_password': ''})
466
468
467 del params['csrf_token']
469 del params['csrf_token']
468 assert params == updated_params
470 assert params == updated_params
469
471
470 def test_update_and_migrate_password(
472 def test_update_and_migrate_password(
471 self, autologin_user, real_crypto_backend, user_util):
473 self, autologin_user, real_crypto_backend, user_util):
472
474
473 user = user_util.create_user()
475 user = user_util.create_user()
474 temp_user = user.username
476 temp_user = user.username
475 user.password = auth._RhodeCodeCryptoSha256().hash_create(
477 user.password = auth._RhodeCodeCryptoSha256().hash_create(
476 b'test123')
478 b'test123')
477 Session().add(user)
479 Session().add(user)
478 Session().commit()
480 Session().commit()
479
481
480 params = user.get_api_data()
482 params = user.get_api_data()
481
483
482 params.update({
484 params.update({
483 'password_confirmation': 'qweqwe123',
485 'password_confirmation': 'qweqwe123',
484 'new_password': 'qweqwe123',
486 'new_password': 'qweqwe123',
485 'language': 'en',
487 'language': 'en',
486 'csrf_token': autologin_user.csrf_token,
488 'csrf_token': autologin_user.csrf_token,
487 })
489 })
488
490
489 response = self.app.post(
491 response = self.app.post(
490 route_path('user_update', user_id=user.user_id), params)
492 route_path('user_update', user_id=user.user_id), params)
491 assert response.status_int == 302
493 assert response.status_int == 302
492 assert_session_flash(response, 'User updated successfully')
494 assert_session_flash(response, 'User updated successfully')
493
495
494 # new password should be bcrypted, after log-in and transfer
496 # new password should be bcrypted, after log-in and transfer
495 user = User.get_by_username(temp_user)
497 user = User.get_by_username(temp_user)
496 assert user.password.startswith('$')
498 assert user.password.startswith('$')
497
499
498 updated_user = User.get_by_username(temp_user)
500 updated_user = User.get_by_username(temp_user)
499 updated_params = updated_user.get_api_data()
501 updated_params = updated_user.get_api_data()
500 updated_params.update({'password_confirmation': 'qweqwe123'})
502 updated_params.update({'password_confirmation': 'qweqwe123'})
501 updated_params.update({'new_password': 'qweqwe123'})
503 updated_params.update({'new_password': 'qweqwe123'})
502
504
503 del params['csrf_token']
505 del params['csrf_token']
504 assert params == updated_params
506 assert params == updated_params
505
507
506 def test_delete(self):
508 def test_delete(self):
507 self.log_user()
509 self.log_user()
508 username = 'newtestuserdeleteme'
510 username = 'newtestuserdeleteme'
509
511
510 fixture.create_user(name=username)
512 fixture.create_user(name=username)
511
513
512 new_user = Session().query(User)\
514 new_user = Session().query(User)\
513 .filter(User.username == username).one()
515 .filter(User.username == username).one()
514 response = self.app.post(
516 response = self.app.post(
515 route_path('user_delete', user_id=new_user.user_id),
517 route_path('user_delete', user_id=new_user.user_id),
516 params={'csrf_token': self.csrf_token})
518 params={'csrf_token': self.csrf_token})
517
519
518 assert_session_flash(response, 'Successfully deleted user')
520 assert_session_flash(response, 'Successfully deleted user')
519
521
520 def test_delete_owner_of_repository(self, request, user_util):
522 def test_delete_owner_of_repository(self, request, user_util):
521 self.log_user()
523 self.log_user()
522 obj_name = 'test_repo'
524 obj_name = 'test_repo'
523 usr = user_util.create_user()
525 usr = user_util.create_user()
524 username = usr.username
526 username = usr.username
525 fixture.create_repo(obj_name, cur_user=usr.username)
527 fixture.create_repo(obj_name, cur_user=usr.username)
526
528
527 new_user = Session().query(User)\
529 new_user = Session().query(User)\
528 .filter(User.username == username).one()
530 .filter(User.username == username).one()
529 response = self.app.post(
531 response = self.app.post(
530 route_path('user_delete', user_id=new_user.user_id),
532 route_path('user_delete', user_id=new_user.user_id),
531 params={'csrf_token': self.csrf_token})
533 params={'csrf_token': self.csrf_token})
532
534
533 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
535 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
534 'Switch owners or remove those repositories:%s' % (username,
536 'Switch owners or remove those repositories:%s' % (username,
535 obj_name)
537 obj_name)
536 assert_session_flash(response, msg)
538 assert_session_flash(response, msg)
537 fixture.destroy_repo(obj_name)
539 fixture.destroy_repo(obj_name)
538
540
539 def test_delete_owner_of_repository_detaching(self, request, user_util):
541 def test_delete_owner_of_repository_detaching(self, request, user_util):
540 self.log_user()
542 self.log_user()
541 obj_name = 'test_repo'
543 obj_name = 'test_repo'
542 usr = user_util.create_user(auto_cleanup=False)
544 usr = user_util.create_user(auto_cleanup=False)
543 username = usr.username
545 username = usr.username
544 fixture.create_repo(obj_name, cur_user=usr.username)
546 fixture.create_repo(obj_name, cur_user=usr.username)
545
547
546 new_user = Session().query(User)\
548 new_user = Session().query(User)\
547 .filter(User.username == username).one()
549 .filter(User.username == username).one()
548 response = self.app.post(
550 response = self.app.post(
549 route_path('user_delete', user_id=new_user.user_id),
551 route_path('user_delete', user_id=new_user.user_id),
550 params={'user_repos': 'detach', 'csrf_token': self.csrf_token})
552 params={'user_repos': 'detach', 'csrf_token': self.csrf_token})
551
553
552 msg = 'Detached 1 repositories'
554 msg = 'Detached 1 repositories'
553 assert_session_flash(response, msg)
555 assert_session_flash(response, msg)
554 fixture.destroy_repo(obj_name)
556 fixture.destroy_repo(obj_name)
555
557
556 def test_delete_owner_of_repository_deleting(self, request, user_util):
558 def test_delete_owner_of_repository_deleting(self, request, user_util):
557 self.log_user()
559 self.log_user()
558 obj_name = 'test_repo'
560 obj_name = 'test_repo'
559 usr = user_util.create_user(auto_cleanup=False)
561 usr = user_util.create_user(auto_cleanup=False)
560 username = usr.username
562 username = usr.username
561 fixture.create_repo(obj_name, cur_user=usr.username)
563 fixture.create_repo(obj_name, cur_user=usr.username)
562
564
563 new_user = Session().query(User)\
565 new_user = Session().query(User)\
564 .filter(User.username == username).one()
566 .filter(User.username == username).one()
565 response = self.app.post(
567 response = self.app.post(
566 route_path('user_delete', user_id=new_user.user_id),
568 route_path('user_delete', user_id=new_user.user_id),
567 params={'user_repos': 'delete', 'csrf_token': self.csrf_token})
569 params={'user_repos': 'delete', 'csrf_token': self.csrf_token})
568
570
569 msg = 'Deleted 1 repositories'
571 msg = 'Deleted 1 repositories'
570 assert_session_flash(response, msg)
572 assert_session_flash(response, msg)
571
573
572 def test_delete_owner_of_repository_group(self, request, user_util):
574 def test_delete_owner_of_repository_group(self, request, user_util):
573 self.log_user()
575 self.log_user()
574 obj_name = 'test_group'
576 obj_name = 'test_group'
575 usr = user_util.create_user()
577 usr = user_util.create_user()
576 username = usr.username
578 username = usr.username
577 fixture.create_repo_group(obj_name, cur_user=usr.username)
579 fixture.create_repo_group(obj_name, cur_user=usr.username)
578
580
579 new_user = Session().query(User)\
581 new_user = Session().query(User)\
580 .filter(User.username == username).one()
582 .filter(User.username == username).one()
581 response = self.app.post(
583 response = self.app.post(
582 route_path('user_delete', user_id=new_user.user_id),
584 route_path('user_delete', user_id=new_user.user_id),
583 params={'csrf_token': self.csrf_token})
585 params={'csrf_token': self.csrf_token})
584
586
585 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
587 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
586 'Switch owners or remove those repository groups:%s' % (username,
588 'Switch owners or remove those repository groups:%s' % (username,
587 obj_name)
589 obj_name)
588 assert_session_flash(response, msg)
590 assert_session_flash(response, msg)
589 fixture.destroy_repo_group(obj_name)
591 fixture.destroy_repo_group(obj_name)
590
592
591 def test_delete_owner_of_repository_group_detaching(self, request, user_util):
593 def test_delete_owner_of_repository_group_detaching(self, request, user_util):
592 self.log_user()
594 self.log_user()
593 obj_name = 'test_group'
595 obj_name = 'test_group'
594 usr = user_util.create_user(auto_cleanup=False)
596 usr = user_util.create_user(auto_cleanup=False)
595 username = usr.username
597 username = usr.username
596 fixture.create_repo_group(obj_name, cur_user=usr.username)
598 fixture.create_repo_group(obj_name, cur_user=usr.username)
597
599
598 new_user = Session().query(User)\
600 new_user = Session().query(User)\
599 .filter(User.username == username).one()
601 .filter(User.username == username).one()
600 response = self.app.post(
602 response = self.app.post(
601 route_path('user_delete', user_id=new_user.user_id),
603 route_path('user_delete', user_id=new_user.user_id),
602 params={'user_repo_groups': 'delete', 'csrf_token': self.csrf_token})
604 params={'user_repo_groups': 'delete', 'csrf_token': self.csrf_token})
603
605
604 msg = 'Deleted 1 repository groups'
606 msg = 'Deleted 1 repository groups'
605 assert_session_flash(response, msg)
607 assert_session_flash(response, msg)
606
608
607 def test_delete_owner_of_repository_group_deleting(self, request, user_util):
609 def test_delete_owner_of_repository_group_deleting(self, request, user_util):
608 self.log_user()
610 self.log_user()
609 obj_name = 'test_group'
611 obj_name = 'test_group'
610 usr = user_util.create_user(auto_cleanup=False)
612 usr = user_util.create_user(auto_cleanup=False)
611 username = usr.username
613 username = usr.username
612 fixture.create_repo_group(obj_name, cur_user=usr.username)
614 fixture.create_repo_group(obj_name, cur_user=usr.username)
613
615
614 new_user = Session().query(User)\
616 new_user = Session().query(User)\
615 .filter(User.username == username).one()
617 .filter(User.username == username).one()
616 response = self.app.post(
618 response = self.app.post(
617 route_path('user_delete', user_id=new_user.user_id),
619 route_path('user_delete', user_id=new_user.user_id),
618 params={'user_repo_groups': 'detach', 'csrf_token': self.csrf_token})
620 params={'user_repo_groups': 'detach', 'csrf_token': self.csrf_token})
619
621
620 msg = 'Detached 1 repository groups'
622 msg = 'Detached 1 repository groups'
621 assert_session_flash(response, msg)
623 assert_session_flash(response, msg)
622 fixture.destroy_repo_group(obj_name)
624 fixture.destroy_repo_group(obj_name)
623
625
624 def test_delete_owner_of_user_group(self, request, user_util):
626 def test_delete_owner_of_user_group(self, request, user_util):
625 self.log_user()
627 self.log_user()
626 obj_name = 'test_user_group'
628 obj_name = 'test_user_group'
627 usr = user_util.create_user()
629 usr = user_util.create_user()
628 username = usr.username
630 username = usr.username
629 fixture.create_user_group(obj_name, cur_user=usr.username)
631 fixture.create_user_group(obj_name, cur_user=usr.username)
630
632
631 new_user = Session().query(User)\
633 new_user = Session().query(User)\
632 .filter(User.username == username).one()
634 .filter(User.username == username).one()
633 response = self.app.post(
635 response = self.app.post(
634 route_path('user_delete', user_id=new_user.user_id),
636 route_path('user_delete', user_id=new_user.user_id),
635 params={'csrf_token': self.csrf_token})
637 params={'csrf_token': self.csrf_token})
636
638
637 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
639 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
638 'Switch owners or remove those user groups:%s' % (username,
640 'Switch owners or remove those user groups:%s' % (username,
639 obj_name)
641 obj_name)
640 assert_session_flash(response, msg)
642 assert_session_flash(response, msg)
641 fixture.destroy_user_group(obj_name)
643 fixture.destroy_user_group(obj_name)
642
644
643 def test_delete_owner_of_user_group_detaching(self, request, user_util):
645 def test_delete_owner_of_user_group_detaching(self, request, user_util):
644 self.log_user()
646 self.log_user()
645 obj_name = 'test_user_group'
647 obj_name = 'test_user_group'
646 usr = user_util.create_user(auto_cleanup=False)
648 usr = user_util.create_user(auto_cleanup=False)
647 username = usr.username
649 username = usr.username
648 fixture.create_user_group(obj_name, cur_user=usr.username)
650 fixture.create_user_group(obj_name, cur_user=usr.username)
649
651
650 new_user = Session().query(User)\
652 new_user = Session().query(User)\
651 .filter(User.username == username).one()
653 .filter(User.username == username).one()
652 try:
654 try:
653 response = self.app.post(
655 response = self.app.post(
654 route_path('user_delete', user_id=new_user.user_id),
656 route_path('user_delete', user_id=new_user.user_id),
655 params={'user_user_groups': 'detach',
657 params={'user_user_groups': 'detach',
656 'csrf_token': self.csrf_token})
658 'csrf_token': self.csrf_token})
657
659
658 msg = 'Detached 1 user groups'
660 msg = 'Detached 1 user groups'
659 assert_session_flash(response, msg)
661 assert_session_flash(response, msg)
660 finally:
662 finally:
661 fixture.destroy_user_group(obj_name)
663 fixture.destroy_user_group(obj_name)
662
664
663 def test_delete_owner_of_user_group_deleting(self, request, user_util):
665 def test_delete_owner_of_user_group_deleting(self, request, user_util):
664 self.log_user()
666 self.log_user()
665 obj_name = 'test_user_group'
667 obj_name = 'test_user_group'
666 usr = user_util.create_user(auto_cleanup=False)
668 usr = user_util.create_user(auto_cleanup=False)
667 username = usr.username
669 username = usr.username
668 fixture.create_user_group(obj_name, cur_user=usr.username)
670 fixture.create_user_group(obj_name, cur_user=usr.username)
669
671
670 new_user = Session().query(User)\
672 new_user = Session().query(User)\
671 .filter(User.username == username).one()
673 .filter(User.username == username).one()
672 response = self.app.post(
674 response = self.app.post(
673 route_path('user_delete', user_id=new_user.user_id),
675 route_path('user_delete', user_id=new_user.user_id),
674 params={'user_user_groups': 'delete', 'csrf_token': self.csrf_token})
676 params={'user_user_groups': 'delete', 'csrf_token': self.csrf_token})
675
677
676 msg = 'Deleted 1 user groups'
678 msg = 'Deleted 1 user groups'
677 assert_session_flash(response, msg)
679 assert_session_flash(response, msg)
678
680
679 def test_edit(self, user_util):
681 def test_edit(self, user_util):
680 self.log_user()
682 self.log_user()
681 user = user_util.create_user()
683 user = user_util.create_user()
682 self.app.get(route_path('user_edit', user_id=user.user_id))
684 self.app.get(route_path('user_edit', user_id=user.user_id))
683
685
684 def test_edit_default_user_redirect(self):
686 def test_edit_default_user_redirect(self):
685 self.log_user()
687 self.log_user()
686 user = User.get_default_user()
688 user = User.get_default_user()
687 self.app.get(route_path('user_edit', user_id=user.user_id), status=302)
689 self.app.get(route_path('user_edit', user_id=user.user_id), status=302)
688
690
689 @pytest.mark.parametrize(
691 @pytest.mark.parametrize(
690 'repo_create, repo_create_write, user_group_create, repo_group_create,'
692 'repo_create, repo_create_write, user_group_create, repo_group_create,'
691 'fork_create, inherit_default_permissions, expect_error,'
693 'fork_create, inherit_default_permissions, expect_error,'
692 'expect_form_error', [
694 'expect_form_error', [
693 ('hg.create.none', 'hg.create.write_on_repogroup.false',
695 ('hg.create.none', 'hg.create.write_on_repogroup.false',
694 'hg.usergroup.create.false', 'hg.repogroup.create.false',
696 'hg.usergroup.create.false', 'hg.repogroup.create.false',
695 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
697 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
696 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
698 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
697 'hg.usergroup.create.false', 'hg.repogroup.create.false',
699 'hg.usergroup.create.false', 'hg.repogroup.create.false',
698 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
700 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
699 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
701 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
700 'hg.usergroup.create.true', 'hg.repogroup.create.true',
702 'hg.usergroup.create.true', 'hg.repogroup.create.true',
701 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
703 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
702 False),
704 False),
703 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
705 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
704 'hg.usergroup.create.true', 'hg.repogroup.create.true',
706 'hg.usergroup.create.true', 'hg.repogroup.create.true',
705 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
707 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
706 True),
708 True),
707 ('', '', '', '', '', '', True, False),
709 ('', '', '', '', '', '', True, False),
708 ])
710 ])
709 def test_global_perms_on_user(
711 def test_global_perms_on_user(
710 self, repo_create, repo_create_write, user_group_create,
712 self, repo_create, repo_create_write, user_group_create,
711 repo_group_create, fork_create, expect_error, expect_form_error,
713 repo_group_create, fork_create, expect_error, expect_form_error,
712 inherit_default_permissions, user_util):
714 inherit_default_permissions, user_util):
713 self.log_user()
715 self.log_user()
714 user = user_util.create_user()
716 user = user_util.create_user()
715 uid = user.user_id
717 uid = user.user_id
716
718
717 # ENABLE REPO CREATE ON A GROUP
719 # ENABLE REPO CREATE ON A GROUP
718 perm_params = {
720 perm_params = {
719 'inherit_default_permissions': False,
721 'inherit_default_permissions': False,
720 'default_repo_create': repo_create,
722 'default_repo_create': repo_create,
721 'default_repo_create_on_write': repo_create_write,
723 'default_repo_create_on_write': repo_create_write,
722 'default_user_group_create': user_group_create,
724 'default_user_group_create': user_group_create,
723 'default_repo_group_create': repo_group_create,
725 'default_repo_group_create': repo_group_create,
724 'default_fork_create': fork_create,
726 'default_fork_create': fork_create,
725 'default_inherit_default_permissions': inherit_default_permissions,
727 'default_inherit_default_permissions': inherit_default_permissions,
726 'csrf_token': self.csrf_token,
728 'csrf_token': self.csrf_token,
727 }
729 }
728 response = self.app.post(
730 response = self.app.post(
729 route_path('user_edit_global_perms_update', user_id=uid),
731 route_path('user_edit_global_perms_update', user_id=uid),
730 params=perm_params)
732 params=perm_params)
731
733
732 if expect_form_error:
734 if expect_form_error:
733 assert response.status_int == 200
735 assert response.status_int == 200
734 response.mustcontain('Value must be one of')
736 response.mustcontain('Value must be one of')
735 else:
737 else:
736 if expect_error:
738 if expect_error:
737 msg = 'An error occurred during permissions saving'
739 msg = 'An error occurred during permissions saving'
738 else:
740 else:
739 msg = 'User global permissions updated successfully'
741 msg = 'User global permissions updated successfully'
740 ug = User.get(uid)
742 ug = User.get(uid)
741 del perm_params['inherit_default_permissions']
743 del perm_params['inherit_default_permissions']
742 del perm_params['csrf_token']
744 del perm_params['csrf_token']
743 assert perm_params == ug.get_default_perms()
745 assert perm_params == ug.get_default_perms()
744 assert_session_flash(response, msg)
746 assert_session_flash(response, msg)
745
747
746 def test_global_permissions_initial_values(self, user_util):
748 def test_global_permissions_initial_values(self, user_util):
747 self.log_user()
749 self.log_user()
748 user = user_util.create_user()
750 user = user_util.create_user()
749 uid = user.user_id
751 uid = user.user_id
750 response = self.app.get(
752 response = self.app.get(
751 route_path('user_edit_global_perms', user_id=uid))
753 route_path('user_edit_global_perms', user_id=uid))
752 default_user = User.get_default_user()
754 default_user = User.get_default_user()
753 default_permissions = default_user.get_default_perms()
755 default_permissions = default_user.get_default_perms()
754 assert_response = response.assert_response()
756 assert_response = response.assert_response()
755 expected_permissions = (
757 expected_permissions = (
756 'default_repo_create', 'default_repo_create_on_write',
758 'default_repo_create', 'default_repo_create_on_write',
757 'default_fork_create', 'default_repo_group_create',
759 'default_fork_create', 'default_repo_group_create',
758 'default_user_group_create', 'default_inherit_default_permissions')
760 'default_user_group_create', 'default_inherit_default_permissions')
759 for permission in expected_permissions:
761 for permission in expected_permissions:
760 css_selector = '[name={}][checked=checked]'.format(permission)
762 css_selector = '[name={}][checked=checked]'.format(permission)
761 element = assert_response.get_element(css_selector)
763 element = assert_response.get_element(css_selector)
762 assert element.value == default_permissions[permission]
764 assert element.value == default_permissions[permission]
763
765
764 def test_perms_summary_page(self):
766 def test_perms_summary_page(self):
765 user = self.log_user()
767 user = self.log_user()
766 response = self.app.get(
768 response = self.app.get(
767 route_path('edit_user_perms_summary', user_id=user['user_id']))
769 route_path('edit_user_perms_summary', user_id=user['user_id']))
768 for repo in Repository.query().all():
770 for repo in Repository.query().all():
769 response.mustcontain(repo.repo_name)
771 response.mustcontain(repo.repo_name)
770
772
771 def test_perms_summary_page_json(self):
773 def test_perms_summary_page_json(self):
772 user = self.log_user()
774 user = self.log_user()
773 response = self.app.get(
775 response = self.app.get(
774 route_path('edit_user_perms_summary_json', user_id=user['user_id']))
776 route_path('edit_user_perms_summary_json', user_id=user['user_id']))
775 for repo in Repository.query().all():
777 for repo in Repository.query().all():
776 response.mustcontain(repo.repo_name)
778 response.mustcontain(repo.repo_name)
777
779
778 def test_audit_log_page(self):
780 def test_audit_log_page(self):
779 user = self.log_user()
781 user = self.log_user()
780 self.app.get(
782 self.app.get(
781 route_path('edit_user_audit_logs', user_id=user['user_id']))
783 route_path('edit_user_audit_logs', user_id=user['user_id']))
General Comments 0
You need to be logged in to leave comments. Login now