##// END OF EJS Templates
chore(cleanups): cleanup unicode usage...
super-admin -
r5453:c08e0f36 default
parent child Browse files
Show More
@@ -1,348 +1,348 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import mock
20 import mock
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.vcs import settings
23 from rhodecode.lib.vcs import settings
24 from rhodecode.model.meta import Session
24 from rhodecode.model.meta import Session
25 from rhodecode.model.repo import RepoModel
25 from rhodecode.model.repo import RepoModel
26 from rhodecode.model.user import UserModel
26 from rhodecode.model.user import UserModel
27 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
27 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
28 from rhodecode.api.tests.utils import (
28 from rhodecode.api.tests.utils import (
29 build_data, api_call, assert_ok, assert_error, crash)
29 build_data, api_call, assert_ok, assert_error, crash)
30 from rhodecode.tests.fixture import Fixture
30 from rhodecode.tests.fixture import Fixture
31 from rhodecode.lib.ext_json import json
31 from rhodecode.lib.ext_json import json
32 from rhodecode.lib.str_utils import safe_str
32 from rhodecode.lib.str_utils import safe_str
33
33
34
34
35 fixture = Fixture()
35 fixture = Fixture()
36
36
37
37
38 @pytest.mark.usefixtures("testuser_api", "app")
38 @pytest.mark.usefixtures("testuser_api", "app")
39 class TestCreateRepo(object):
39 class TestCreateRepo(object):
40
40
41 @pytest.mark.parametrize('given, expected_name, expected_exc', [
41 @pytest.mark.parametrize('given, expected_name, expected_exc', [
42 ('api repo-1', 'api-repo-1', False),
42 ('api repo-1', 'api-repo-1', False),
43 ('api-repo 1-Δ…Δ‡', 'api-repo-1-Δ…Δ‡', False),
43 ('api-repo 1-Δ…Δ‡', 'api-repo-1-Δ…Δ‡', False),
44 (u'unicode-Δ…Δ‡', u'unicode-Δ…Δ‡', False),
44 ('unicode-Δ…Δ‡', u'unicode-Δ…Δ‡', False),
45 ('some repo v1.2', 'some-repo-v1.2', False),
45 ('some repo v1.2', 'some-repo-v1.2', False),
46 ('v2.0', 'v2.0', False),
46 ('v2.0', 'v2.0', False),
47 ])
47 ])
48 def test_api_create_repo(self, backend, given, expected_name, expected_exc):
48 def test_api_create_repo(self, backend, given, expected_name, expected_exc):
49
49
50 id_, params = build_data(
50 id_, params = build_data(
51 self.apikey,
51 self.apikey,
52 'create_repo',
52 'create_repo',
53 repo_name=given,
53 repo_name=given,
54 owner=TEST_USER_ADMIN_LOGIN,
54 owner=TEST_USER_ADMIN_LOGIN,
55 repo_type=backend.alias,
55 repo_type=backend.alias,
56 )
56 )
57 response = api_call(self.app, params)
57 response = api_call(self.app, params)
58
58
59 ret = {
59 ret = {
60 'msg': 'Created new repository `%s`' % (expected_name,),
60 'msg': 'Created new repository `%s`' % (expected_name,),
61 'success': True,
61 'success': True,
62 'task': None,
62 'task': None,
63 }
63 }
64 expected = ret
64 expected = ret
65 assert_ok(id_, expected, given=response.body)
65 assert_ok(id_, expected, given=response.body)
66
66
67 repo = RepoModel().get_by_repo_name(safe_str(expected_name))
67 repo = RepoModel().get_by_repo_name(safe_str(expected_name))
68 assert repo is not None
68 assert repo is not None
69
69
70 id_, params = build_data(self.apikey, 'get_repo', repoid=expected_name)
70 id_, params = build_data(self.apikey, 'get_repo', repoid=expected_name)
71 response = api_call(self.app, params)
71 response = api_call(self.app, params)
72 body = json.loads(response.body)
72 body = json.loads(response.body)
73
73
74 assert body['result']['enable_downloads'] is False
74 assert body['result']['enable_downloads'] is False
75 assert body['result']['enable_locking'] is False
75 assert body['result']['enable_locking'] is False
76 assert body['result']['enable_statistics'] is False
76 assert body['result']['enable_statistics'] is False
77
77
78 fixture.destroy_repo(safe_str(expected_name))
78 fixture.destroy_repo(safe_str(expected_name))
79
79
80 def test_api_create_restricted_repo_type(self, backend):
80 def test_api_create_restricted_repo_type(self, backend):
81 repo_name = 'api-repo-type-{0}'.format(backend.alias)
81 repo_name = 'api-repo-type-{0}'.format(backend.alias)
82 id_, params = build_data(
82 id_, params = build_data(
83 self.apikey,
83 self.apikey,
84 'create_repo',
84 'create_repo',
85 repo_name=repo_name,
85 repo_name=repo_name,
86 owner=TEST_USER_ADMIN_LOGIN,
86 owner=TEST_USER_ADMIN_LOGIN,
87 repo_type=backend.alias,
87 repo_type=backend.alias,
88 )
88 )
89 git_backend = settings.BACKENDS['git']
89 git_backend = settings.BACKENDS['git']
90 with mock.patch(
90 with mock.patch(
91 'rhodecode.lib.vcs.settings.BACKENDS', {'git': git_backend}):
91 'rhodecode.lib.vcs.settings.BACKENDS', {'git': git_backend}):
92 response = api_call(self.app, params)
92 response = api_call(self.app, params)
93
93
94 repo = RepoModel().get_by_repo_name(repo_name)
94 repo = RepoModel().get_by_repo_name(repo_name)
95
95
96 if backend.alias == 'git':
96 if backend.alias == 'git':
97 assert repo is not None
97 assert repo is not None
98 expected = {
98 expected = {
99 'msg': 'Created new repository `{0}`'.format(repo_name,),
99 'msg': 'Created new repository `{0}`'.format(repo_name,),
100 'success': True,
100 'success': True,
101 'task': None,
101 'task': None,
102 }
102 }
103 assert_ok(id_, expected, given=response.body)
103 assert_ok(id_, expected, given=response.body)
104 else:
104 else:
105 assert repo is None
105 assert repo is None
106
106
107 fixture.destroy_repo(repo_name)
107 fixture.destroy_repo(repo_name)
108
108
109 def test_api_create_repo_with_booleans(self, backend):
109 def test_api_create_repo_with_booleans(self, backend):
110 repo_name = 'api-repo-2'
110 repo_name = 'api-repo-2'
111 id_, params = build_data(
111 id_, params = build_data(
112 self.apikey,
112 self.apikey,
113 'create_repo',
113 'create_repo',
114 repo_name=repo_name,
114 repo_name=repo_name,
115 owner=TEST_USER_ADMIN_LOGIN,
115 owner=TEST_USER_ADMIN_LOGIN,
116 repo_type=backend.alias,
116 repo_type=backend.alias,
117 enable_statistics=True,
117 enable_statistics=True,
118 enable_locking=True,
118 enable_locking=True,
119 enable_downloads=True
119 enable_downloads=True
120 )
120 )
121 response = api_call(self.app, params)
121 response = api_call(self.app, params)
122
122
123 repo = RepoModel().get_by_repo_name(repo_name)
123 repo = RepoModel().get_by_repo_name(repo_name)
124
124
125 assert repo is not None
125 assert repo is not None
126 ret = {
126 ret = {
127 'msg': 'Created new repository `%s`' % (repo_name,),
127 'msg': 'Created new repository `%s`' % (repo_name,),
128 'success': True,
128 'success': True,
129 'task': None,
129 'task': None,
130 }
130 }
131 expected = ret
131 expected = ret
132 assert_ok(id_, expected, given=response.body)
132 assert_ok(id_, expected, given=response.body)
133
133
134 id_, params = build_data(self.apikey, 'get_repo', repoid=repo_name)
134 id_, params = build_data(self.apikey, 'get_repo', repoid=repo_name)
135 response = api_call(self.app, params)
135 response = api_call(self.app, params)
136 body = json.loads(response.body)
136 body = json.loads(response.body)
137
137
138 assert body['result']['enable_downloads'] is True
138 assert body['result']['enable_downloads'] is True
139 assert body['result']['enable_locking'] is True
139 assert body['result']['enable_locking'] is True
140 assert body['result']['enable_statistics'] is True
140 assert body['result']['enable_statistics'] is True
141
141
142 fixture.destroy_repo(repo_name)
142 fixture.destroy_repo(repo_name)
143
143
144 def test_api_create_repo_in_group(self, backend):
144 def test_api_create_repo_in_group(self, backend):
145 repo_group_name = 'my_gr'
145 repo_group_name = 'my_gr'
146 # create the parent
146 # create the parent
147 fixture.create_repo_group(repo_group_name)
147 fixture.create_repo_group(repo_group_name)
148
148
149 repo_name = '%s/api-repo-gr' % (repo_group_name,)
149 repo_name = '%s/api-repo-gr' % (repo_group_name,)
150 id_, params = build_data(
150 id_, params = build_data(
151 self.apikey, 'create_repo',
151 self.apikey, 'create_repo',
152 repo_name=repo_name,
152 repo_name=repo_name,
153 owner=TEST_USER_ADMIN_LOGIN,
153 owner=TEST_USER_ADMIN_LOGIN,
154 repo_type=backend.alias,)
154 repo_type=backend.alias,)
155 response = api_call(self.app, params)
155 response = api_call(self.app, params)
156 repo = RepoModel().get_by_repo_name(repo_name)
156 repo = RepoModel().get_by_repo_name(repo_name)
157 assert repo is not None
157 assert repo is not None
158 assert repo.group is not None
158 assert repo.group is not None
159
159
160 ret = {
160 ret = {
161 'msg': 'Created new repository `%s`' % (repo_name,),
161 'msg': 'Created new repository `%s`' % (repo_name,),
162 'success': True,
162 'success': True,
163 'task': None,
163 'task': None,
164 }
164 }
165 expected = ret
165 expected = ret
166 assert_ok(id_, expected, given=response.body)
166 assert_ok(id_, expected, given=response.body)
167 fixture.destroy_repo(repo_name)
167 fixture.destroy_repo(repo_name)
168 fixture.destroy_repo_group(repo_group_name)
168 fixture.destroy_repo_group(repo_group_name)
169
169
170 def test_create_repo_in_group_that_doesnt_exist(self, backend, user_util):
170 def test_create_repo_in_group_that_doesnt_exist(self, backend, user_util):
171 repo_group_name = 'fake_group'
171 repo_group_name = 'fake_group'
172
172
173 repo_name = '%s/api-repo-gr' % (repo_group_name,)
173 repo_name = '%s/api-repo-gr' % (repo_group_name,)
174 id_, params = build_data(
174 id_, params = build_data(
175 self.apikey, 'create_repo',
175 self.apikey, 'create_repo',
176 repo_name=repo_name,
176 repo_name=repo_name,
177 owner=TEST_USER_ADMIN_LOGIN,
177 owner=TEST_USER_ADMIN_LOGIN,
178 repo_type=backend.alias,)
178 repo_type=backend.alias,)
179 response = api_call(self.app, params)
179 response = api_call(self.app, params)
180
180
181 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
181 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
182 repo_group_name)}
182 repo_group_name)}
183 assert_error(id_, expected, given=response.body)
183 assert_error(id_, expected, given=response.body)
184
184
185 def test_api_create_repo_unknown_owner(self, backend):
185 def test_api_create_repo_unknown_owner(self, backend):
186 repo_name = 'api-repo-2'
186 repo_name = 'api-repo-2'
187 owner = 'i-dont-exist'
187 owner = 'i-dont-exist'
188 id_, params = build_data(
188 id_, params = build_data(
189 self.apikey, 'create_repo',
189 self.apikey, 'create_repo',
190 repo_name=repo_name,
190 repo_name=repo_name,
191 owner=owner,
191 owner=owner,
192 repo_type=backend.alias)
192 repo_type=backend.alias)
193 response = api_call(self.app, params)
193 response = api_call(self.app, params)
194 expected = 'user `%s` does not exist' % (owner,)
194 expected = 'user `%s` does not exist' % (owner,)
195 assert_error(id_, expected, given=response.body)
195 assert_error(id_, expected, given=response.body)
196
196
197 def test_api_create_repo_dont_specify_owner(self, backend):
197 def test_api_create_repo_dont_specify_owner(self, backend):
198 repo_name = 'api-repo-3'
198 repo_name = 'api-repo-3'
199 id_, params = build_data(
199 id_, params = build_data(
200 self.apikey, 'create_repo',
200 self.apikey, 'create_repo',
201 repo_name=repo_name,
201 repo_name=repo_name,
202 repo_type=backend.alias)
202 repo_type=backend.alias)
203 response = api_call(self.app, params)
203 response = api_call(self.app, params)
204
204
205 repo = RepoModel().get_by_repo_name(repo_name)
205 repo = RepoModel().get_by_repo_name(repo_name)
206 assert repo is not None
206 assert repo is not None
207 ret = {
207 ret = {
208 'msg': 'Created new repository `%s`' % (repo_name,),
208 'msg': 'Created new repository `%s`' % (repo_name,),
209 'success': True,
209 'success': True,
210 'task': None,
210 'task': None,
211 }
211 }
212 expected = ret
212 expected = ret
213 assert_ok(id_, expected, given=response.body)
213 assert_ok(id_, expected, given=response.body)
214 fixture.destroy_repo(repo_name)
214 fixture.destroy_repo(repo_name)
215
215
216 def test_api_create_repo_by_non_admin(self, backend):
216 def test_api_create_repo_by_non_admin(self, backend):
217 repo_name = 'api-repo-4'
217 repo_name = 'api-repo-4'
218 id_, params = build_data(
218 id_, params = build_data(
219 self.apikey_regular, 'create_repo',
219 self.apikey_regular, 'create_repo',
220 repo_name=repo_name,
220 repo_name=repo_name,
221 repo_type=backend.alias)
221 repo_type=backend.alias)
222 response = api_call(self.app, params)
222 response = api_call(self.app, params)
223
223
224 repo = RepoModel().get_by_repo_name(repo_name)
224 repo = RepoModel().get_by_repo_name(repo_name)
225 assert repo is not None
225 assert repo is not None
226 ret = {
226 ret = {
227 'msg': 'Created new repository `%s`' % (repo_name,),
227 'msg': 'Created new repository `%s`' % (repo_name,),
228 'success': True,
228 'success': True,
229 'task': None,
229 'task': None,
230 }
230 }
231 expected = ret
231 expected = ret
232 assert_ok(id_, expected, given=response.body)
232 assert_ok(id_, expected, given=response.body)
233 fixture.destroy_repo(repo_name)
233 fixture.destroy_repo(repo_name)
234
234
235 def test_api_create_repo_by_non_admin_specify_owner(self, backend):
235 def test_api_create_repo_by_non_admin_specify_owner(self, backend):
236 repo_name = 'api-repo-5'
236 repo_name = 'api-repo-5'
237 owner = 'i-dont-exist'
237 owner = 'i-dont-exist'
238 id_, params = build_data(
238 id_, params = build_data(
239 self.apikey_regular, 'create_repo',
239 self.apikey_regular, 'create_repo',
240 repo_name=repo_name,
240 repo_name=repo_name,
241 repo_type=backend.alias,
241 repo_type=backend.alias,
242 owner=owner)
242 owner=owner)
243 response = api_call(self.app, params)
243 response = api_call(self.app, params)
244
244
245 expected = 'Only RhodeCode super-admin can specify `owner` param'
245 expected = 'Only RhodeCode super-admin can specify `owner` param'
246 assert_error(id_, expected, given=response.body)
246 assert_error(id_, expected, given=response.body)
247 fixture.destroy_repo(repo_name)
247 fixture.destroy_repo(repo_name)
248
248
249 def test_api_create_repo_by_non_admin_no_parent_group_perms(self, backend):
249 def test_api_create_repo_by_non_admin_no_parent_group_perms(self, backend):
250 repo_group_name = 'no-access'
250 repo_group_name = 'no-access'
251 fixture.create_repo_group(repo_group_name)
251 fixture.create_repo_group(repo_group_name)
252 repo_name = 'no-access/api-repo'
252 repo_name = 'no-access/api-repo'
253
253
254 id_, params = build_data(
254 id_, params = build_data(
255 self.apikey_regular, 'create_repo',
255 self.apikey_regular, 'create_repo',
256 repo_name=repo_name,
256 repo_name=repo_name,
257 repo_type=backend.alias)
257 repo_type=backend.alias)
258 response = api_call(self.app, params)
258 response = api_call(self.app, params)
259
259
260 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
260 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
261 repo_group_name)}
261 repo_group_name)}
262 assert_error(id_, expected, given=response.body)
262 assert_error(id_, expected, given=response.body)
263 fixture.destroy_repo_group(repo_group_name)
263 fixture.destroy_repo_group(repo_group_name)
264 fixture.destroy_repo(repo_name)
264 fixture.destroy_repo(repo_name)
265
265
266 def test_api_create_repo_non_admin_no_permission_to_create_to_root_level(
266 def test_api_create_repo_non_admin_no_permission_to_create_to_root_level(
267 self, backend, user_util):
267 self, backend, user_util):
268
268
269 regular_user = user_util.create_user()
269 regular_user = user_util.create_user()
270 regular_user_api_key = regular_user.api_key
270 regular_user_api_key = regular_user.api_key
271
271
272 usr = UserModel().get_by_username(regular_user.username)
272 usr = UserModel().get_by_username(regular_user.username)
273 usr.inherit_default_permissions = False
273 usr.inherit_default_permissions = False
274 Session().add(usr)
274 Session().add(usr)
275
275
276 repo_name = backend.new_repo_name()
276 repo_name = backend.new_repo_name()
277 id_, params = build_data(
277 id_, params = build_data(
278 regular_user_api_key, 'create_repo',
278 regular_user_api_key, 'create_repo',
279 repo_name=repo_name,
279 repo_name=repo_name,
280 repo_type=backend.alias)
280 repo_type=backend.alias)
281 response = api_call(self.app, params)
281 response = api_call(self.app, params)
282 expected = {
282 expected = {
283 "repo_name": "You do not have the permission to "
283 "repo_name": "You do not have the permission to "
284 "store repositories in the root location."}
284 "store repositories in the root location."}
285 assert_error(id_, expected, given=response.body)
285 assert_error(id_, expected, given=response.body)
286
286
287 def test_api_create_repo_exists(self, backend):
287 def test_api_create_repo_exists(self, backend):
288 repo_name = backend.repo_name
288 repo_name = backend.repo_name
289 id_, params = build_data(
289 id_, params = build_data(
290 self.apikey, 'create_repo',
290 self.apikey, 'create_repo',
291 repo_name=repo_name,
291 repo_name=repo_name,
292 owner=TEST_USER_ADMIN_LOGIN,
292 owner=TEST_USER_ADMIN_LOGIN,
293 repo_type=backend.alias,)
293 repo_type=backend.alias,)
294 response = api_call(self.app, params)
294 response = api_call(self.app, params)
295 expected = {
295 expected = {
296 'unique_repo_name': 'Repository with name `{}` already exists'.format(
296 'unique_repo_name': 'Repository with name `{}` already exists'.format(
297 repo_name)}
297 repo_name)}
298 assert_error(id_, expected, given=response.body)
298 assert_error(id_, expected, given=response.body)
299
299
300 @mock.patch.object(RepoModel, 'create', crash)
300 @mock.patch.object(RepoModel, 'create', crash)
301 def test_api_create_repo_exception_occurred(self, backend):
301 def test_api_create_repo_exception_occurred(self, backend):
302 repo_name = 'api-repo-6'
302 repo_name = 'api-repo-6'
303 id_, params = build_data(
303 id_, params = build_data(
304 self.apikey, 'create_repo',
304 self.apikey, 'create_repo',
305 repo_name=repo_name,
305 repo_name=repo_name,
306 owner=TEST_USER_ADMIN_LOGIN,
306 owner=TEST_USER_ADMIN_LOGIN,
307 repo_type=backend.alias,)
307 repo_type=backend.alias,)
308 response = api_call(self.app, params)
308 response = api_call(self.app, params)
309 expected = 'failed to create repository `%s`' % (repo_name,)
309 expected = 'failed to create repository `%s`' % (repo_name,)
310 assert_error(id_, expected, given=response.body)
310 assert_error(id_, expected, given=response.body)
311
311
312 @pytest.mark.parametrize('parent_group, dirty_name, expected_name', [
312 @pytest.mark.parametrize('parent_group, dirty_name, expected_name', [
313 (None, 'foo bar x', 'foo-bar-x'),
313 (None, 'foo bar x', 'foo-bar-x'),
314 ('foo', '/foo//bar x', 'foo/bar-x'),
314 ('foo', '/foo//bar x', 'foo/bar-x'),
315 ('foo-bar', 'foo-bar //bar x', 'foo-bar/bar-x'),
315 ('foo-bar', 'foo-bar //bar x', 'foo-bar/bar-x'),
316 ])
316 ])
317 def test_create_repo_with_extra_slashes_in_name(
317 def test_create_repo_with_extra_slashes_in_name(
318 self, backend, parent_group, dirty_name, expected_name):
318 self, backend, parent_group, dirty_name, expected_name):
319
319
320 if parent_group:
320 if parent_group:
321 gr = fixture.create_repo_group(parent_group)
321 gr = fixture.create_repo_group(parent_group)
322 assert gr.group_name == parent_group
322 assert gr.group_name == parent_group
323
323
324 id_, params = build_data(
324 id_, params = build_data(
325 self.apikey, 'create_repo',
325 self.apikey, 'create_repo',
326 repo_name=dirty_name,
326 repo_name=dirty_name,
327 repo_type=backend.alias,
327 repo_type=backend.alias,
328 owner=TEST_USER_ADMIN_LOGIN,)
328 owner=TEST_USER_ADMIN_LOGIN,)
329 response = api_call(self.app, params)
329 response = api_call(self.app, params)
330 expected ={
330 expected ={
331 "msg": "Created new repository `{}`".format(expected_name),
331 "msg": "Created new repository `{}`".format(expected_name),
332 "task": None,
332 "task": None,
333 "success": True
333 "success": True
334 }
334 }
335 assert_ok(id_, expected, response.body)
335 assert_ok(id_, expected, response.body)
336
336
337 repo = RepoModel().get_by_repo_name(expected_name)
337 repo = RepoModel().get_by_repo_name(expected_name)
338 assert repo is not None
338 assert repo is not None
339
339
340 expected = {
340 expected = {
341 'msg': 'Created new repository `%s`' % (expected_name,),
341 'msg': 'Created new repository `%s`' % (expected_name,),
342 'success': True,
342 'success': True,
343 'task': None,
343 'task': None,
344 }
344 }
345 assert_ok(id_, expected, given=response.body)
345 assert_ok(id_, expected, given=response.body)
346 fixture.destroy_repo(expected_name)
346 fixture.destroy_repo(expected_name)
347 if parent_group:
347 if parent_group:
348 fixture.destroy_repo_group(parent_group)
348 fixture.destroy_repo_group(parent_group)
@@ -1,288 +1,288 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import mock
20 import mock
21 import pytest
21 import pytest
22
22
23 from rhodecode.model.meta import Session
23 from rhodecode.model.meta import Session
24 from rhodecode.model.repo_group import RepoGroupModel
24 from rhodecode.model.repo_group import RepoGroupModel
25 from rhodecode.model.user import UserModel
25 from rhodecode.model.user import UserModel
26 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
26 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
27 from rhodecode.api.tests.utils import (
27 from rhodecode.api.tests.utils import (
28 build_data, api_call, assert_ok, assert_error, crash)
28 build_data, api_call, assert_ok, assert_error, crash)
29 from rhodecode.tests.fixture import Fixture
29 from rhodecode.tests.fixture import Fixture
30
30
31
31
32 fixture = Fixture()
32 fixture = Fixture()
33
33
34
34
35 @pytest.mark.usefixtures("testuser_api", "app")
35 @pytest.mark.usefixtures("testuser_api", "app")
36 class TestCreateRepoGroup(object):
36 class TestCreateRepoGroup(object):
37 def test_api_create_repo_group(self):
37 def test_api_create_repo_group(self):
38 repo_group_name = 'api-repo-group'
38 repo_group_name = 'api-repo-group'
39
39
40 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
40 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
41 assert repo_group is None
41 assert repo_group is None
42
42
43 id_, params = build_data(
43 id_, params = build_data(
44 self.apikey, 'create_repo_group',
44 self.apikey, 'create_repo_group',
45 group_name=repo_group_name,
45 group_name=repo_group_name,
46 owner=TEST_USER_ADMIN_LOGIN,)
46 owner=TEST_USER_ADMIN_LOGIN,)
47 response = api_call(self.app, params)
47 response = api_call(self.app, params)
48
48
49 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
49 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
50 assert repo_group is not None
50 assert repo_group is not None
51 ret = {
51 ret = {
52 'msg': 'Created new repo group `%s`' % (repo_group_name,),
52 'msg': 'Created new repo group `%s`' % (repo_group_name,),
53 'repo_group': repo_group.get_api_data()
53 'repo_group': repo_group.get_api_data()
54 }
54 }
55 expected = ret
55 expected = ret
56 try:
56 try:
57 assert_ok(id_, expected, given=response.body)
57 assert_ok(id_, expected, given=response.body)
58 finally:
58 finally:
59 fixture.destroy_repo_group(repo_group_name)
59 fixture.destroy_repo_group(repo_group_name)
60
60
61 def test_api_create_repo_group_in_another_group(self):
61 def test_api_create_repo_group_in_another_group(self):
62 repo_group_name = 'api-repo-group'
62 repo_group_name = 'api-repo-group'
63
63
64 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
64 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
65 assert repo_group is None
65 assert repo_group is None
66 # create the parent
66 # create the parent
67 fixture.create_repo_group(repo_group_name)
67 fixture.create_repo_group(repo_group_name)
68
68
69 full_repo_group_name = repo_group_name+'/'+repo_group_name
69 full_repo_group_name = repo_group_name+'/'+repo_group_name
70 id_, params = build_data(
70 id_, params = build_data(
71 self.apikey, 'create_repo_group',
71 self.apikey, 'create_repo_group',
72 group_name=full_repo_group_name,
72 group_name=full_repo_group_name,
73 owner=TEST_USER_ADMIN_LOGIN,
73 owner=TEST_USER_ADMIN_LOGIN,
74 copy_permissions=True)
74 copy_permissions=True)
75 response = api_call(self.app, params)
75 response = api_call(self.app, params)
76
76
77 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
77 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
78 assert repo_group is not None
78 assert repo_group is not None
79 ret = {
79 ret = {
80 'msg': 'Created new repo group `%s`' % (full_repo_group_name,),
80 'msg': 'Created new repo group `%s`' % (full_repo_group_name,),
81 'repo_group': repo_group.get_api_data()
81 'repo_group': repo_group.get_api_data()
82 }
82 }
83 expected = ret
83 expected = ret
84 try:
84 try:
85 assert_ok(id_, expected, given=response.body)
85 assert_ok(id_, expected, given=response.body)
86 finally:
86 finally:
87 fixture.destroy_repo_group(full_repo_group_name)
87 fixture.destroy_repo_group(full_repo_group_name)
88 fixture.destroy_repo_group(repo_group_name)
88 fixture.destroy_repo_group(repo_group_name)
89
89
90 def test_api_create_repo_group_in_another_group_not_existing(self):
90 def test_api_create_repo_group_in_another_group_not_existing(self):
91 repo_group_name = 'api-repo-group-no'
91 repo_group_name = 'api-repo-group-no'
92
92
93 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
93 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
94 assert repo_group is None
94 assert repo_group is None
95
95
96 full_repo_group_name = repo_group_name+'/'+repo_group_name
96 full_repo_group_name = repo_group_name+'/'+repo_group_name
97 id_, params = build_data(
97 id_, params = build_data(
98 self.apikey, 'create_repo_group',
98 self.apikey, 'create_repo_group',
99 group_name=full_repo_group_name,
99 group_name=full_repo_group_name,
100 owner=TEST_USER_ADMIN_LOGIN,
100 owner=TEST_USER_ADMIN_LOGIN,
101 copy_permissions=True)
101 copy_permissions=True)
102 response = api_call(self.app, params)
102 response = api_call(self.app, params)
103 expected = {
103 expected = {
104 'repo_group':
104 'repo_group':
105 'Parent repository group `{}` does not exist'.format(
105 'Parent repository group `{}` does not exist'.format(
106 repo_group_name)}
106 repo_group_name)}
107 assert_error(id_, expected, given=response.body)
107 assert_error(id_, expected, given=response.body)
108
108
109 def test_api_create_repo_group_that_exists(self):
109 def test_api_create_repo_group_that_exists(self):
110 repo_group_name = 'api-repo-group'
110 repo_group_name = 'api-repo-group'
111
111
112 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
112 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
113 assert repo_group is None
113 assert repo_group is None
114
114
115 fixture.create_repo_group(repo_group_name)
115 fixture.create_repo_group(repo_group_name)
116 id_, params = build_data(
116 id_, params = build_data(
117 self.apikey, 'create_repo_group',
117 self.apikey, 'create_repo_group',
118 group_name=repo_group_name,
118 group_name=repo_group_name,
119 owner=TEST_USER_ADMIN_LOGIN,)
119 owner=TEST_USER_ADMIN_LOGIN,)
120 response = api_call(self.app, params)
120 response = api_call(self.app, params)
121 expected = {
121 expected = {
122 'unique_repo_group_name':
122 'unique_repo_group_name':
123 'Repository group with name `{}` already exists'.format(
123 'Repository group with name `{}` already exists'.format(
124 repo_group_name)}
124 repo_group_name)}
125 try:
125 try:
126 assert_error(id_, expected, given=response.body)
126 assert_error(id_, expected, given=response.body)
127 finally:
127 finally:
128 fixture.destroy_repo_group(repo_group_name)
128 fixture.destroy_repo_group(repo_group_name)
129
129
130 def test_api_create_repo_group_regular_user_wit_root_location_perms(
130 def test_api_create_repo_group_regular_user_wit_root_location_perms(
131 self, user_util):
131 self, user_util):
132 regular_user = user_util.create_user()
132 regular_user = user_util.create_user()
133 regular_user_api_key = regular_user.api_key
133 regular_user_api_key = regular_user.api_key
134
134
135 repo_group_name = 'api-repo-group-by-regular-user'
135 repo_group_name = 'api-repo-group-by-regular-user'
136
136
137 usr = UserModel().get_by_username(regular_user.username)
137 usr = UserModel().get_by_username(regular_user.username)
138 usr.inherit_default_permissions = False
138 usr.inherit_default_permissions = False
139 Session().add(usr)
139 Session().add(usr)
140
140
141 UserModel().grant_perm(
141 UserModel().grant_perm(
142 regular_user.username, 'hg.repogroup.create.true')
142 regular_user.username, 'hg.repogroup.create.true')
143 Session().commit()
143 Session().commit()
144
144
145 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
145 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
146 assert repo_group is None
146 assert repo_group is None
147
147
148 id_, params = build_data(
148 id_, params = build_data(
149 regular_user_api_key, 'create_repo_group',
149 regular_user_api_key, 'create_repo_group',
150 group_name=repo_group_name)
150 group_name=repo_group_name)
151 response = api_call(self.app, params)
151 response = api_call(self.app, params)
152
152
153 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
153 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
154 assert repo_group is not None
154 assert repo_group is not None
155 expected = {
155 expected = {
156 'msg': 'Created new repo group `%s`' % (repo_group_name,),
156 'msg': 'Created new repo group `%s`' % (repo_group_name,),
157 'repo_group': repo_group.get_api_data()
157 'repo_group': repo_group.get_api_data()
158 }
158 }
159 try:
159 try:
160 assert_ok(id_, expected, given=response.body)
160 assert_ok(id_, expected, given=response.body)
161 finally:
161 finally:
162 fixture.destroy_repo_group(repo_group_name)
162 fixture.destroy_repo_group(repo_group_name)
163
163
164 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
164 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
165 self, user_util):
165 self, user_util):
166
166
167 repo_group_name = 'api-repo-group-parent'
167 repo_group_name = 'api-repo-group-parent'
168
168
169 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
169 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
170 assert repo_group is None
170 assert repo_group is None
171 # create the parent
171 # create the parent
172 fixture.create_repo_group(repo_group_name)
172 fixture.create_repo_group(repo_group_name)
173
173
174 # user perms
174 # user perms
175 regular_user = user_util.create_user()
175 regular_user = user_util.create_user()
176 regular_user_api_key = regular_user.api_key
176 regular_user_api_key = regular_user.api_key
177
177
178 usr = UserModel().get_by_username(regular_user.username)
178 usr = UserModel().get_by_username(regular_user.username)
179 usr.inherit_default_permissions = False
179 usr.inherit_default_permissions = False
180 Session().add(usr)
180 Session().add(usr)
181
181
182 RepoGroupModel().grant_user_permission(
182 RepoGroupModel().grant_user_permission(
183 repo_group_name, regular_user.username, 'group.admin')
183 repo_group_name, regular_user.username, 'group.admin')
184 Session().commit()
184 Session().commit()
185
185
186 full_repo_group_name = repo_group_name + '/' + repo_group_name
186 full_repo_group_name = repo_group_name + '/' + repo_group_name
187 id_, params = build_data(
187 id_, params = build_data(
188 regular_user_api_key, 'create_repo_group',
188 regular_user_api_key, 'create_repo_group',
189 group_name=full_repo_group_name)
189 group_name=full_repo_group_name)
190 response = api_call(self.app, params)
190 response = api_call(self.app, params)
191
191
192 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
192 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
193 assert repo_group is not None
193 assert repo_group is not None
194 expected = {
194 expected = {
195 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
195 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
196 'repo_group': repo_group.get_api_data()
196 'repo_group': repo_group.get_api_data()
197 }
197 }
198 try:
198 try:
199 assert_ok(id_, expected, given=response.body)
199 assert_ok(id_, expected, given=response.body)
200 finally:
200 finally:
201 fixture.destroy_repo_group(full_repo_group_name)
201 fixture.destroy_repo_group(full_repo_group_name)
202 fixture.destroy_repo_group(repo_group_name)
202 fixture.destroy_repo_group(repo_group_name)
203
203
204 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
204 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
205 repo_group_name = 'api-repo-group'
205 repo_group_name = 'api-repo-group'
206
206
207 id_, params = build_data(
207 id_, params = build_data(
208 self.apikey_regular, 'create_repo_group',
208 self.apikey_regular, 'create_repo_group',
209 group_name=repo_group_name)
209 group_name=repo_group_name)
210 response = api_call(self.app, params)
210 response = api_call(self.app, params)
211
211
212 expected = {
212 expected = {
213 'repo_group':
213 'repo_group':
214 u'You do not have the permission to store '
214 'You do not have the permission to store '
215 u'repository groups in the root location.'}
215 'repository groups in the root location.'}
216 assert_error(id_, expected, given=response.body)
216 assert_error(id_, expected, given=response.body)
217
217
218 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
218 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
219 repo_group_name = 'api-repo-group-regular-user'
219 repo_group_name = 'api-repo-group-regular-user'
220
220
221 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
221 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
222 assert repo_group is None
222 assert repo_group is None
223 # create the parent
223 # create the parent
224 fixture.create_repo_group(repo_group_name)
224 fixture.create_repo_group(repo_group_name)
225
225
226 full_repo_group_name = repo_group_name+'/'+repo_group_name
226 full_repo_group_name = repo_group_name+'/'+repo_group_name
227
227
228 id_, params = build_data(
228 id_, params = build_data(
229 self.apikey_regular, 'create_repo_group',
229 self.apikey_regular, 'create_repo_group',
230 group_name=full_repo_group_name)
230 group_name=full_repo_group_name)
231 response = api_call(self.app, params)
231 response = api_call(self.app, params)
232
232
233 expected = {
233 expected = {
234 'repo_group':
234 'repo_group':
235 u"You do not have the permissions to store "
235 "You do not have the permissions to store "
236 u"repository groups inside repository group `{}`".format(repo_group_name)}
236 "repository groups inside repository group `{}`".format(repo_group_name)}
237 try:
237 try:
238 assert_error(id_, expected, given=response.body)
238 assert_error(id_, expected, given=response.body)
239 finally:
239 finally:
240 fixture.destroy_repo_group(repo_group_name)
240 fixture.destroy_repo_group(repo_group_name)
241
241
242 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
242 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
243 self):
243 self):
244 repo_group_name = 'api-repo-group'
244 repo_group_name = 'api-repo-group'
245
245
246 id_, params = build_data(
246 id_, params = build_data(
247 self.apikey_regular, 'create_repo_group',
247 self.apikey_regular, 'create_repo_group',
248 group_name=repo_group_name,
248 group_name=repo_group_name,
249 owner=TEST_USER_ADMIN_LOGIN,)
249 owner=TEST_USER_ADMIN_LOGIN,)
250 response = api_call(self.app, params)
250 response = api_call(self.app, params)
251
251
252 expected = "Only RhodeCode super-admin can specify `owner` param"
252 expected = "Only RhodeCode super-admin can specify `owner` param"
253 assert_error(id_, expected, given=response.body)
253 assert_error(id_, expected, given=response.body)
254
254
255 @mock.patch.object(RepoGroupModel, 'create', crash)
255 @mock.patch.object(RepoGroupModel, 'create', crash)
256 def test_api_create_repo_group_exception_occurred(self):
256 def test_api_create_repo_group_exception_occurred(self):
257 repo_group_name = 'api-repo-group'
257 repo_group_name = 'api-repo-group'
258
258
259 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
259 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
260 assert repo_group is None
260 assert repo_group is None
261
261
262 id_, params = build_data(
262 id_, params = build_data(
263 self.apikey, 'create_repo_group',
263 self.apikey, 'create_repo_group',
264 group_name=repo_group_name,
264 group_name=repo_group_name,
265 owner=TEST_USER_ADMIN_LOGIN,)
265 owner=TEST_USER_ADMIN_LOGIN,)
266 response = api_call(self.app, params)
266 response = api_call(self.app, params)
267 expected = 'failed to create repo group `%s`' % (repo_group_name,)
267 expected = 'failed to create repo group `%s`' % (repo_group_name,)
268 assert_error(id_, expected, given=response.body)
268 assert_error(id_, expected, given=response.body)
269
269
270 def test_create_group_with_extra_slashes_in_name(self, user_util):
270 def test_create_group_with_extra_slashes_in_name(self, user_util):
271 existing_repo_group = user_util.create_repo_group()
271 existing_repo_group = user_util.create_repo_group()
272 dirty_group_name = '//{}//group2//'.format(
272 dirty_group_name = '//{}//group2//'.format(
273 existing_repo_group.group_name)
273 existing_repo_group.group_name)
274 cleaned_group_name = '{}/group2'.format(
274 cleaned_group_name = '{}/group2'.format(
275 existing_repo_group.group_name)
275 existing_repo_group.group_name)
276
276
277 id_, params = build_data(
277 id_, params = build_data(
278 self.apikey, 'create_repo_group',
278 self.apikey, 'create_repo_group',
279 group_name=dirty_group_name,
279 group_name=dirty_group_name,
280 owner=TEST_USER_ADMIN_LOGIN,)
280 owner=TEST_USER_ADMIN_LOGIN,)
281 response = api_call(self.app, params)
281 response = api_call(self.app, params)
282 repo_group = RepoGroupModel.cls.get_by_group_name(cleaned_group_name)
282 repo_group = RepoGroupModel.cls.get_by_group_name(cleaned_group_name)
283 expected = {
283 expected = {
284 'msg': 'Created new repo group `%s`' % (cleaned_group_name,),
284 'msg': 'Created new repo group `%s`' % (cleaned_group_name,),
285 'repo_group': repo_group.get_api_data()
285 'repo_group': repo_group.get_api_data()
286 }
286 }
287 assert_ok(id_, expected, given=response.body)
287 assert_ok(id_, expected, given=response.body)
288 fixture.destroy_repo_group(cleaned_group_name)
288 fixture.destroy_repo_group(cleaned_group_name)
@@ -1,101 +1,101 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20
20
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.str_utils import safe_bytes
23 from rhodecode.lib.str_utils import safe_bytes
24 from rhodecode.model.db import Gist
24 from rhodecode.model.db import Gist
25 from rhodecode.api.tests.utils import (
25 from rhodecode.api.tests.utils import (
26 build_data, api_call, assert_error, assert_ok)
26 build_data, api_call, assert_error, assert_ok)
27
27
28
28
29 @pytest.mark.usefixtures("testuser_api", "app")
29 @pytest.mark.usefixtures("testuser_api", "app")
30 class TestApiGetGist(object):
30 class TestApiGetGist(object):
31 def test_api_get_gist(self, gist_util, http_host_only_stub):
31 def test_api_get_gist(self, gist_util, http_host_only_stub):
32 gist = gist_util.create_gist()
32 gist = gist_util.create_gist()
33 gist_id = gist.gist_access_id
33 gist_id = gist.gist_access_id
34 gist_created_on = gist.created_on
34 gist_created_on = gist.created_on
35 gist_modified_at = gist.modified_at
35 gist_modified_at = gist.modified_at
36 id_, params = build_data(
36 id_, params = build_data(
37 self.apikey, 'get_gist', gistid=gist_id, )
37 self.apikey, 'get_gist', gistid=gist_id, )
38 response = api_call(self.app, params)
38 response = api_call(self.app, params)
39
39
40 expected = {
40 expected = {
41 'access_id': gist_id,
41 'access_id': gist_id,
42 'created_on': gist_created_on,
42 'created_on': gist_created_on,
43 'modified_at': gist_modified_at,
43 'modified_at': gist_modified_at,
44 'description': 'new-gist',
44 'description': 'new-gist',
45 'expires': -1.0,
45 'expires': -1.0,
46 'gist_id': int(gist_id),
46 'gist_id': int(gist_id),
47 'type': 'public',
47 'type': 'public',
48 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
48 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
49 'acl_level': Gist.ACL_LEVEL_PUBLIC,
49 'acl_level': Gist.ACL_LEVEL_PUBLIC,
50 'content': None,
50 'content': None,
51 }
51 }
52
52
53 assert_ok(id_, expected, given=response.body)
53 assert_ok(id_, expected, given=response.body)
54
54
55 def test_api_get_gist_with_content(self, gist_util, http_host_only_stub):
55 def test_api_get_gist_with_content(self, gist_util, http_host_only_stub):
56 mapping = {
56 mapping = {
57 b'filename1.txt': {'content': b'hello world'},
57 b'filename1.txt': {'content': b'hello world'},
58 safe_bytes('filename1Δ….txt'): {'content': safe_bytes('hello worldΔ™')}
58 safe_bytes('filename1Δ….txt'): {'content': safe_bytes('hello worldΔ™')}
59 }
59 }
60 gist = gist_util.create_gist(gist_mapping=mapping)
60 gist = gist_util.create_gist(gist_mapping=mapping)
61 gist_id = gist.gist_access_id
61 gist_id = gist.gist_access_id
62 gist_created_on = gist.created_on
62 gist_created_on = gist.created_on
63 gist_modified_at = gist.modified_at
63 gist_modified_at = gist.modified_at
64 id_, params = build_data(
64 id_, params = build_data(
65 self.apikey, 'get_gist', gistid=gist_id, content=True)
65 self.apikey, 'get_gist', gistid=gist_id, content=True)
66 response = api_call(self.app, params)
66 response = api_call(self.app, params)
67
67
68 expected = {
68 expected = {
69 'access_id': gist_id,
69 'access_id': gist_id,
70 'created_on': gist_created_on,
70 'created_on': gist_created_on,
71 'modified_at': gist_modified_at,
71 'modified_at': gist_modified_at,
72 'description': 'new-gist',
72 'description': 'new-gist',
73 'expires': -1.0,
73 'expires': -1.0,
74 'gist_id': int(gist_id),
74 'gist_id': int(gist_id),
75 'type': 'public',
75 'type': 'public',
76 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
76 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
77 'acl_level': Gist.ACL_LEVEL_PUBLIC,
77 'acl_level': Gist.ACL_LEVEL_PUBLIC,
78 'content': {
78 'content': {
79 u'filename1.txt': u'hello world',
79 'filename1.txt': 'hello world',
80 u'filename1Δ….txt': u'hello worldΔ™'
80 'filename1Δ….txt': 'hello worldΔ™'
81 },
81 },
82 }
82 }
83
83
84 assert_ok(id_, expected, given=response.body)
84 assert_ok(id_, expected, given=response.body)
85
85
86 def test_api_get_gist_not_existing(self):
86 def test_api_get_gist_not_existing(self):
87 id_, params = build_data(
87 id_, params = build_data(
88 self.apikey_regular, 'get_gist', gistid='12345', )
88 self.apikey_regular, 'get_gist', gistid='12345', )
89 response = api_call(self.app, params)
89 response = api_call(self.app, params)
90 expected = 'gist `%s` does not exist' % ('12345',)
90 expected = 'gist `%s` does not exist' % ('12345',)
91 assert_error(id_, expected, given=response.body)
91 assert_error(id_, expected, given=response.body)
92
92
93 def test_api_get_gist_private_gist_without_permission(self, gist_util):
93 def test_api_get_gist_private_gist_without_permission(self, gist_util):
94 gist = gist_util.create_gist()
94 gist = gist_util.create_gist()
95 gist_id = gist.gist_access_id
95 gist_id = gist.gist_access_id
96 id_, params = build_data(
96 id_, params = build_data(
97 self.apikey_regular, 'get_gist', gistid=gist_id, )
97 self.apikey_regular, 'get_gist', gistid=gist_id, )
98 response = api_call(self.app, params)
98 response = api_call(self.app, params)
99
99
100 expected = 'gist `%s` does not exist' % (gist_id,)
100 expected = 'gist `%s` does not exist' % (gist_id,)
101 assert_error(id_, expected, given=response.body)
101 assert_error(id_, expected, given=response.body)
@@ -1,497 +1,497 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import urllib.request
20 import urllib.request
21 import urllib.parse
21 import urllib.parse
22 import urllib.error
22 import urllib.error
23
23
24 import mock
24 import mock
25 import pytest
25 import pytest
26
26
27 from rhodecode.apps._base import ADMIN_PREFIX
27 from rhodecode.apps._base import ADMIN_PREFIX
28 from rhodecode.lib import auth
28 from rhodecode.lib import auth
29 from rhodecode.lib.utils2 import safe_str
29 from rhodecode.lib.utils2 import safe_str
30 from rhodecode.lib import helpers as h
30 from rhodecode.lib import helpers as h
31 from rhodecode.model.db import (
31 from rhodecode.model.db import (
32 Repository, RepoGroup, UserRepoToPerm, User, Permission)
32 Repository, RepoGroup, UserRepoToPerm, User, Permission)
33 from rhodecode.model.meta import Session
33 from rhodecode.model.meta import Session
34 from rhodecode.model.repo import RepoModel
34 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.repo_group import RepoGroupModel
35 from rhodecode.model.repo_group import RepoGroupModel
36 from rhodecode.model.user import UserModel
36 from rhodecode.model.user import UserModel
37 from rhodecode.tests import (
37 from rhodecode.tests import (
38 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
39 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
39 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
40 from rhodecode.tests.fixture import Fixture, error_function
40 from rhodecode.tests.fixture import Fixture, error_function
41 from rhodecode.tests.utils import repo_on_filesystem
41 from rhodecode.tests.utils import repo_on_filesystem
42 from rhodecode.tests.routes import route_path
42 from rhodecode.tests.routes import route_path
43
43
44 fixture = Fixture()
44 fixture = Fixture()
45
45
46
46
47 def _get_permission_for_user(user, repo):
47 def _get_permission_for_user(user, repo):
48 perm = UserRepoToPerm.query()\
48 perm = UserRepoToPerm.query()\
49 .filter(UserRepoToPerm.repository ==
49 .filter(UserRepoToPerm.repository ==
50 Repository.get_by_repo_name(repo))\
50 Repository.get_by_repo_name(repo))\
51 .filter(UserRepoToPerm.user == User.get_by_username(user))\
51 .filter(UserRepoToPerm.user == User.get_by_username(user))\
52 .all()
52 .all()
53 return perm
53 return perm
54
54
55
55
56 @pytest.mark.usefixtures("app")
56 @pytest.mark.usefixtures("app")
57 class TestAdminRepos(object):
57 class TestAdminRepos(object):
58
58
59 def test_repo_list(self, autologin_user, user_util, xhr_header):
59 def test_repo_list(self, autologin_user, user_util, xhr_header):
60 repo = user_util.create_repo()
60 repo = user_util.create_repo()
61 repo_name = repo.repo_name
61 repo_name = repo.repo_name
62 response = self.app.get(
62 response = self.app.get(
63 route_path('repos_data'), status=200,
63 route_path('repos_data'), status=200,
64 extra_environ=xhr_header)
64 extra_environ=xhr_header)
65
65
66 response.mustcontain(repo_name)
66 response.mustcontain(repo_name)
67
67
68 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
68 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
69 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
69 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
70 response = self.app.get(route_path('repo_new'), status=200)
70 response = self.app.get(route_path('repo_new'), status=200)
71 assert_response = response.assert_response()
71 assert_response = response.assert_response()
72 element = assert_response.get_element('[name=repo_type]')
72 element = assert_response.get_element('[name=repo_type]')
73 assert element.get('value') == 'git'
73 assert element.get('value') == 'git'
74
74
75 def test_create_page_non_restricted_backends(self, autologin_user, backend):
75 def test_create_page_non_restricted_backends(self, autologin_user, backend):
76 response = self.app.get(route_path('repo_new'), status=200)
76 response = self.app.get(route_path('repo_new'), status=200)
77 assert_response = response.assert_response()
77 assert_response = response.assert_response()
78 assert ['hg', 'git', 'svn'] == [x.get('value') for x in assert_response.get_elements('[name=repo_type]')]
78 assert ['hg', 'git', 'svn'] == [x.get('value') for x in assert_response.get_elements('[name=repo_type]')]
79
79
80 @pytest.mark.parametrize(
80 @pytest.mark.parametrize(
81 "suffix", ['', 'xxa'], ids=['', 'non-ascii'])
81 "suffix", ['', 'xxa'], ids=['', 'non-ascii'])
82 def test_create(self, autologin_user, backend, suffix, csrf_token):
82 def test_create(self, autologin_user, backend, suffix, csrf_token):
83 repo_name_unicode = backend.new_repo_name(suffix=suffix)
83 repo_name_unicode = backend.new_repo_name(suffix=suffix)
84 repo_name = repo_name_unicode
84 repo_name = repo_name_unicode
85
85
86 description_unicode = 'description for newly created repo' + suffix
86 description_unicode = 'description for newly created repo' + suffix
87 description = description_unicode
87 description = description_unicode
88
88
89 response = self.app.post(
89 response = self.app.post(
90 route_path('repo_create'),
90 route_path('repo_create'),
91 fixture._get_repo_create_params(
91 fixture._get_repo_create_params(
92 repo_private=False,
92 repo_private=False,
93 repo_name=repo_name,
93 repo_name=repo_name,
94 repo_type=backend.alias,
94 repo_type=backend.alias,
95 repo_description=description,
95 repo_description=description,
96 csrf_token=csrf_token),
96 csrf_token=csrf_token),
97 status=302)
97 status=302)
98
98
99 self.assert_repository_is_created_correctly(
99 self.assert_repository_is_created_correctly(
100 repo_name, description, backend)
100 repo_name, description, backend)
101
101
102 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
102 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
103 numeric_repo = '1234'
103 numeric_repo = '1234'
104 repo_name = numeric_repo
104 repo_name = numeric_repo
105 description = 'description for newly created repo' + numeric_repo
105 description = 'description for newly created repo' + numeric_repo
106 self.app.post(
106 self.app.post(
107 route_path('repo_create'),
107 route_path('repo_create'),
108 fixture._get_repo_create_params(
108 fixture._get_repo_create_params(
109 repo_private=False,
109 repo_private=False,
110 repo_name=repo_name,
110 repo_name=repo_name,
111 repo_type=backend.alias,
111 repo_type=backend.alias,
112 repo_description=description,
112 repo_description=description,
113 csrf_token=csrf_token))
113 csrf_token=csrf_token))
114
114
115 self.assert_repository_is_created_correctly(
115 self.assert_repository_is_created_correctly(
116 repo_name, description, backend)
116 repo_name, description, backend)
117
117
118 @pytest.mark.parametrize("suffix", ['', '_Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
118 @pytest.mark.parametrize("suffix", ['', '_Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
119 def test_create_in_group(
119 def test_create_in_group(
120 self, autologin_user, backend, suffix, csrf_token):
120 self, autologin_user, backend, suffix, csrf_token):
121 # create GROUP
121 # create GROUP
122 group_name = f'sometest_{backend.alias}'
122 group_name = f'sometest_{backend.alias}'
123 gr = RepoGroupModel().create(group_name=group_name,
123 gr = RepoGroupModel().create(group_name=group_name,
124 group_description='test',
124 group_description='test',
125 owner=TEST_USER_ADMIN_LOGIN)
125 owner=TEST_USER_ADMIN_LOGIN)
126 Session().commit()
126 Session().commit()
127
127
128 repo_name = f'ingroup{suffix}'
128 repo_name = f'ingroup{suffix}'
129 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
129 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
130 description = 'description for newly created repo'
130 description = 'description for newly created repo'
131
131
132 self.app.post(
132 self.app.post(
133 route_path('repo_create'),
133 route_path('repo_create'),
134 fixture._get_repo_create_params(
134 fixture._get_repo_create_params(
135 repo_private=False,
135 repo_private=False,
136 repo_name=safe_str(repo_name),
136 repo_name=safe_str(repo_name),
137 repo_type=backend.alias,
137 repo_type=backend.alias,
138 repo_description=description,
138 repo_description=description,
139 repo_group=gr.group_id,
139 repo_group=gr.group_id,
140 csrf_token=csrf_token))
140 csrf_token=csrf_token))
141
141
142 # TODO: johbo: Cleanup work to fixture
142 # TODO: johbo: Cleanup work to fixture
143 try:
143 try:
144 self.assert_repository_is_created_correctly(
144 self.assert_repository_is_created_correctly(
145 repo_name_full, description, backend)
145 repo_name_full, description, backend)
146
146
147 new_repo = RepoModel().get_by_repo_name(repo_name_full)
147 new_repo = RepoModel().get_by_repo_name(repo_name_full)
148 inherited_perms = UserRepoToPerm.query().filter(
148 inherited_perms = UserRepoToPerm.query().filter(
149 UserRepoToPerm.repository_id == new_repo.repo_id).all()
149 UserRepoToPerm.repository_id == new_repo.repo_id).all()
150 assert len(inherited_perms) == 1
150 assert len(inherited_perms) == 1
151 finally:
151 finally:
152 RepoModel().delete(repo_name_full)
152 RepoModel().delete(repo_name_full)
153 RepoGroupModel().delete(group_name)
153 RepoGroupModel().delete(group_name)
154 Session().commit()
154 Session().commit()
155
155
156 def test_create_in_group_numeric_name(
156 def test_create_in_group_numeric_name(
157 self, autologin_user, backend, csrf_token):
157 self, autologin_user, backend, csrf_token):
158 # create GROUP
158 # create GROUP
159 group_name = 'sometest_%s' % backend.alias
159 group_name = 'sometest_%s' % backend.alias
160 gr = RepoGroupModel().create(group_name=group_name,
160 gr = RepoGroupModel().create(group_name=group_name,
161 group_description='test',
161 group_description='test',
162 owner=TEST_USER_ADMIN_LOGIN)
162 owner=TEST_USER_ADMIN_LOGIN)
163 Session().commit()
163 Session().commit()
164
164
165 repo_name = '12345'
165 repo_name = '12345'
166 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
166 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
167 description = 'description for newly created repo'
167 description = 'description for newly created repo'
168 self.app.post(
168 self.app.post(
169 route_path('repo_create'),
169 route_path('repo_create'),
170 fixture._get_repo_create_params(
170 fixture._get_repo_create_params(
171 repo_private=False,
171 repo_private=False,
172 repo_name=repo_name,
172 repo_name=repo_name,
173 repo_type=backend.alias,
173 repo_type=backend.alias,
174 repo_description=description,
174 repo_description=description,
175 repo_group=gr.group_id,
175 repo_group=gr.group_id,
176 csrf_token=csrf_token))
176 csrf_token=csrf_token))
177
177
178 # TODO: johbo: Cleanup work to fixture
178 # TODO: johbo: Cleanup work to fixture
179 try:
179 try:
180 self.assert_repository_is_created_correctly(
180 self.assert_repository_is_created_correctly(
181 repo_name_full, description, backend)
181 repo_name_full, description, backend)
182
182
183 new_repo = RepoModel().get_by_repo_name(repo_name_full)
183 new_repo = RepoModel().get_by_repo_name(repo_name_full)
184 inherited_perms = UserRepoToPerm.query()\
184 inherited_perms = UserRepoToPerm.query()\
185 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
185 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
186 assert len(inherited_perms) == 1
186 assert len(inherited_perms) == 1
187 finally:
187 finally:
188 RepoModel().delete(repo_name_full)
188 RepoModel().delete(repo_name_full)
189 RepoGroupModel().delete(group_name)
189 RepoGroupModel().delete(group_name)
190 Session().commit()
190 Session().commit()
191
191
192 def test_create_in_group_without_needed_permissions(self, backend):
192 def test_create_in_group_without_needed_permissions(self, backend):
193 session = login_user_session(
193 session = login_user_session(
194 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
194 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
195 csrf_token = auth.get_csrf_token(session)
195 csrf_token = auth.get_csrf_token(session)
196 # revoke
196 # revoke
197 user_model = UserModel()
197 user_model = UserModel()
198 # disable fork and create on default user
198 # disable fork and create on default user
199 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
199 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
200 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
200 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
201 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
201 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
202 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
202 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
203
203
204 # disable on regular user
204 # disable on regular user
205 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
205 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
206 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
206 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
207 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
207 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
208 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
208 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
209 Session().commit()
209 Session().commit()
210
210
211 # create GROUP
211 # create GROUP
212 group_name = 'reg_sometest_%s' % backend.alias
212 group_name = 'reg_sometest_%s' % backend.alias
213 gr = RepoGroupModel().create(group_name=group_name,
213 gr = RepoGroupModel().create(group_name=group_name,
214 group_description='test',
214 group_description='test',
215 owner=TEST_USER_ADMIN_LOGIN)
215 owner=TEST_USER_ADMIN_LOGIN)
216 Session().commit()
216 Session().commit()
217 repo_group_id = gr.group_id
217 repo_group_id = gr.group_id
218
218
219 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
219 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
220 gr_allowed = RepoGroupModel().create(
220 gr_allowed = RepoGroupModel().create(
221 group_name=group_name_allowed,
221 group_name=group_name_allowed,
222 group_description='test',
222 group_description='test',
223 owner=TEST_USER_REGULAR_LOGIN)
223 owner=TEST_USER_REGULAR_LOGIN)
224 allowed_repo_group_id = gr_allowed.group_id
224 allowed_repo_group_id = gr_allowed.group_id
225 Session().commit()
225 Session().commit()
226
226
227 repo_name = 'ingroup'
227 repo_name = 'ingroup'
228 description = 'description for newly created repo'
228 description = 'description for newly created repo'
229 response = self.app.post(
229 response = self.app.post(
230 route_path('repo_create'),
230 route_path('repo_create'),
231 fixture._get_repo_create_params(
231 fixture._get_repo_create_params(
232 repo_private=False,
232 repo_private=False,
233 repo_name=repo_name,
233 repo_name=repo_name,
234 repo_type=backend.alias,
234 repo_type=backend.alias,
235 repo_description=description,
235 repo_description=description,
236 repo_group=repo_group_id,
236 repo_group=repo_group_id,
237 csrf_token=csrf_token))
237 csrf_token=csrf_token))
238
238
239 response.mustcontain('Invalid value')
239 response.mustcontain('Invalid value')
240
240
241 # user is allowed to create in this group
241 # user is allowed to create in this group
242 repo_name = 'ingroup'
242 repo_name = 'ingroup'
243 repo_name_full = RepoGroup.url_sep().join(
243 repo_name_full = RepoGroup.url_sep().join(
244 [group_name_allowed, repo_name])
244 [group_name_allowed, repo_name])
245 description = 'description for newly created repo'
245 description = 'description for newly created repo'
246 response = self.app.post(
246 response = self.app.post(
247 route_path('repo_create'),
247 route_path('repo_create'),
248 fixture._get_repo_create_params(
248 fixture._get_repo_create_params(
249 repo_private=False,
249 repo_private=False,
250 repo_name=repo_name,
250 repo_name=repo_name,
251 repo_type=backend.alias,
251 repo_type=backend.alias,
252 repo_description=description,
252 repo_description=description,
253 repo_group=allowed_repo_group_id,
253 repo_group=allowed_repo_group_id,
254 csrf_token=csrf_token))
254 csrf_token=csrf_token))
255
255
256 # TODO: johbo: Cleanup in pytest fixture
256 # TODO: johbo: Cleanup in pytest fixture
257 try:
257 try:
258 self.assert_repository_is_created_correctly(
258 self.assert_repository_is_created_correctly(
259 repo_name_full, description, backend)
259 repo_name_full, description, backend)
260
260
261 new_repo = RepoModel().get_by_repo_name(repo_name_full)
261 new_repo = RepoModel().get_by_repo_name(repo_name_full)
262 inherited_perms = UserRepoToPerm.query().filter(
262 inherited_perms = UserRepoToPerm.query().filter(
263 UserRepoToPerm.repository_id == new_repo.repo_id).all()
263 UserRepoToPerm.repository_id == new_repo.repo_id).all()
264 assert len(inherited_perms) == 1
264 assert len(inherited_perms) == 1
265
265
266 assert repo_on_filesystem(repo_name_full)
266 assert repo_on_filesystem(repo_name_full)
267 finally:
267 finally:
268 RepoModel().delete(repo_name_full)
268 RepoModel().delete(repo_name_full)
269 RepoGroupModel().delete(group_name)
269 RepoGroupModel().delete(group_name)
270 RepoGroupModel().delete(group_name_allowed)
270 RepoGroupModel().delete(group_name_allowed)
271 Session().commit()
271 Session().commit()
272
272
273 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
273 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
274 csrf_token):
274 csrf_token):
275 # create GROUP
275 # create GROUP
276 group_name = 'sometest_%s' % backend.alias
276 group_name = 'sometest_%s' % backend.alias
277 gr = RepoGroupModel().create(group_name=group_name,
277 gr = RepoGroupModel().create(group_name=group_name,
278 group_description='test',
278 group_description='test',
279 owner=TEST_USER_ADMIN_LOGIN)
279 owner=TEST_USER_ADMIN_LOGIN)
280 perm = Permission.get_by_key('repository.write')
280 perm = Permission.get_by_key('repository.write')
281 RepoGroupModel().grant_user_permission(
281 RepoGroupModel().grant_user_permission(
282 gr, TEST_USER_REGULAR_LOGIN, perm)
282 gr, TEST_USER_REGULAR_LOGIN, perm)
283
283
284 # add repo permissions
284 # add repo permissions
285 Session().commit()
285 Session().commit()
286 repo_group_id = gr.group_id
286 repo_group_id = gr.group_id
287 repo_name = 'ingroup_inherited_%s' % backend.alias
287 repo_name = 'ingroup_inherited_%s' % backend.alias
288 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
288 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
289 description = 'description for newly created repo'
289 description = 'description for newly created repo'
290 self.app.post(
290 self.app.post(
291 route_path('repo_create'),
291 route_path('repo_create'),
292 fixture._get_repo_create_params(
292 fixture._get_repo_create_params(
293 repo_private=False,
293 repo_private=False,
294 repo_name=repo_name,
294 repo_name=repo_name,
295 repo_type=backend.alias,
295 repo_type=backend.alias,
296 repo_description=description,
296 repo_description=description,
297 repo_group=repo_group_id,
297 repo_group=repo_group_id,
298 repo_copy_permissions=True,
298 repo_copy_permissions=True,
299 csrf_token=csrf_token))
299 csrf_token=csrf_token))
300
300
301 # TODO: johbo: Cleanup to pytest fixture
301 # TODO: johbo: Cleanup to pytest fixture
302 try:
302 try:
303 self.assert_repository_is_created_correctly(
303 self.assert_repository_is_created_correctly(
304 repo_name_full, description, backend)
304 repo_name_full, description, backend)
305 except Exception:
305 except Exception:
306 RepoGroupModel().delete(group_name)
306 RepoGroupModel().delete(group_name)
307 Session().commit()
307 Session().commit()
308 raise
308 raise
309
309
310 # check if inherited permissions are applied
310 # check if inherited permissions are applied
311 new_repo = RepoModel().get_by_repo_name(repo_name_full)
311 new_repo = RepoModel().get_by_repo_name(repo_name_full)
312 inherited_perms = UserRepoToPerm.query().filter(
312 inherited_perms = UserRepoToPerm.query().filter(
313 UserRepoToPerm.repository_id == new_repo.repo_id).all()
313 UserRepoToPerm.repository_id == new_repo.repo_id).all()
314 assert len(inherited_perms) == 2
314 assert len(inherited_perms) == 2
315
315
316 assert TEST_USER_REGULAR_LOGIN in [
316 assert TEST_USER_REGULAR_LOGIN in [
317 x.user.username for x in inherited_perms]
317 x.user.username for x in inherited_perms]
318 assert 'repository.write' in [
318 assert 'repository.write' in [
319 x.permission.permission_name for x in inherited_perms]
319 x.permission.permission_name for x in inherited_perms]
320
320
321 RepoModel().delete(repo_name_full)
321 RepoModel().delete(repo_name_full)
322 RepoGroupModel().delete(group_name)
322 RepoGroupModel().delete(group_name)
323 Session().commit()
323 Session().commit()
324
324
325 @pytest.mark.xfail_backends(
325 @pytest.mark.xfail_backends(
326 "git", "hg", reason="Missing reposerver support")
326 "git", "hg", reason="Missing reposerver support")
327 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
327 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
328 csrf_token):
328 csrf_token):
329 source_repo = backend.create_repo(number_of_commits=2)
329 source_repo = backend.create_repo(number_of_commits=2)
330 source_repo_name = source_repo.repo_name
330 source_repo_name = source_repo.repo_name
331 reposerver.serve(source_repo.scm_instance())
331 reposerver.serve(source_repo.scm_instance())
332
332
333 repo_name = backend.new_repo_name()
333 repo_name = backend.new_repo_name()
334 response = self.app.post(
334 response = self.app.post(
335 route_path('repo_create'),
335 route_path('repo_create'),
336 fixture._get_repo_create_params(
336 fixture._get_repo_create_params(
337 repo_private=False,
337 repo_private=False,
338 repo_name=repo_name,
338 repo_name=repo_name,
339 repo_type=backend.alias,
339 repo_type=backend.alias,
340 repo_description='',
340 repo_description='',
341 clone_uri=reposerver.url,
341 clone_uri=reposerver.url,
342 csrf_token=csrf_token),
342 csrf_token=csrf_token),
343 status=302)
343 status=302)
344
344
345 # Should be redirected to the creating page
345 # Should be redirected to the creating page
346 response.mustcontain('repo_creating')
346 response.mustcontain('repo_creating')
347
347
348 # Expecting that both repositories have same history
348 # Expecting that both repositories have same history
349 source_repo = RepoModel().get_by_repo_name(source_repo_name)
349 source_repo = RepoModel().get_by_repo_name(source_repo_name)
350 source_vcs = source_repo.scm_instance()
350 source_vcs = source_repo.scm_instance()
351 repo = RepoModel().get_by_repo_name(repo_name)
351 repo = RepoModel().get_by_repo_name(repo_name)
352 repo_vcs = repo.scm_instance()
352 repo_vcs = repo.scm_instance()
353 assert source_vcs[0].message == repo_vcs[0].message
353 assert source_vcs[0].message == repo_vcs[0].message
354 assert source_vcs.count() == repo_vcs.count()
354 assert source_vcs.count() == repo_vcs.count()
355 assert source_vcs.commit_ids == repo_vcs.commit_ids
355 assert source_vcs.commit_ids == repo_vcs.commit_ids
356
356
357 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
357 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
358 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
358 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
359 csrf_token):
359 csrf_token):
360 repo_name = backend.new_repo_name()
360 repo_name = backend.new_repo_name()
361 description = 'description for newly created repo'
361 description = 'description for newly created repo'
362 response = self.app.post(
362 response = self.app.post(
363 route_path('repo_create'),
363 route_path('repo_create'),
364 fixture._get_repo_create_params(
364 fixture._get_repo_create_params(
365 repo_private=False,
365 repo_private=False,
366 repo_name=repo_name,
366 repo_name=repo_name,
367 repo_type=backend.alias,
367 repo_type=backend.alias,
368 repo_description=description,
368 repo_description=description,
369 clone_uri='http://repo.invalid/repo',
369 clone_uri='http://repo.invalid/repo',
370 csrf_token=csrf_token))
370 csrf_token=csrf_token))
371 response.mustcontain('invalid clone url')
371 response.mustcontain('invalid clone url')
372
372
373 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
373 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
374 def test_create_remote_repo_wrong_clone_uri_hg_svn(
374 def test_create_remote_repo_wrong_clone_uri_hg_svn(
375 self, autologin_user, backend, csrf_token):
375 self, autologin_user, backend, csrf_token):
376 repo_name = backend.new_repo_name()
376 repo_name = backend.new_repo_name()
377 description = 'description for newly created repo'
377 description = 'description for newly created repo'
378 response = self.app.post(
378 response = self.app.post(
379 route_path('repo_create'),
379 route_path('repo_create'),
380 fixture._get_repo_create_params(
380 fixture._get_repo_create_params(
381 repo_private=False,
381 repo_private=False,
382 repo_name=repo_name,
382 repo_name=repo_name,
383 repo_type=backend.alias,
383 repo_type=backend.alias,
384 repo_description=description,
384 repo_description=description,
385 clone_uri='svn+http://svn.invalid/repo',
385 clone_uri='svn+http://svn.invalid/repo',
386 csrf_token=csrf_token))
386 csrf_token=csrf_token))
387 response.mustcontain('invalid clone url')
387 response.mustcontain('invalid clone url')
388
388
389 def test_create_with_git_suffix(
389 def test_create_with_git_suffix(
390 self, autologin_user, backend, csrf_token):
390 self, autologin_user, backend, csrf_token):
391 repo_name = backend.new_repo_name() + ".git"
391 repo_name = backend.new_repo_name() + ".git"
392 description = 'description for newly created repo'
392 description = 'description for newly created repo'
393 response = self.app.post(
393 response = self.app.post(
394 route_path('repo_create'),
394 route_path('repo_create'),
395 fixture._get_repo_create_params(
395 fixture._get_repo_create_params(
396 repo_private=False,
396 repo_private=False,
397 repo_name=repo_name,
397 repo_name=repo_name,
398 repo_type=backend.alias,
398 repo_type=backend.alias,
399 repo_description=description,
399 repo_description=description,
400 csrf_token=csrf_token))
400 csrf_token=csrf_token))
401 response.mustcontain('Repository name cannot end with .git')
401 response.mustcontain('Repository name cannot end with .git')
402
402
403 def test_default_user_cannot_access_private_repo_in_a_group(
403 def test_default_user_cannot_access_private_repo_in_a_group(
404 self, autologin_user, user_util, backend):
404 self, autologin_user, user_util, backend):
405
405
406 group = user_util.create_repo_group()
406 group = user_util.create_repo_group()
407
407
408 repo = backend.create_repo(
408 repo = backend.create_repo(
409 repo_private=True, repo_group=group, repo_copy_permissions=True)
409 repo_private=True, repo_group=group, repo_copy_permissions=True)
410
410
411 permissions = _get_permission_for_user(
411 permissions = _get_permission_for_user(
412 user='default', repo=repo.repo_name)
412 user='default', repo=repo.repo_name)
413 assert len(permissions) == 1
413 assert len(permissions) == 1
414 assert permissions[0].permission.permission_name == 'repository.none'
414 assert permissions[0].permission.permission_name == 'repository.none'
415 assert permissions[0].repository.private is True
415 assert permissions[0].repository.private is True
416
416
417 def test_create_on_top_level_without_permissions(self, backend):
417 def test_create_on_top_level_without_permissions(self, backend):
418 session = login_user_session(
418 session = login_user_session(
419 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
419 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
420 csrf_token = auth.get_csrf_token(session)
420 csrf_token = auth.get_csrf_token(session)
421
421
422 # revoke
422 # revoke
423 user_model = UserModel()
423 user_model = UserModel()
424 # disable fork and create on default user
424 # disable fork and create on default user
425 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
425 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
426 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
426 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
427 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
427 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
428 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
428 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
429
429
430 # disable on regular user
430 # disable on regular user
431 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
431 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
432 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
432 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
433 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
433 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
434 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
434 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
435 Session().commit()
435 Session().commit()
436
436
437 repo_name = backend.new_repo_name()
437 repo_name = backend.new_repo_name()
438 description = 'description for newly created repo'
438 description = 'description for newly created repo'
439 response = self.app.post(
439 response = self.app.post(
440 route_path('repo_create'),
440 route_path('repo_create'),
441 fixture._get_repo_create_params(
441 fixture._get_repo_create_params(
442 repo_private=False,
442 repo_private=False,
443 repo_name=repo_name,
443 repo_name=repo_name,
444 repo_type=backend.alias,
444 repo_type=backend.alias,
445 repo_description=description,
445 repo_description=description,
446 csrf_token=csrf_token))
446 csrf_token=csrf_token))
447
447
448 response.mustcontain(
448 response.mustcontain(
449 u"You do not have the permission to store repositories in "
449 "You do not have the permission to store repositories in "
450 u"the root location.")
450 "the root location.")
451
451
452 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
452 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
453 def test_create_repo_when_filesystem_op_fails(
453 def test_create_repo_when_filesystem_op_fails(
454 self, autologin_user, backend, csrf_token):
454 self, autologin_user, backend, csrf_token):
455 repo_name = backend.new_repo_name()
455 repo_name = backend.new_repo_name()
456 description = 'description for newly created repo'
456 description = 'description for newly created repo'
457
457
458 response = self.app.post(
458 response = self.app.post(
459 route_path('repo_create'),
459 route_path('repo_create'),
460 fixture._get_repo_create_params(
460 fixture._get_repo_create_params(
461 repo_private=False,
461 repo_private=False,
462 repo_name=repo_name,
462 repo_name=repo_name,
463 repo_type=backend.alias,
463 repo_type=backend.alias,
464 repo_description=description,
464 repo_description=description,
465 csrf_token=csrf_token))
465 csrf_token=csrf_token))
466
466
467 assert_session_flash(
467 assert_session_flash(
468 response, 'Error creating repository %s' % repo_name)
468 response, 'Error creating repository %s' % repo_name)
469 # repo must not be in db
469 # repo must not be in db
470 assert backend.repo is None
470 assert backend.repo is None
471 # repo must not be in filesystem !
471 # repo must not be in filesystem !
472 assert not repo_on_filesystem(repo_name)
472 assert not repo_on_filesystem(repo_name)
473
473
474 def assert_repository_is_created_correctly(self, repo_name, description, backend):
474 def assert_repository_is_created_correctly(self, repo_name, description, backend):
475 url_quoted_repo_name = urllib.parse.quote(repo_name)
475 url_quoted_repo_name = urllib.parse.quote(repo_name)
476
476
477 # run the check page that triggers the flash message
477 # run the check page that triggers the flash message
478 response = self.app.get(
478 response = self.app.get(
479 route_path('repo_creating_check', repo_name=repo_name))
479 route_path('repo_creating_check', repo_name=repo_name))
480 assert response.json == {'result': True}
480 assert response.json == {'result': True}
481
481
482 flash_msg = 'Created repository <a href="/{}">{}</a>'.format(url_quoted_repo_name, repo_name)
482 flash_msg = 'Created repository <a href="/{}">{}</a>'.format(url_quoted_repo_name, repo_name)
483 assert_session_flash(response, flash_msg)
483 assert_session_flash(response, flash_msg)
484
484
485 # test if the repo was created in the database
485 # test if the repo was created in the database
486 new_repo = RepoModel().get_by_repo_name(repo_name)
486 new_repo = RepoModel().get_by_repo_name(repo_name)
487
487
488 assert new_repo.repo_name == repo_name
488 assert new_repo.repo_name == repo_name
489 assert new_repo.description == description
489 assert new_repo.description == description
490
490
491 # test if the repository is visible in the list ?
491 # test if the repository is visible in the list ?
492 response = self.app.get(
492 response = self.app.get(
493 h.route_path('repo_summary', repo_name=repo_name))
493 h.route_path('repo_summary', repo_name=repo_name))
494 response.mustcontain(repo_name)
494 response.mustcontain(repo_name)
495 response.mustcontain(backend.alias)
495 response.mustcontain(backend.alias)
496
496
497 assert repo_on_filesystem(repo_name)
497 assert repo_on_filesystem(repo_name)
@@ -1,593 +1,593 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import urllib.parse
19 import urllib.parse
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23
23
24
24
25 from rhodecode.lib.auth import check_password
25 from rhodecode.lib.auth import check_password
26 from rhodecode.lib import helpers as h
26 from rhodecode.lib import helpers as h
27 from rhodecode.model.auth_token import AuthTokenModel
27 from rhodecode.model.auth_token import AuthTokenModel
28 from rhodecode.model.db import User, Notification, UserApiKeys
28 from rhodecode.model.db import User, Notification, UserApiKeys
29 from rhodecode.model.meta import Session
29 from rhodecode.model.meta import Session
30
30
31 from rhodecode.tests import (
31 from rhodecode.tests import (
32 assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN,
32 assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN,
33 no_newline_id_generator)
33 no_newline_id_generator)
34 from rhodecode.tests.fixture import Fixture
34 from rhodecode.tests.fixture import Fixture
35 from rhodecode.tests.routes import route_path
35 from rhodecode.tests.routes import route_path
36
36
37 fixture = Fixture()
37 fixture = Fixture()
38
38
39 whitelist_view = ['RepoCommitsView:repo_commit_raw']
39 whitelist_view = ['RepoCommitsView:repo_commit_raw']
40
40
41
41
42 @pytest.mark.usefixtures('app')
42 @pytest.mark.usefixtures('app')
43 class TestLoginController(object):
43 class TestLoginController(object):
44 destroy_users = set()
44 destroy_users = set()
45
45
46 @classmethod
46 @classmethod
47 def teardown_class(cls):
47 def teardown_class(cls):
48 fixture.destroy_users(cls.destroy_users)
48 fixture.destroy_users(cls.destroy_users)
49
49
50 def teardown_method(self, method):
50 def teardown_method(self, method):
51 for n in Notification.query().all():
51 for n in Notification.query().all():
52 Session().delete(n)
52 Session().delete(n)
53
53
54 Session().commit()
54 Session().commit()
55 assert Notification.query().all() == []
55 assert Notification.query().all() == []
56
56
57 def test_index(self):
57 def test_index(self):
58 response = self.app.get(route_path('login'))
58 response = self.app.get(route_path('login'))
59 assert response.status == '200 OK'
59 assert response.status == '200 OK'
60 # Test response...
60 # Test response...
61
61
62 def test_login_admin_ok(self):
62 def test_login_admin_ok(self):
63 response = self.app.post(route_path('login'),
63 response = self.app.post(route_path('login'),
64 {'username': 'test_admin',
64 {'username': 'test_admin',
65 'password': 'test12'}, status=302)
65 'password': 'test12'}, status=302)
66 response = response.follow()
66 response = response.follow()
67 session = response.get_session_from_response()
67 session = response.get_session_from_response()
68 username = session['rhodecode_user'].get('username')
68 username = session['rhodecode_user'].get('username')
69 assert username == 'test_admin'
69 assert username == 'test_admin'
70 response.mustcontain('logout')
70 response.mustcontain('logout')
71
71
72 def test_login_regular_ok(self):
72 def test_login_regular_ok(self):
73 response = self.app.post(route_path('login'),
73 response = self.app.post(route_path('login'),
74 {'username': 'test_regular',
74 {'username': 'test_regular',
75 'password': 'test12'}, status=302)
75 'password': 'test12'}, status=302)
76
76
77 response = response.follow()
77 response = response.follow()
78 session = response.get_session_from_response()
78 session = response.get_session_from_response()
79 username = session['rhodecode_user'].get('username')
79 username = session['rhodecode_user'].get('username')
80 assert username == 'test_regular'
80 assert username == 'test_regular'
81 response.mustcontain('logout')
81 response.mustcontain('logout')
82
82
83 def test_login_with_primary_email(self):
83 def test_login_with_primary_email(self):
84 user_email = 'test_regular@mail.com'
84 user_email = 'test_regular@mail.com'
85 response = self.app.post(route_path('login'),
85 response = self.app.post(route_path('login'),
86 {'username': user_email,
86 {'username': user_email,
87 'password': 'test12'}, status=302)
87 'password': 'test12'}, status=302)
88 response = response.follow()
88 response = response.follow()
89 session = response.get_session_from_response()
89 session = response.get_session_from_response()
90 user = session['rhodecode_user']
90 user = session['rhodecode_user']
91 assert user['username'] == user_email.split('@')[0]
91 assert user['username'] == user_email.split('@')[0]
92 assert user['is_authenticated']
92 assert user['is_authenticated']
93 response.mustcontain('logout')
93 response.mustcontain('logout')
94
94
95 def test_login_regular_forbidden_when_super_admin_restriction(self):
95 def test_login_regular_forbidden_when_super_admin_restriction(self):
96 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
96 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
97 with fixture.auth_restriction(self.app._pyramid_registry,
97 with fixture.auth_restriction(self.app._pyramid_registry,
98 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN):
98 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN):
99 response = self.app.post(route_path('login'),
99 response = self.app.post(route_path('login'),
100 {'username': 'test_regular',
100 {'username': 'test_regular',
101 'password': 'test12'})
101 'password': 'test12'})
102
102
103 response.mustcontain('invalid user name')
103 response.mustcontain('invalid user name')
104 response.mustcontain('invalid password')
104 response.mustcontain('invalid password')
105
105
106 def test_login_regular_forbidden_when_scope_restriction(self):
106 def test_login_regular_forbidden_when_scope_restriction(self):
107 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
107 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
108 with fixture.scope_restriction(self.app._pyramid_registry,
108 with fixture.scope_restriction(self.app._pyramid_registry,
109 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS):
109 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS):
110 response = self.app.post(route_path('login'),
110 response = self.app.post(route_path('login'),
111 {'username': 'test_regular',
111 {'username': 'test_regular',
112 'password': 'test12'})
112 'password': 'test12'})
113
113
114 response.mustcontain('invalid user name')
114 response.mustcontain('invalid user name')
115 response.mustcontain('invalid password')
115 response.mustcontain('invalid password')
116
116
117 def test_login_ok_came_from(self):
117 def test_login_ok_came_from(self):
118 test_came_from = '/_admin/users?branch=stable'
118 test_came_from = '/_admin/users?branch=stable'
119 _url = '{}?came_from={}'.format(route_path('login'), test_came_from)
119 _url = '{}?came_from={}'.format(route_path('login'), test_came_from)
120 response = self.app.post(
120 response = self.app.post(
121 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
121 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
122
122
123 assert 'branch=stable' in response.location
123 assert 'branch=stable' in response.location
124 response = response.follow()
124 response = response.follow()
125
125
126 assert response.status == '200 OK'
126 assert response.status == '200 OK'
127 response.mustcontain('Users administration')
127 response.mustcontain('Users administration')
128
128
129 def test_redirect_to_login_with_get_args(self):
129 def test_redirect_to_login_with_get_args(self):
130 with fixture.anon_access(False):
130 with fixture.anon_access(False):
131 kwargs = {'branch': 'stable'}
131 kwargs = {'branch': 'stable'}
132 response = self.app.get(
132 response = self.app.get(
133 h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs),
133 h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs),
134 status=302)
134 status=302)
135
135
136 response_query = urllib.parse.parse_qsl(response.location)
136 response_query = urllib.parse.parse_qsl(response.location)
137 assert 'branch=stable' in response_query[0][1]
137 assert 'branch=stable' in response_query[0][1]
138
138
139 def test_login_form_with_get_args(self):
139 def test_login_form_with_get_args(self):
140 _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login'))
140 _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login'))
141 response = self.app.get(_url)
141 response = self.app.get(_url)
142 assert 'branch%3Dstable' in response.form.action
142 assert 'branch%3Dstable' in response.form.action
143
143
144 @pytest.mark.parametrize("url_came_from", [
144 @pytest.mark.parametrize("url_came_from", [
145 'data:text/html,<script>window.alert("xss")</script>',
145 'data:text/html,<script>window.alert("xss")</script>',
146 'mailto:test@rhodecode.org',
146 'mailto:test@rhodecode.org',
147 'file:///etc/passwd',
147 'file:///etc/passwd',
148 'ftp://some.ftp.server',
148 'ftp://some.ftp.server',
149 'http://other.domain',
149 'http://other.domain',
150 ], ids=no_newline_id_generator)
150 ], ids=no_newline_id_generator)
151 def test_login_bad_came_froms(self, url_came_from):
151 def test_login_bad_came_froms(self, url_came_from):
152 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
152 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
153 response = self.app.post(
153 response = self.app.post(
154 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
154 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
155 assert response.status == '302 Found'
155 assert response.status == '302 Found'
156 response = response.follow()
156 response = response.follow()
157 assert response.status == '200 OK'
157 assert response.status == '200 OK'
158 assert response.request.path == '/'
158 assert response.request.path == '/'
159
159
160 @pytest.mark.xfail(reason="newline params changed behaviour in python3")
160 @pytest.mark.xfail(reason="newline params changed behaviour in python3")
161 @pytest.mark.parametrize("url_came_from", [
161 @pytest.mark.parametrize("url_came_from", [
162 '/\r\nX-Forwarded-Host: \rhttp://example.org',
162 '/\r\nX-Forwarded-Host: \rhttp://example.org',
163 ], ids=no_newline_id_generator)
163 ], ids=no_newline_id_generator)
164 def test_login_bad_came_froms_404(self, url_came_from):
164 def test_login_bad_came_froms_404(self, url_came_from):
165 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
165 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
166 response = self.app.post(
166 response = self.app.post(
167 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
167 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
168
168
169 response = response.follow()
169 response = response.follow()
170 assert response.status == '404 Not Found'
170 assert response.status == '404 Not Found'
171
171
172 def test_login_short_password(self):
172 def test_login_short_password(self):
173 response = self.app.post(route_path('login'),
173 response = self.app.post(route_path('login'),
174 {'username': 'test_admin',
174 {'username': 'test_admin',
175 'password': 'as'})
175 'password': 'as'})
176 assert response.status == '200 OK'
176 assert response.status == '200 OK'
177
177
178 response.mustcontain('Enter 3 characters or more')
178 response.mustcontain('Enter 3 characters or more')
179
179
180 def test_login_wrong_non_ascii_password(self, user_regular):
180 def test_login_wrong_non_ascii_password(self, user_regular):
181 response = self.app.post(
181 response = self.app.post(
182 route_path('login'),
182 route_path('login'),
183 {'username': user_regular.username,
183 {'username': user_regular.username,
184 'password': 'invalid-non-asci\xe4'.encode('utf8')})
184 'password': 'invalid-non-asci\xe4'.encode('utf8')})
185
185
186 response.mustcontain('invalid user name')
186 response.mustcontain('invalid user name')
187 response.mustcontain('invalid password')
187 response.mustcontain('invalid password')
188
188
189 def test_login_with_non_ascii_password(self, user_util):
189 def test_login_with_non_ascii_password(self, user_util):
190 password = u'valid-non-ascii\xe4'
190 password = u'valid-non-ascii\xe4'
191 user = user_util.create_user(password=password)
191 user = user_util.create_user(password=password)
192 response = self.app.post(
192 response = self.app.post(
193 route_path('login'),
193 route_path('login'),
194 {'username': user.username,
194 {'username': user.username,
195 'password': password})
195 'password': password})
196 assert response.status_code == 302
196 assert response.status_code == 302
197
197
198 def test_login_wrong_username_password(self):
198 def test_login_wrong_username_password(self):
199 response = self.app.post(route_path('login'),
199 response = self.app.post(route_path('login'),
200 {'username': 'error',
200 {'username': 'error',
201 'password': 'test12'})
201 'password': 'test12'})
202
202
203 response.mustcontain('invalid user name')
203 response.mustcontain('invalid user name')
204 response.mustcontain('invalid password')
204 response.mustcontain('invalid password')
205
205
206 def test_login_admin_ok_password_migration(self, real_crypto_backend):
206 def test_login_admin_ok_password_migration(self, real_crypto_backend):
207 from rhodecode.lib import auth
207 from rhodecode.lib import auth
208
208
209 # create new user, with sha256 password
209 # create new user, with sha256 password
210 temp_user = 'test_admin_sha256'
210 temp_user = 'test_admin_sha256'
211 user = fixture.create_user(temp_user)
211 user = fixture.create_user(temp_user)
212 user.password = auth._RhodeCodeCryptoSha256().hash_create(
212 user.password = auth._RhodeCodeCryptoSha256().hash_create(
213 b'test123')
213 b'test123')
214 Session().add(user)
214 Session().add(user)
215 Session().commit()
215 Session().commit()
216 self.destroy_users.add(temp_user)
216 self.destroy_users.add(temp_user)
217 response = self.app.post(route_path('login'),
217 response = self.app.post(route_path('login'),
218 {'username': temp_user,
218 {'username': temp_user,
219 'password': 'test123'}, status=302)
219 'password': 'test123'}, status=302)
220
220
221 response = response.follow()
221 response = response.follow()
222 session = response.get_session_from_response()
222 session = response.get_session_from_response()
223 username = session['rhodecode_user'].get('username')
223 username = session['rhodecode_user'].get('username')
224 assert username == temp_user
224 assert username == temp_user
225 response.mustcontain('logout')
225 response.mustcontain('logout')
226
226
227 # new password should be bcrypted, after log-in and transfer
227 # new password should be bcrypted, after log-in and transfer
228 user = User.get_by_username(temp_user)
228 user = User.get_by_username(temp_user)
229 assert user.password.startswith('$')
229 assert user.password.startswith('$')
230
230
231 # REGISTRATIONS
231 # REGISTRATIONS
232 def test_register(self):
232 def test_register(self):
233 response = self.app.get(route_path('register'))
233 response = self.app.get(route_path('register'))
234 response.mustcontain('Create an Account')
234 response.mustcontain('Create an Account')
235
235
236 def test_register_err_same_username(self):
236 def test_register_err_same_username(self):
237 uname = 'test_admin'
237 uname = 'test_admin'
238 response = self.app.post(
238 response = self.app.post(
239 route_path('register'),
239 route_path('register'),
240 {
240 {
241 'username': uname,
241 'username': uname,
242 'password': 'test12',
242 'password': 'test12',
243 'password_confirmation': 'test12',
243 'password_confirmation': 'test12',
244 'email': 'goodmail@domain.com',
244 'email': 'goodmail@domain.com',
245 'firstname': 'test',
245 'firstname': 'test',
246 'lastname': 'test'
246 'lastname': 'test'
247 }
247 }
248 )
248 )
249
249
250 assertr = response.assert_response()
250 assertr = response.assert_response()
251 msg = 'Username "%(username)s" already exists'
251 msg = 'Username "%(username)s" already exists'
252 msg = msg % {'username': uname}
252 msg = msg % {'username': uname}
253 assertr.element_contains('#username+.error-message', msg)
253 assertr.element_contains('#username+.error-message', msg)
254
254
255 def test_register_err_same_email(self):
255 def test_register_err_same_email(self):
256 response = self.app.post(
256 response = self.app.post(
257 route_path('register'),
257 route_path('register'),
258 {
258 {
259 'username': 'test_admin_0',
259 'username': 'test_admin_0',
260 'password': 'test12',
260 'password': 'test12',
261 'password_confirmation': 'test12',
261 'password_confirmation': 'test12',
262 'email': 'test_admin@mail.com',
262 'email': 'test_admin@mail.com',
263 'firstname': 'test',
263 'firstname': 'test',
264 'lastname': 'test'
264 'lastname': 'test'
265 }
265 }
266 )
266 )
267
267
268 assertr = response.assert_response()
268 assertr = response.assert_response()
269 msg = u'This e-mail address is already taken'
269 msg = 'This e-mail address is already taken'
270 assertr.element_contains('#email+.error-message', msg)
270 assertr.element_contains('#email+.error-message', msg)
271
271
272 def test_register_err_same_email_case_sensitive(self):
272 def test_register_err_same_email_case_sensitive(self):
273 response = self.app.post(
273 response = self.app.post(
274 route_path('register'),
274 route_path('register'),
275 {
275 {
276 'username': 'test_admin_1',
276 'username': 'test_admin_1',
277 'password': 'test12',
277 'password': 'test12',
278 'password_confirmation': 'test12',
278 'password_confirmation': 'test12',
279 'email': 'TesT_Admin@mail.COM',
279 'email': 'TesT_Admin@mail.COM',
280 'firstname': 'test',
280 'firstname': 'test',
281 'lastname': 'test'
281 'lastname': 'test'
282 }
282 }
283 )
283 )
284 assertr = response.assert_response()
284 assertr = response.assert_response()
285 msg = u'This e-mail address is already taken'
285 msg = 'This e-mail address is already taken'
286 assertr.element_contains('#email+.error-message', msg)
286 assertr.element_contains('#email+.error-message', msg)
287
287
288 def test_register_err_wrong_data(self):
288 def test_register_err_wrong_data(self):
289 response = self.app.post(
289 response = self.app.post(
290 route_path('register'),
290 route_path('register'),
291 {
291 {
292 'username': 'xs',
292 'username': 'xs',
293 'password': 'test',
293 'password': 'test',
294 'password_confirmation': 'test',
294 'password_confirmation': 'test',
295 'email': 'goodmailm',
295 'email': 'goodmailm',
296 'firstname': 'test',
296 'firstname': 'test',
297 'lastname': 'test'
297 'lastname': 'test'
298 }
298 }
299 )
299 )
300 assert response.status == '200 OK'
300 assert response.status == '200 OK'
301 response.mustcontain('An email address must contain a single @')
301 response.mustcontain('An email address must contain a single @')
302 response.mustcontain('Enter a value 6 characters long or more')
302 response.mustcontain('Enter a value 6 characters long or more')
303
303
304 def test_register_err_username(self):
304 def test_register_err_username(self):
305 response = self.app.post(
305 response = self.app.post(
306 route_path('register'),
306 route_path('register'),
307 {
307 {
308 'username': 'error user',
308 'username': 'error user',
309 'password': 'test12',
309 'password': 'test12',
310 'password_confirmation': 'test12',
310 'password_confirmation': 'test12',
311 'email': 'goodmailm',
311 'email': 'goodmailm',
312 'firstname': 'test',
312 'firstname': 'test',
313 'lastname': 'test'
313 'lastname': 'test'
314 }
314 }
315 )
315 )
316
316
317 response.mustcontain('An email address must contain a single @')
317 response.mustcontain('An email address must contain a single @')
318 response.mustcontain(
318 response.mustcontain(
319 'Username may only contain '
319 'Username may only contain '
320 'alphanumeric characters underscores, '
320 'alphanumeric characters underscores, '
321 'periods or dashes and must begin with '
321 'periods or dashes and must begin with '
322 'alphanumeric character')
322 'alphanumeric character')
323
323
324 def test_register_err_case_sensitive(self):
324 def test_register_err_case_sensitive(self):
325 usr = 'Test_Admin'
325 usr = 'Test_Admin'
326 response = self.app.post(
326 response = self.app.post(
327 route_path('register'),
327 route_path('register'),
328 {
328 {
329 'username': usr,
329 'username': usr,
330 'password': 'test12',
330 'password': 'test12',
331 'password_confirmation': 'test12',
331 'password_confirmation': 'test12',
332 'email': 'goodmailm',
332 'email': 'goodmailm',
333 'firstname': 'test',
333 'firstname': 'test',
334 'lastname': 'test'
334 'lastname': 'test'
335 }
335 }
336 )
336 )
337
337
338 assertr = response.assert_response()
338 assertr = response.assert_response()
339 msg = u'Username "%(username)s" already exists'
339 msg = u'Username "%(username)s" already exists'
340 msg = msg % {'username': usr}
340 msg = msg % {'username': usr}
341 assertr.element_contains('#username+.error-message', msg)
341 assertr.element_contains('#username+.error-message', msg)
342
342
343 def test_register_special_chars(self):
343 def test_register_special_chars(self):
344 response = self.app.post(
344 response = self.app.post(
345 route_path('register'),
345 route_path('register'),
346 {
346 {
347 'username': 'xxxaxn',
347 'username': 'xxxaxn',
348 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
348 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
349 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
349 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
350 'email': 'goodmailm@test.plx',
350 'email': 'goodmailm@test.plx',
351 'firstname': 'test',
351 'firstname': 'test',
352 'lastname': 'test'
352 'lastname': 'test'
353 }
353 }
354 )
354 )
355
355
356 msg = u'Invalid characters (non-ascii) in password'
356 msg = u'Invalid characters (non-ascii) in password'
357 response.mustcontain(msg)
357 response.mustcontain(msg)
358
358
359 def test_register_password_mismatch(self):
359 def test_register_password_mismatch(self):
360 response = self.app.post(
360 response = self.app.post(
361 route_path('register'),
361 route_path('register'),
362 {
362 {
363 'username': 'xs',
363 'username': 'xs',
364 'password': '123qwe',
364 'password': '123qwe',
365 'password_confirmation': 'qwe123',
365 'password_confirmation': 'qwe123',
366 'email': 'goodmailm@test.plxa',
366 'email': 'goodmailm@test.plxa',
367 'firstname': 'test',
367 'firstname': 'test',
368 'lastname': 'test'
368 'lastname': 'test'
369 }
369 }
370 )
370 )
371 msg = u'Passwords do not match'
371 msg = u'Passwords do not match'
372 response.mustcontain(msg)
372 response.mustcontain(msg)
373
373
374 def test_register_ok(self):
374 def test_register_ok(self):
375 username = 'test_regular4'
375 username = 'test_regular4'
376 password = 'qweqwe'
376 password = 'qweqwe'
377 email = 'marcin@test.com'
377 email = 'marcin@test.com'
378 name = 'testname'
378 name = 'testname'
379 lastname = 'testlastname'
379 lastname = 'testlastname'
380
380
381 # this initializes a session
381 # this initializes a session
382 response = self.app.get(route_path('register'))
382 response = self.app.get(route_path('register'))
383 response.mustcontain('Create an Account')
383 response.mustcontain('Create an Account')
384
384
385
385
386 response = self.app.post(
386 response = self.app.post(
387 route_path('register'),
387 route_path('register'),
388 {
388 {
389 'username': username,
389 'username': username,
390 'password': password,
390 'password': password,
391 'password_confirmation': password,
391 'password_confirmation': password,
392 'email': email,
392 'email': email,
393 'firstname': name,
393 'firstname': name,
394 'lastname': lastname,
394 'lastname': lastname,
395 'admin': True
395 'admin': True
396 },
396 },
397 status=302
397 status=302
398 ) # This should be overridden
398 ) # This should be overridden
399
399
400 assert_session_flash(
400 assert_session_flash(
401 response, 'You have successfully registered with RhodeCode. You can log-in now.')
401 response, 'You have successfully registered with RhodeCode. You can log-in now.')
402
402
403 ret = Session().query(User).filter(
403 ret = Session().query(User).filter(
404 User.username == 'test_regular4').one()
404 User.username == 'test_regular4').one()
405 assert ret.username == username
405 assert ret.username == username
406 assert check_password(password, ret.password)
406 assert check_password(password, ret.password)
407 assert ret.email == email
407 assert ret.email == email
408 assert ret.name == name
408 assert ret.name == name
409 assert ret.lastname == lastname
409 assert ret.lastname == lastname
410 assert ret.auth_tokens is not None
410 assert ret.auth_tokens is not None
411 assert not ret.admin
411 assert not ret.admin
412
412
413 def test_forgot_password_wrong_mail(self):
413 def test_forgot_password_wrong_mail(self):
414 bad_email = 'marcin@wrongmail.org'
414 bad_email = 'marcin@wrongmail.org'
415 # this initializes a session
415 # this initializes a session
416 self.app.get(route_path('reset_password'))
416 self.app.get(route_path('reset_password'))
417
417
418 response = self.app.post(
418 response = self.app.post(
419 route_path('reset_password'), {'email': bad_email, }
419 route_path('reset_password'), {'email': bad_email, }
420 )
420 )
421 assert_session_flash(response,
421 assert_session_flash(response,
422 'If such email exists, a password reset link was sent to it.')
422 'If such email exists, a password reset link was sent to it.')
423
423
424 def test_forgot_password(self, user_util):
424 def test_forgot_password(self, user_util):
425 # this initializes a session
425 # this initializes a session
426 self.app.get(route_path('reset_password'))
426 self.app.get(route_path('reset_password'))
427
427
428 user = user_util.create_user()
428 user = user_util.create_user()
429 user_id = user.user_id
429 user_id = user.user_id
430 email = user.email
430 email = user.email
431
431
432 response = self.app.post(route_path('reset_password'), {'email': email, })
432 response = self.app.post(route_path('reset_password'), {'email': email, })
433
433
434 assert_session_flash(response,
434 assert_session_flash(response,
435 'If such email exists, a password reset link was sent to it.')
435 'If such email exists, a password reset link was sent to it.')
436
436
437 # BAD KEY
437 # BAD KEY
438 confirm_url = route_path('reset_password_confirmation', params={'key': 'badkey'})
438 confirm_url = route_path('reset_password_confirmation', params={'key': 'badkey'})
439 response = self.app.get(confirm_url, status=302)
439 response = self.app.get(confirm_url, status=302)
440 assert response.location.endswith(route_path('reset_password'))
440 assert response.location.endswith(route_path('reset_password'))
441 assert_session_flash(response, 'Given reset token is invalid')
441 assert_session_flash(response, 'Given reset token is invalid')
442
442
443 response.follow() # cleanup flash
443 response.follow() # cleanup flash
444
444
445 # GOOD KEY
445 # GOOD KEY
446 key = UserApiKeys.query()\
446 key = UserApiKeys.query()\
447 .filter(UserApiKeys.user_id == user_id)\
447 .filter(UserApiKeys.user_id == user_id)\
448 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
448 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
449 .first()
449 .first()
450
450
451 assert key
451 assert key
452
452
453 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key)
453 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key)
454 response = self.app.get(confirm_url)
454 response = self.app.get(confirm_url)
455 assert response.status == '302 Found'
455 assert response.status == '302 Found'
456 assert response.location.endswith(route_path('login'))
456 assert response.location.endswith(route_path('login'))
457
457
458 assert_session_flash(
458 assert_session_flash(
459 response,
459 response,
460 'Your password reset was successful, '
460 'Your password reset was successful, '
461 'a new password has been sent to your email')
461 'a new password has been sent to your email')
462
462
463 response.follow()
463 response.follow()
464
464
465 def _get_api_whitelist(self, values=None):
465 def _get_api_whitelist(self, values=None):
466 config = {'api_access_controllers_whitelist': values or []}
466 config = {'api_access_controllers_whitelist': values or []}
467 return config
467 return config
468
468
469 @pytest.mark.parametrize("test_name, auth_token", [
469 @pytest.mark.parametrize("test_name, auth_token", [
470 ('none', None),
470 ('none', None),
471 ('empty_string', ''),
471 ('empty_string', ''),
472 ('fake_number', '123456'),
472 ('fake_number', '123456'),
473 ('proper_auth_token', None)
473 ('proper_auth_token', None)
474 ])
474 ])
475 def test_access_not_whitelisted_page_via_auth_token(
475 def test_access_not_whitelisted_page_via_auth_token(
476 self, test_name, auth_token, user_admin):
476 self, test_name, auth_token, user_admin):
477
477
478 whitelist = self._get_api_whitelist([])
478 whitelist = self._get_api_whitelist([])
479 with mock.patch.dict('rhodecode.CONFIG', whitelist):
479 with mock.patch.dict('rhodecode.CONFIG', whitelist):
480 assert [] == whitelist['api_access_controllers_whitelist']
480 assert [] == whitelist['api_access_controllers_whitelist']
481 if test_name == 'proper_auth_token':
481 if test_name == 'proper_auth_token':
482 # use builtin if api_key is None
482 # use builtin if api_key is None
483 auth_token = user_admin.api_key
483 auth_token = user_admin.api_key
484
484
485 with fixture.anon_access(False):
485 with fixture.anon_access(False):
486 # webtest uses linter to check if response is bytes,
486 # webtest uses linter to check if response is bytes,
487 # and we use memoryview here as a wrapper, quick turn-off
487 # and we use memoryview here as a wrapper, quick turn-off
488 self.app.lint = False
488 self.app.lint = False
489
489
490 self.app.get(
490 self.app.get(
491 route_path('repo_commit_raw',
491 route_path('repo_commit_raw',
492 repo_name=HG_REPO, commit_id='tip',
492 repo_name=HG_REPO, commit_id='tip',
493 params=dict(api_key=auth_token)),
493 params=dict(api_key=auth_token)),
494 status=302)
494 status=302)
495
495
496 @pytest.mark.parametrize("test_name, auth_token, code", [
496 @pytest.mark.parametrize("test_name, auth_token, code", [
497 ('none', None, 302),
497 ('none', None, 302),
498 ('empty_string', '', 302),
498 ('empty_string', '', 302),
499 ('fake_number', '123456', 302),
499 ('fake_number', '123456', 302),
500 ('proper_auth_token', None, 200)
500 ('proper_auth_token', None, 200)
501 ])
501 ])
502 def test_access_whitelisted_page_via_auth_token(
502 def test_access_whitelisted_page_via_auth_token(
503 self, test_name, auth_token, code, user_admin):
503 self, test_name, auth_token, code, user_admin):
504
504
505 whitelist = self._get_api_whitelist(whitelist_view)
505 whitelist = self._get_api_whitelist(whitelist_view)
506
506
507 with mock.patch.dict('rhodecode.CONFIG', whitelist):
507 with mock.patch.dict('rhodecode.CONFIG', whitelist):
508 assert whitelist_view == whitelist['api_access_controllers_whitelist']
508 assert whitelist_view == whitelist['api_access_controllers_whitelist']
509
509
510 if test_name == 'proper_auth_token':
510 if test_name == 'proper_auth_token':
511 auth_token = user_admin.api_key
511 auth_token = user_admin.api_key
512 assert auth_token
512 assert auth_token
513
513
514 with fixture.anon_access(False):
514 with fixture.anon_access(False):
515 # webtest uses linter to check if response is bytes,
515 # webtest uses linter to check if response is bytes,
516 # and we use memoryview here as a wrapper, quick turn-off
516 # and we use memoryview here as a wrapper, quick turn-off
517 self.app.lint = False
517 self.app.lint = False
518 self.app.get(
518 self.app.get(
519 route_path('repo_commit_raw',
519 route_path('repo_commit_raw',
520 repo_name=HG_REPO, commit_id='tip',
520 repo_name=HG_REPO, commit_id='tip',
521 params=dict(api_key=auth_token)),
521 params=dict(api_key=auth_token)),
522 status=code)
522 status=code)
523
523
524 @pytest.mark.parametrize("test_name, auth_token, code", [
524 @pytest.mark.parametrize("test_name, auth_token, code", [
525 ('proper_auth_token', None, 200),
525 ('proper_auth_token', None, 200),
526 ('wrong_auth_token', '123456', 302),
526 ('wrong_auth_token', '123456', 302),
527 ])
527 ])
528 def test_access_whitelisted_page_via_auth_token_bound_to_token(
528 def test_access_whitelisted_page_via_auth_token_bound_to_token(
529 self, test_name, auth_token, code, user_admin):
529 self, test_name, auth_token, code, user_admin):
530
530
531 expected_token = auth_token
531 expected_token = auth_token
532 if test_name == 'proper_auth_token':
532 if test_name == 'proper_auth_token':
533 auth_token = user_admin.api_key
533 auth_token = user_admin.api_key
534 expected_token = auth_token
534 expected_token = auth_token
535 assert auth_token
535 assert auth_token
536
536
537 whitelist = self._get_api_whitelist([
537 whitelist = self._get_api_whitelist([
538 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)])
538 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)])
539
539
540 with mock.patch.dict('rhodecode.CONFIG', whitelist):
540 with mock.patch.dict('rhodecode.CONFIG', whitelist):
541
541
542 with fixture.anon_access(False):
542 with fixture.anon_access(False):
543 # webtest uses linter to check if response is bytes,
543 # webtest uses linter to check if response is bytes,
544 # and we use memoryview here as a wrapper, quick turn-off
544 # and we use memoryview here as a wrapper, quick turn-off
545 self.app.lint = False
545 self.app.lint = False
546
546
547 self.app.get(
547 self.app.get(
548 route_path('repo_commit_raw',
548 route_path('repo_commit_raw',
549 repo_name=HG_REPO, commit_id='tip',
549 repo_name=HG_REPO, commit_id='tip',
550 params=dict(api_key=auth_token)),
550 params=dict(api_key=auth_token)),
551 status=code)
551 status=code)
552
552
553 def test_access_page_via_extra_auth_token(self):
553 def test_access_page_via_extra_auth_token(self):
554 whitelist = self._get_api_whitelist(whitelist_view)
554 whitelist = self._get_api_whitelist(whitelist_view)
555 with mock.patch.dict('rhodecode.CONFIG', whitelist):
555 with mock.patch.dict('rhodecode.CONFIG', whitelist):
556 assert whitelist_view == \
556 assert whitelist_view == \
557 whitelist['api_access_controllers_whitelist']
557 whitelist['api_access_controllers_whitelist']
558
558
559 new_auth_token = AuthTokenModel().create(
559 new_auth_token = AuthTokenModel().create(
560 TEST_USER_ADMIN_LOGIN, 'test')
560 TEST_USER_ADMIN_LOGIN, 'test')
561 Session().commit()
561 Session().commit()
562 with fixture.anon_access(False):
562 with fixture.anon_access(False):
563 # webtest uses linter to check if response is bytes,
563 # webtest uses linter to check if response is bytes,
564 # and we use memoryview here as a wrapper, quick turn-off
564 # and we use memoryview here as a wrapper, quick turn-off
565 self.app.lint = False
565 self.app.lint = False
566 self.app.get(
566 self.app.get(
567 route_path('repo_commit_raw',
567 route_path('repo_commit_raw',
568 repo_name=HG_REPO, commit_id='tip',
568 repo_name=HG_REPO, commit_id='tip',
569 params=dict(api_key=new_auth_token.api_key)),
569 params=dict(api_key=new_auth_token.api_key)),
570 status=200)
570 status=200)
571
571
572 def test_access_page_via_expired_auth_token(self):
572 def test_access_page_via_expired_auth_token(self):
573 whitelist = self._get_api_whitelist(whitelist_view)
573 whitelist = self._get_api_whitelist(whitelist_view)
574 with mock.patch.dict('rhodecode.CONFIG', whitelist):
574 with mock.patch.dict('rhodecode.CONFIG', whitelist):
575 assert whitelist_view == \
575 assert whitelist_view == \
576 whitelist['api_access_controllers_whitelist']
576 whitelist['api_access_controllers_whitelist']
577
577
578 new_auth_token = AuthTokenModel().create(
578 new_auth_token = AuthTokenModel().create(
579 TEST_USER_ADMIN_LOGIN, 'test')
579 TEST_USER_ADMIN_LOGIN, 'test')
580 Session().commit()
580 Session().commit()
581 # patch the api key and make it expired
581 # patch the api key and make it expired
582 new_auth_token.expires = 0
582 new_auth_token.expires = 0
583 Session().add(new_auth_token)
583 Session().add(new_auth_token)
584 Session().commit()
584 Session().commit()
585 with fixture.anon_access(False):
585 with fixture.anon_access(False):
586 # webtest uses linter to check if response is bytes,
586 # webtest uses linter to check if response is bytes,
587 # and we use memoryview here as a wrapper, quick turn-off
587 # and we use memoryview here as a wrapper, quick turn-off
588 self.app.lint = False
588 self.app.lint = False
589 self.app.get(
589 self.app.get(
590 route_path('repo_commit_raw',
590 route_path('repo_commit_raw',
591 repo_name=HG_REPO, commit_id='tip',
591 repo_name=HG_REPO, commit_id='tip',
592 params=dict(api_key=new_auth_token.api_key)),
592 params=dict(api_key=new_auth_token.api_key)),
593 status=302)
593 status=302)
@@ -1,108 +1,108 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 """
20 """
21 External module for testing plugins
21 External module for testing plugins
22
22
23 rhodecode.tests.auth_external_test
23 rhodecode.tests.auth_external_test
24
24
25 """
25 """
26 import logging
26 import logging
27 import traceback
27 import traceback
28
28
29 from rhodecode.authentication.base import (
29 from rhodecode.authentication.base import (
30 RhodeCodeExternalAuthPlugin, hybrid_property)
30 RhodeCodeExternalAuthPlugin, hybrid_property)
31 from rhodecode.model.db import User
31 from rhodecode.model.db import User
32 from rhodecode.lib.ext_json import formatted_json
32 from rhodecode.lib.ext_json import formatted_json
33
33
34 log = logging.getLogger(__name__)
34 log = logging.getLogger(__name__)
35
35
36
36
37 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
37 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
38 def __init__(self):
38 def __init__(self):
39 self._logger = logging.getLogger(__name__)
39 self._logger = logging.getLogger(__name__)
40
40
41 @hybrid_property
41 @hybrid_property
42 def allows_creating_users(self):
42 def allows_creating_users(self):
43 return True
43 return True
44
44
45 @hybrid_property
45 @hybrid_property
46 def name(self):
46 def name(self):
47 return u"external_test"
47 return "external_test"
48
48
49 def settings(self):
49 def settings(self):
50 settings = [
50 settings = [
51 ]
51 ]
52 return settings
52 return settings
53
53
54 def use_fake_password(self):
54 def use_fake_password(self):
55 return True
55 return True
56
56
57 def user_activation_state(self):
57 def user_activation_state(self):
58 def_user_perms = User.get_default_user().AuthUser().permissions['global']
58 def_user_perms = User.get_default_user().AuthUser().permissions['global']
59 return 'hg.extern_activate.auto' in def_user_perms
59 return 'hg.extern_activate.auto' in def_user_perms
60
60
61 def auth(self, userobj, username, password, settings, **kwargs):
61 def auth(self, userobj, username, password, settings, **kwargs):
62 """
62 """
63 Given a user object (which may be null), username, a plaintext password,
63 Given a user object (which may be null), username, a plaintext password,
64 and a settings object (containing all the keys needed as listed in settings()),
64 and a settings object (containing all the keys needed as listed in settings()),
65 authenticate this user's login attempt.
65 authenticate this user's login attempt.
66
66
67 Return None on failure. On success, return a dictionary of the form:
67 Return None on failure. On success, return a dictionary of the form:
68
68
69 see: RhodeCodeAuthPluginBase.auth_func_attrs
69 see: RhodeCodeAuthPluginBase.auth_func_attrs
70 This is later validated for correctness
70 This is later validated for correctness
71 """
71 """
72
72
73 if not username or not password:
73 if not username or not password:
74 log.debug('Empty username or password skipping...')
74 log.debug('Empty username or password skipping...')
75 return None
75 return None
76
76
77 try:
77 try:
78 user_dn = username
78 user_dn = username
79
79
80 # # old attrs fetched from RhodeCode database
80 # # old attrs fetched from RhodeCode database
81 admin = getattr(userobj, 'admin', False)
81 admin = getattr(userobj, 'admin', False)
82 active = getattr(userobj, 'active', True)
82 active = getattr(userobj, 'active', True)
83 email = getattr(userobj, 'email', '')
83 email = getattr(userobj, 'email', '')
84 firstname = getattr(userobj, 'firstname', '')
84 firstname = getattr(userobj, 'firstname', '')
85 lastname = getattr(userobj, 'lastname', '')
85 lastname = getattr(userobj, 'lastname', '')
86 extern_type = getattr(userobj, 'extern_type', '')
86 extern_type = getattr(userobj, 'extern_type', '')
87 #
87 #
88 user_attrs = {
88 user_attrs = {
89 'username': username,
89 'username': username,
90 'firstname': firstname,
90 'firstname': firstname,
91 'lastname': lastname,
91 'lastname': lastname,
92 'groups': [],
92 'groups': [],
93 'email': '%s@rhodecode.com' % username,
93 'email': '%s@rhodecode.com' % username,
94 'admin': admin,
94 'admin': admin,
95 'active': active,
95 'active': active,
96 "active_from_extern": None,
96 "active_from_extern": None,
97 'extern_name': user_dn,
97 'extern_name': user_dn,
98 'extern_type': extern_type,
98 'extern_type': extern_type,
99 }
99 }
100
100
101 log.debug('EXTERNAL user: \n%s', formatted_json(user_attrs))
101 log.debug('EXTERNAL user: \n%s', formatted_json(user_attrs))
102 log.info('user `%s` authenticated correctly', user_attrs['username'])
102 log.info('user `%s` authenticated correctly', user_attrs['username'])
103
103
104 return user_attrs
104 return user_attrs
105
105
106 except (Exception,):
106 except (Exception,):
107 log.error(traceback.format_exc())
107 log.error(traceback.format_exc())
108 return None
108 return None
@@ -1,188 +1,188 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import mock
20 import mock
21 import pytest
21 import pytest
22
22
23 from rhodecode.lib.auth import _RhodeCodeCryptoBCrypt
23 from rhodecode.lib.auth import _RhodeCodeCryptoBCrypt
24 from rhodecode.authentication.base import RhodeCodeAuthPluginBase
24 from rhodecode.authentication.base import RhodeCodeAuthPluginBase
25 from rhodecode.authentication.plugins.auth_ldap import RhodeCodeAuthPlugin
25 from rhodecode.authentication.plugins.auth_ldap import RhodeCodeAuthPlugin
26 from rhodecode.model import db
26 from rhodecode.model import db
27
27
28
28
29 class RcTestAuthPlugin(RhodeCodeAuthPluginBase):
29 class RcTestAuthPlugin(RhodeCodeAuthPluginBase):
30
30
31 def name(self):
31 def name(self):
32 return u'stub_auth'
32 return 'stub_auth'
33
33
34
34
35 def test_authenticate_returns_from_auth(stub_auth_data):
35 def test_authenticate_returns_from_auth(stub_auth_data):
36 plugin = RcTestAuthPlugin('stub_id')
36 plugin = RcTestAuthPlugin('stub_id')
37 with mock.patch.object(plugin, 'auth') as auth_mock:
37 with mock.patch.object(plugin, 'auth') as auth_mock:
38 auth_mock.return_value = stub_auth_data
38 auth_mock.return_value = stub_auth_data
39 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
39 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
40 assert stub_auth_data == result
40 assert stub_auth_data == result
41
41
42
42
43 def test_authenticate_returns_empty_auth_data():
43 def test_authenticate_returns_empty_auth_data():
44 auth_data = {}
44 auth_data = {}
45 plugin = RcTestAuthPlugin('stub_id')
45 plugin = RcTestAuthPlugin('stub_id')
46 with mock.patch.object(plugin, 'auth') as auth_mock:
46 with mock.patch.object(plugin, 'auth') as auth_mock:
47 auth_mock.return_value = auth_data
47 auth_mock.return_value = auth_data
48 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
48 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
49 assert auth_data == result
49 assert auth_data == result
50
50
51
51
52 def test_authenticate_skips_hash_migration_if_mismatch(stub_auth_data):
52 def test_authenticate_skips_hash_migration_if_mismatch(stub_auth_data):
53 stub_auth_data['_hash_migrate'] = 'new-hash'
53 stub_auth_data['_hash_migrate'] = 'new-hash'
54 plugin = RcTestAuthPlugin('stub_id')
54 plugin = RcTestAuthPlugin('stub_id')
55 with mock.patch.object(plugin, 'auth') as auth_mock:
55 with mock.patch.object(plugin, 'auth') as auth_mock:
56 auth_mock.return_value = stub_auth_data
56 auth_mock.return_value = stub_auth_data
57 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
57 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
58
58
59 user = db.User.get_by_username(stub_auth_data['username'])
59 user = db.User.get_by_username(stub_auth_data['username'])
60 assert user.password != 'new-hash'
60 assert user.password != 'new-hash'
61 assert result == stub_auth_data
61 assert result == stub_auth_data
62
62
63
63
64 def test_authenticate_migrates_to_new_hash(stub_auth_data):
64 def test_authenticate_migrates_to_new_hash(stub_auth_data):
65 new_password = b'new-password'
65 new_password = b'new-password'
66 new_hash = _RhodeCodeCryptoBCrypt().hash_create(new_password)
66 new_hash = _RhodeCodeCryptoBCrypt().hash_create(new_password)
67 stub_auth_data['_hash_migrate'] = new_hash
67 stub_auth_data['_hash_migrate'] = new_hash
68 plugin = RcTestAuthPlugin('stub_id')
68 plugin = RcTestAuthPlugin('stub_id')
69 with mock.patch.object(plugin, 'auth') as auth_mock:
69 with mock.patch.object(plugin, 'auth') as auth_mock:
70 auth_mock.return_value = stub_auth_data
70 auth_mock.return_value = stub_auth_data
71 result = plugin._authenticate(
71 result = plugin._authenticate(
72 mock.Mock(), stub_auth_data['username'], new_password, {})
72 mock.Mock(), stub_auth_data['username'], new_password, {})
73
73
74 user = db.User.get_by_username(stub_auth_data['username'])
74 user = db.User.get_by_username(stub_auth_data['username'])
75 assert user.password == new_hash
75 assert user.password == new_hash
76 assert result == stub_auth_data
76 assert result == stub_auth_data
77
77
78
78
79 @pytest.fixture()
79 @pytest.fixture()
80 def stub_auth_data(user_util):
80 def stub_auth_data(user_util):
81 user = user_util.create_user()
81 user = user_util.create_user()
82 data = {
82 data = {
83 'username': user.username,
83 'username': user.username,
84 'password': 'password',
84 'password': 'password',
85 'email': 'test@example.org',
85 'email': 'test@example.org',
86 'firstname': 'John',
86 'firstname': 'John',
87 'lastname': 'Smith',
87 'lastname': 'Smith',
88 'groups': [],
88 'groups': [],
89 'active': True,
89 'active': True,
90 'admin': False,
90 'admin': False,
91 'extern_name': 'test',
91 'extern_name': 'test',
92 'extern_type': 'ldap',
92 'extern_type': 'ldap',
93 'active_from_extern': True
93 'active_from_extern': True
94 }
94 }
95 return data
95 return data
96
96
97
97
98 class TestRhodeCodeAuthPlugin(object):
98 class TestRhodeCodeAuthPlugin(object):
99 def setup_method(self, method):
99 def setup_method(self, method):
100 self.finalizers = []
100 self.finalizers = []
101 self.user = mock.Mock()
101 self.user = mock.Mock()
102 self.user.username = 'test'
102 self.user.username = 'test'
103 self.user.password = 'old-password'
103 self.user.password = 'old-password'
104 self.fake_auth = {
104 self.fake_auth = {
105 'username': 'test',
105 'username': 'test',
106 'password': 'test',
106 'password': 'test',
107 'email': 'test@example.org',
107 'email': 'test@example.org',
108 'firstname': 'John',
108 'firstname': 'John',
109 'lastname': 'Smith',
109 'lastname': 'Smith',
110 'groups': [],
110 'groups': [],
111 'active': True,
111 'active': True,
112 'admin': False,
112 'admin': False,
113 'extern_name': 'test',
113 'extern_name': 'test',
114 'extern_type': 'ldap',
114 'extern_type': 'ldap',
115 'active_from_extern': True
115 'active_from_extern': True
116 }
116 }
117
117
118 def teardown_method(self, method):
118 def teardown_method(self, method):
119 if self.finalizers:
119 if self.finalizers:
120 for finalizer in self.finalizers:
120 for finalizer in self.finalizers:
121 finalizer()
121 finalizer()
122 self.finalizers = []
122 self.finalizers = []
123
123
124 def test_fake_password_is_created_for_the_new_user(self):
124 def test_fake_password_is_created_for_the_new_user(self):
125 self._patch()
125 self._patch()
126 auth_plugin = RhodeCodeAuthPlugin('stub_id')
126 auth_plugin = RhodeCodeAuthPlugin('stub_id')
127 auth_plugin._authenticate(self.user, 'test', 'test', [])
127 auth_plugin._authenticate(self.user, 'test', 'test', [])
128 self.password_generator_mock.assert_called_once_with(length=16)
128 self.password_generator_mock.assert_called_once_with(length=16)
129 create_user_kwargs = self.create_user_mock.call_args[1]
129 create_user_kwargs = self.create_user_mock.call_args[1]
130 assert create_user_kwargs['password'] == 'new-password'
130 assert create_user_kwargs['password'] == 'new-password'
131
131
132 def test_fake_password_is_not_created_for_the_existing_user(self):
132 def test_fake_password_is_not_created_for_the_existing_user(self):
133 self._patch()
133 self._patch()
134 self.get_user_mock.return_value = self.user
134 self.get_user_mock.return_value = self.user
135 auth_plugin = RhodeCodeAuthPlugin('stub_id')
135 auth_plugin = RhodeCodeAuthPlugin('stub_id')
136 auth_plugin._authenticate(self.user, 'test', 'test', [])
136 auth_plugin._authenticate(self.user, 'test', 'test', [])
137 assert self.password_generator_mock.called is False
137 assert self.password_generator_mock.called is False
138 create_user_kwargs = self.create_user_mock.call_args[1]
138 create_user_kwargs = self.create_user_mock.call_args[1]
139 assert create_user_kwargs['password'] == self.user.password
139 assert create_user_kwargs['password'] == self.user.password
140
140
141 def _patch(self):
141 def _patch(self):
142 get_user_patch = mock.patch('rhodecode.model.db.User.get_by_username')
142 get_user_patch = mock.patch('rhodecode.model.db.User.get_by_username')
143 self.get_user_mock = get_user_patch.start()
143 self.get_user_mock = get_user_patch.start()
144 self.get_user_mock.return_value = None
144 self.get_user_mock.return_value = None
145 self.finalizers.append(get_user_patch.stop)
145 self.finalizers.append(get_user_patch.stop)
146
146
147 create_user_patch = mock.patch(
147 create_user_patch = mock.patch(
148 'rhodecode.model.user.UserModel.create_or_update')
148 'rhodecode.model.user.UserModel.create_or_update')
149 self.create_user_mock = create_user_patch.start()
149 self.create_user_mock = create_user_patch.start()
150 self.create_user_mock.return_value = None
150 self.create_user_mock.return_value = None
151 self.finalizers.append(create_user_patch.stop)
151 self.finalizers.append(create_user_patch.stop)
152
152
153 auth_patch = mock.patch.object(RhodeCodeAuthPlugin, 'auth')
153 auth_patch = mock.patch.object(RhodeCodeAuthPlugin, 'auth')
154 self.auth_mock = auth_patch.start()
154 self.auth_mock = auth_patch.start()
155 self.auth_mock.return_value = self.fake_auth
155 self.auth_mock.return_value = self.fake_auth
156 self.finalizers.append(auth_patch.stop)
156 self.finalizers.append(auth_patch.stop)
157
157
158 password_generator_patch = mock.patch(
158 password_generator_patch = mock.patch(
159 'rhodecode.lib.auth.PasswordGenerator.gen_password')
159 'rhodecode.lib.auth.PasswordGenerator.gen_password')
160 self.password_generator_mock = password_generator_patch.start()
160 self.password_generator_mock = password_generator_patch.start()
161 self.password_generator_mock.return_value = 'new-password'
161 self.password_generator_mock.return_value = 'new-password'
162 self.finalizers.append(password_generator_patch.stop)
162 self.finalizers.append(password_generator_patch.stop)
163
163
164
164
165 def test_missing_ldap():
165 def test_missing_ldap():
166 from rhodecode.model.validators import Missing
166 from rhodecode.model.validators import Missing
167
167
168 try:
168 try:
169 import ldap_not_existing
169 import ldap_not_existing
170 except ImportError:
170 except ImportError:
171 # means that python-ldap is not installed
171 # means that python-ldap is not installed
172 ldap_not_existing = Missing
172 ldap_not_existing = Missing
173
173
174 # missing is singleton
174 # missing is singleton
175 assert ldap_not_existing == Missing
175 assert ldap_not_existing == Missing
176
176
177
177
178 def test_import_ldap():
178 def test_import_ldap():
179 from rhodecode.model.validators import Missing
179 from rhodecode.model.validators import Missing
180
180
181 try:
181 try:
182 import ldap
182 import ldap
183 except ImportError:
183 except ImportError:
184 # means that python-ldap is not installed
184 # means that python-ldap is not installed
185 ldap = Missing
185 ldap = Missing
186
186
187 # missing is singleton
187 # missing is singleton
188 assert False is (ldap == Missing)
188 assert False is (ldap == Missing)
@@ -1,1307 +1,1307 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import datetime
20 import datetime
21 import mock
21 import mock
22 import os
22 import os
23 import sys
23 import sys
24 import shutil
24 import shutil
25
25
26 import pytest
26 import pytest
27
27
28 from rhodecode.lib.utils import make_db_config
28 from rhodecode.lib.utils import make_db_config
29 from rhodecode.lib.vcs.backends.base import Reference
29 from rhodecode.lib.vcs.backends.base import Reference
30 from rhodecode.lib.vcs.backends.git import (
30 from rhodecode.lib.vcs.backends.git import (
31 GitRepository, GitCommit, discover_git_version)
31 GitRepository, GitCommit, discover_git_version)
32 from rhodecode.lib.vcs.exceptions import (
32 from rhodecode.lib.vcs.exceptions import (
33 RepositoryError, VCSError, NodeDoesNotExistError)
33 RepositoryError, VCSError, NodeDoesNotExistError)
34 from rhodecode.lib.vcs.nodes import (
34 from rhodecode.lib.vcs.nodes import (
35 NodeKind, FileNode, DirNode, NodeState, SubModuleNode)
35 NodeKind, FileNode, DirNode, NodeState, SubModuleNode)
36 from rhodecode.tests import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
36 from rhodecode.tests import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
37 from rhodecode.tests.vcs.conftest import BackendTestMixin
37 from rhodecode.tests.vcs.conftest import BackendTestMixin
38
38
39
39
40 pytestmark = pytest.mark.backends("git")
40 pytestmark = pytest.mark.backends("git")
41
41
42
42
43 DIFF_FROM_REMOTE = br"""diff --git a/foobar b/foobar
43 DIFF_FROM_REMOTE = br"""diff --git a/foobar b/foobar
44 new file mode 100644
44 new file mode 100644
45 index 0000000..f6ea049
45 index 0000000..f6ea049
46 --- /dev/null
46 --- /dev/null
47 +++ b/foobar
47 +++ b/foobar
48 @@ -0,0 +1 @@
48 @@ -0,0 +1 @@
49 +foobar
49 +foobar
50 \ No newline at end of file
50 \ No newline at end of file
51 diff --git a/foobar2 b/foobar2
51 diff --git a/foobar2 b/foobar2
52 new file mode 100644
52 new file mode 100644
53 index 0000000..e8c9d6b
53 index 0000000..e8c9d6b
54 --- /dev/null
54 --- /dev/null
55 +++ b/foobar2
55 +++ b/foobar2
56 @@ -0,0 +1 @@
56 @@ -0,0 +1 @@
57 +foobar2
57 +foobar2
58 \ No newline at end of file
58 \ No newline at end of file
59 """
59 """
60
60
61
61
62 def callable_get_diff(*args, **kwargs):
62 def callable_get_diff(*args, **kwargs):
63 return DIFF_FROM_REMOTE
63 return DIFF_FROM_REMOTE
64
64
65
65
66 class TestGitRepository(object):
66 class TestGitRepository(object):
67
67
68 @pytest.fixture(autouse=True)
68 @pytest.fixture(autouse=True)
69 def prepare(self, request, baseapp):
69 def prepare(self, request, baseapp):
70 self.repo = GitRepository(TEST_GIT_REPO, bare=True)
70 self.repo = GitRepository(TEST_GIT_REPO, bare=True)
71 self.repo.count()
71 self.repo.count()
72
72
73 def get_clone_repo(self, tmpdir):
73 def get_clone_repo(self, tmpdir):
74 """
74 """
75 Return a non bare clone of the base repo.
75 Return a non bare clone of the base repo.
76 """
76 """
77 clone_path = str(tmpdir.join('clone-repo'))
77 clone_path = str(tmpdir.join('clone-repo'))
78 repo_clone = GitRepository(
78 repo_clone = GitRepository(
79 clone_path, create=True, src_url=self.repo.path, bare=False)
79 clone_path, create=True, src_url=self.repo.path, bare=False)
80
80
81 return repo_clone
81 return repo_clone
82
82
83 def get_empty_repo(self, tmpdir, bare=False):
83 def get_empty_repo(self, tmpdir, bare=False):
84 """
84 """
85 Return a non bare empty repo.
85 Return a non bare empty repo.
86 """
86 """
87 clone_path = str(tmpdir.join('empty-repo'))
87 clone_path = str(tmpdir.join('empty-repo'))
88 return GitRepository(clone_path, create=True, bare=bare)
88 return GitRepository(clone_path, create=True, bare=bare)
89
89
90 def test_wrong_repo_path(self):
90 def test_wrong_repo_path(self):
91 wrong_repo_path = '/tmp/errorrepo_git'
91 wrong_repo_path = '/tmp/errorrepo_git'
92 with pytest.raises(RepositoryError):
92 with pytest.raises(RepositoryError):
93 GitRepository(wrong_repo_path)
93 GitRepository(wrong_repo_path)
94
94
95 def test_repo_clone(self, tmp_path_factory):
95 def test_repo_clone(self, tmp_path_factory):
96 repo = GitRepository(TEST_GIT_REPO)
96 repo = GitRepository(TEST_GIT_REPO)
97 clone_path = '{}_{}'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
97 clone_path = '{}_{}'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
98 repo_clone = GitRepository(
98 repo_clone = GitRepository(
99 clone_path,
99 clone_path,
100 src_url=TEST_GIT_REPO, create=True, do_workspace_checkout=True)
100 src_url=TEST_GIT_REPO, create=True, do_workspace_checkout=True)
101
101
102 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
102 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
103 # Checking hashes of commits should be enough
103 # Checking hashes of commits should be enough
104 for commit in repo.get_commits():
104 for commit in repo.get_commits():
105 raw_id = commit.raw_id
105 raw_id = commit.raw_id
106 assert raw_id == repo_clone.get_commit(raw_id).raw_id
106 assert raw_id == repo_clone.get_commit(raw_id).raw_id
107
107
108 def test_repo_clone_without_create(self):
108 def test_repo_clone_without_create(self):
109 with pytest.raises(RepositoryError):
109 with pytest.raises(RepositoryError):
110 GitRepository(
110 GitRepository(
111 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
111 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
112
112
113 def test_repo_clone_with_update(self, tmp_path_factory):
113 def test_repo_clone_with_update(self, tmp_path_factory):
114 repo = GitRepository(TEST_GIT_REPO)
114 repo = GitRepository(TEST_GIT_REPO)
115 clone_path = '{}_{}_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
115 clone_path = '{}_{}_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
116
116
117 repo_clone = GitRepository(
117 repo_clone = GitRepository(
118 clone_path,
118 clone_path,
119 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=True)
119 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=True)
120 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
120 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
121
121
122 # check if current workdir was updated
122 # check if current workdir was updated
123 fpath = os.path.join(clone_path, 'MANIFEST.in')
123 fpath = os.path.join(clone_path, 'MANIFEST.in')
124 assert os.path.isfile(fpath)
124 assert os.path.isfile(fpath)
125
125
126 def test_repo_clone_without_update(self, tmp_path_factory):
126 def test_repo_clone_without_update(self, tmp_path_factory):
127 repo = GitRepository(TEST_GIT_REPO)
127 repo = GitRepository(TEST_GIT_REPO)
128 clone_path = '{}_{}_without_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
128 clone_path = '{}_{}_without_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
129 repo_clone = GitRepository(
129 repo_clone = GitRepository(
130 clone_path,
130 clone_path,
131 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=False)
131 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=False)
132 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
132 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
133 # check if current workdir was *NOT* updated
133 # check if current workdir was *NOT* updated
134 fpath = os.path.join(clone_path, 'MANIFEST.in')
134 fpath = os.path.join(clone_path, 'MANIFEST.in')
135 # Make sure it's not bare repo
135 # Make sure it's not bare repo
136 assert not repo_clone.bare
136 assert not repo_clone.bare
137 assert not os.path.isfile(fpath)
137 assert not os.path.isfile(fpath)
138
138
139 def test_repo_clone_into_bare_repo(self, tmp_path_factory):
139 def test_repo_clone_into_bare_repo(self, tmp_path_factory):
140 repo = GitRepository(TEST_GIT_REPO)
140 repo = GitRepository(TEST_GIT_REPO)
141 clone_path = '{}_{}_bare.git'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
141 clone_path = '{}_{}_bare.git'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
142 repo_clone = GitRepository(
142 repo_clone = GitRepository(
143 clone_path, create=True, src_url=repo.path, bare=True)
143 clone_path, create=True, src_url=repo.path, bare=True)
144 assert repo_clone.bare
144 assert repo_clone.bare
145
145
146 def test_create_repo_is_not_bare_by_default(self):
146 def test_create_repo_is_not_bare_by_default(self):
147 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
147 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
148 assert not repo.bare
148 assert not repo.bare
149
149
150 def test_create_bare_repo(self):
150 def test_create_bare_repo(self):
151 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
151 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
152 assert repo.bare
152 assert repo.bare
153
153
154 def test_update_server_info(self):
154 def test_update_server_info(self):
155 self.repo._update_server_info()
155 self.repo._update_server_info()
156
156
157 def test_fetch(self, vcsbackend_git):
157 def test_fetch(self, vcsbackend_git):
158 # Note: This is a git specific part of the API, it's only implemented
158 # Note: This is a git specific part of the API, it's only implemented
159 # by the git backend.
159 # by the git backend.
160 source_repo = vcsbackend_git.repo
160 source_repo = vcsbackend_git.repo
161 target_repo = vcsbackend_git.create_repo(bare=True)
161 target_repo = vcsbackend_git.create_repo(bare=True)
162 target_repo.fetch(source_repo.path)
162 target_repo.fetch(source_repo.path)
163 # Note: Get a fresh instance, avoids caching trouble
163 # Note: Get a fresh instance, avoids caching trouble
164 target_repo = vcsbackend_git.backend(target_repo.path)
164 target_repo = vcsbackend_git.backend(target_repo.path)
165 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
165 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
166
166
167 def test_commit_ids(self):
167 def test_commit_ids(self):
168 # there are 112 commits (by now)
168 # there are 112 commits (by now)
169 # so we can assume they would be available from now on
169 # so we can assume they would be available from now on
170 subset = {'c1214f7e79e02fc37156ff215cd71275450cffc3',
170 subset = {'c1214f7e79e02fc37156ff215cd71275450cffc3',
171 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
171 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
172 'fa6600f6848800641328adbf7811fd2372c02ab2',
172 'fa6600f6848800641328adbf7811fd2372c02ab2',
173 '102607b09cdd60e2793929c4f90478be29f85a17',
173 '102607b09cdd60e2793929c4f90478be29f85a17',
174 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
174 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
175 '2d1028c054665b962fa3d307adfc923ddd528038',
175 '2d1028c054665b962fa3d307adfc923ddd528038',
176 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
176 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
177 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
177 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
178 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
178 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
179 '8430a588b43b5d6da365400117c89400326e7992',
179 '8430a588b43b5d6da365400117c89400326e7992',
180 'd955cd312c17b02143c04fa1099a352b04368118',
180 'd955cd312c17b02143c04fa1099a352b04368118',
181 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
181 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
182 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
182 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
183 'f298fe1189f1b69779a4423f40b48edf92a703fc',
183 'f298fe1189f1b69779a4423f40b48edf92a703fc',
184 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
184 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
185 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
185 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
186 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
186 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
187 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
187 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
188 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
188 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
189 '45223f8f114c64bf4d6f853e3c35a369a6305520',
189 '45223f8f114c64bf4d6f853e3c35a369a6305520',
190 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
190 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
191 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
191 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
192 '27d48942240f5b91dfda77accd2caac94708cc7d',
192 '27d48942240f5b91dfda77accd2caac94708cc7d',
193 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
193 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
194 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'}
194 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'}
195 assert subset.issubset(set(self.repo.commit_ids))
195 assert subset.issubset(set(self.repo.commit_ids))
196
196
197 def test_slicing(self):
197 def test_slicing(self):
198 # 4 1 5 10 95
198 # 4 1 5 10 95
199 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
199 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
200 (10, 20, 10), (5, 100, 95)]:
200 (10, 20, 10), (5, 100, 95)]:
201 commit_ids = list(self.repo[sfrom:sto])
201 commit_ids = list(self.repo[sfrom:sto])
202 assert len(commit_ids) == size
202 assert len(commit_ids) == size
203 assert commit_ids[0] == self.repo.get_commit(commit_idx=sfrom)
203 assert commit_ids[0] == self.repo.get_commit(commit_idx=sfrom)
204 assert commit_ids[-1] == self.repo.get_commit(commit_idx=sto - 1)
204 assert commit_ids[-1] == self.repo.get_commit(commit_idx=sto - 1)
205
205
206 def test_branches(self):
206 def test_branches(self):
207 # TODO: Need more tests here
207 # TODO: Need more tests here
208 # Removed (those are 'remotes' branches for cloned repo)
208 # Removed (those are 'remotes' branches for cloned repo)
209 # assert 'master' in self.repo.branches
209 # assert 'master' in self.repo.branches
210 # assert 'gittree' in self.repo.branches
210 # assert 'gittree' in self.repo.branches
211 # assert 'web-branch' in self.repo.branches
211 # assert 'web-branch' in self.repo.branches
212 for __, commit_id in self.repo.branches.items():
212 for __, commit_id in self.repo.branches.items():
213 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
213 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
214
214
215 def test_tags(self):
215 def test_tags(self):
216 # TODO: Need more tests here
216 # TODO: Need more tests here
217 assert 'v0.1.1' in self.repo.tags
217 assert 'v0.1.1' in self.repo.tags
218 assert 'v0.1.2' in self.repo.tags
218 assert 'v0.1.2' in self.repo.tags
219 for __, commit_id in self.repo.tags.items():
219 for __, commit_id in self.repo.tags.items():
220 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
220 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
221
221
222 def _test_single_commit_cache(self, commit_id):
222 def _test_single_commit_cache(self, commit_id):
223 commit = self.repo.get_commit(commit_id)
223 commit = self.repo.get_commit(commit_id)
224 assert commit_id in self.repo.commits
224 assert commit_id in self.repo.commits
225 assert commit is self.repo.commits[commit_id]
225 assert commit is self.repo.commits[commit_id]
226
226
227 def test_initial_commit(self):
227 def test_initial_commit(self):
228 commit_id = self.repo.commit_ids[0]
228 commit_id = self.repo.commit_ids[0]
229 init_commit = self.repo.get_commit(commit_id)
229 init_commit = self.repo.get_commit(commit_id)
230 init_author = init_commit.author
230 init_author = init_commit.author
231
231
232 assert init_commit.message == 'initial import\n'
232 assert init_commit.message == 'initial import\n'
233 assert init_author == 'Marcin Kuzminski <marcin@python-blog.com>'
233 assert init_author == 'Marcin Kuzminski <marcin@python-blog.com>'
234 assert init_author == init_commit.committer
234 assert init_author == init_commit.committer
235 for path in ('vcs/__init__.py',
235 for path in ('vcs/__init__.py',
236 'vcs/backends/BaseRepository.py',
236 'vcs/backends/BaseRepository.py',
237 'vcs/backends/__init__.py'):
237 'vcs/backends/__init__.py'):
238 assert isinstance(init_commit.get_node(path), FileNode)
238 assert isinstance(init_commit.get_node(path), FileNode)
239 for path in ('', 'vcs', 'vcs/backends'):
239 for path in ('', 'vcs', 'vcs/backends'):
240 assert isinstance(init_commit.get_node(path), DirNode)
240 assert isinstance(init_commit.get_node(path), DirNode)
241
241
242 with pytest.raises(NodeDoesNotExistError):
242 with pytest.raises(NodeDoesNotExistError):
243 init_commit.get_node(path='foobar')
243 init_commit.get_node(path='foobar')
244
244
245 node = init_commit.get_node('vcs/')
245 node = init_commit.get_node('vcs/')
246 assert hasattr(node, 'kind')
246 assert hasattr(node, 'kind')
247 assert node.kind == NodeKind.DIR
247 assert node.kind == NodeKind.DIR
248
248
249 node = init_commit.get_node('vcs')
249 node = init_commit.get_node('vcs')
250 assert hasattr(node, 'kind')
250 assert hasattr(node, 'kind')
251 assert node.kind == NodeKind.DIR
251 assert node.kind == NodeKind.DIR
252
252
253 node = init_commit.get_node('vcs/__init__.py')
253 node = init_commit.get_node('vcs/__init__.py')
254 assert hasattr(node, 'kind')
254 assert hasattr(node, 'kind')
255 assert node.kind == NodeKind.FILE
255 assert node.kind == NodeKind.FILE
256
256
257 def test_not_existing_commit(self):
257 def test_not_existing_commit(self):
258 with pytest.raises(RepositoryError):
258 with pytest.raises(RepositoryError):
259 self.repo.get_commit('f' * 40)
259 self.repo.get_commit('f' * 40)
260
260
261 def test_commit10(self):
261 def test_commit10(self):
262
262
263 commit10 = self.repo.get_commit(self.repo.commit_ids[9])
263 commit10 = self.repo.get_commit(self.repo.commit_ids[9])
264 README = """===
264 README = """===
265 VCS
265 VCS
266 ===
266 ===
267
267
268 Various Version Control System management abstraction layer for Python.
268 Various Version Control System management abstraction layer for Python.
269
269
270 Introduction
270 Introduction
271 ------------
271 ------------
272
272
273 TODO: To be written...
273 TODO: To be written...
274
274
275 """
275 """
276 node = commit10.get_node('README.rst')
276 node = commit10.get_node('README.rst')
277 assert node.kind == NodeKind.FILE
277 assert node.kind == NodeKind.FILE
278 assert node.str_content == README
278 assert node.str_content == README
279
279
280 def test_head(self):
280 def test_head(self):
281 assert self.repo.head == self.repo.get_commit().raw_id
281 assert self.repo.head == self.repo.get_commit().raw_id
282
282
283 def test_checkout_with_create(self, tmpdir):
283 def test_checkout_with_create(self, tmpdir):
284 repo_clone = self.get_clone_repo(tmpdir)
284 repo_clone = self.get_clone_repo(tmpdir)
285
285
286 new_branch = 'new_branch'
286 new_branch = 'new_branch'
287 assert repo_clone._current_branch() == 'master'
287 assert repo_clone._current_branch() == 'master'
288 assert set(repo_clone.branches) == {'master'}
288 assert set(repo_clone.branches) == {'master'}
289 repo_clone._checkout(new_branch, create=True)
289 repo_clone._checkout(new_branch, create=True)
290
290
291 # Branches is a lazy property so we need to recrete the Repo object.
291 # Branches is a lazy property so we need to recrete the Repo object.
292 repo_clone = GitRepository(repo_clone.path)
292 repo_clone = GitRepository(repo_clone.path)
293 assert set(repo_clone.branches) == {'master', new_branch}
293 assert set(repo_clone.branches) == {'master', new_branch}
294 assert repo_clone._current_branch() == new_branch
294 assert repo_clone._current_branch() == new_branch
295
295
296 def test_checkout(self, tmpdir):
296 def test_checkout(self, tmpdir):
297 repo_clone = self.get_clone_repo(tmpdir)
297 repo_clone = self.get_clone_repo(tmpdir)
298
298
299 repo_clone._checkout('new_branch', create=True)
299 repo_clone._checkout('new_branch', create=True)
300 repo_clone._checkout('master')
300 repo_clone._checkout('master')
301
301
302 assert repo_clone._current_branch() == 'master'
302 assert repo_clone._current_branch() == 'master'
303
303
304 def test_checkout_same_branch(self, tmpdir):
304 def test_checkout_same_branch(self, tmpdir):
305 repo_clone = self.get_clone_repo(tmpdir)
305 repo_clone = self.get_clone_repo(tmpdir)
306
306
307 repo_clone._checkout('master')
307 repo_clone._checkout('master')
308 assert repo_clone._current_branch() == 'master'
308 assert repo_clone._current_branch() == 'master'
309
309
310 def test_checkout_branch_already_exists(self, tmpdir):
310 def test_checkout_branch_already_exists(self, tmpdir):
311 repo_clone = self.get_clone_repo(tmpdir)
311 repo_clone = self.get_clone_repo(tmpdir)
312
312
313 with pytest.raises(RepositoryError):
313 with pytest.raises(RepositoryError):
314 repo_clone._checkout('master', create=True)
314 repo_clone._checkout('master', create=True)
315
315
316 def test_checkout_bare_repo(self):
316 def test_checkout_bare_repo(self):
317 with pytest.raises(RepositoryError):
317 with pytest.raises(RepositoryError):
318 self.repo._checkout('master')
318 self.repo._checkout('master')
319
319
320 def test_current_branch_bare_repo(self):
320 def test_current_branch_bare_repo(self):
321 with pytest.raises(RepositoryError):
321 with pytest.raises(RepositoryError):
322 self.repo._current_branch()
322 self.repo._current_branch()
323
323
324 def test_current_branch_empty_repo(self, tmpdir):
324 def test_current_branch_empty_repo(self, tmpdir):
325 repo = self.get_empty_repo(tmpdir)
325 repo = self.get_empty_repo(tmpdir)
326 assert repo._current_branch() is None
326 assert repo._current_branch() is None
327
327
328 def test_local_clone(self, tmp_path_factory):
328 def test_local_clone(self, tmp_path_factory):
329 clone_path = str(tmp_path_factory.mktemp('test-local-clone'))
329 clone_path = str(tmp_path_factory.mktemp('test-local-clone'))
330 self.repo._local_clone(clone_path, 'master')
330 self.repo._local_clone(clone_path, 'master')
331 repo_clone = GitRepository(clone_path)
331 repo_clone = GitRepository(clone_path)
332
332
333 assert self.repo.commit_ids == repo_clone.commit_ids
333 assert self.repo.commit_ids == repo_clone.commit_ids
334
334
335 def test_local_clone_with_specific_branch(self, tmpdir):
335 def test_local_clone_with_specific_branch(self, tmpdir):
336 source_repo = self.get_clone_repo(tmpdir)
336 source_repo = self.get_clone_repo(tmpdir)
337
337
338 # Create a new branch in source repo
338 # Create a new branch in source repo
339 new_branch_commit = source_repo.commit_ids[-3]
339 new_branch_commit = source_repo.commit_ids[-3]
340 source_repo._checkout(new_branch_commit)
340 source_repo._checkout(new_branch_commit)
341 source_repo._checkout('new_branch', create=True)
341 source_repo._checkout('new_branch', create=True)
342
342
343 clone_path = str(tmpdir.join('git-clone-path-1'))
343 clone_path = str(tmpdir.join('git-clone-path-1'))
344 source_repo._local_clone(clone_path, 'new_branch')
344 source_repo._local_clone(clone_path, 'new_branch')
345 repo_clone = GitRepository(clone_path)
345 repo_clone = GitRepository(clone_path)
346
346
347 assert source_repo.commit_ids[:-3 + 1] == repo_clone.commit_ids
347 assert source_repo.commit_ids[:-3 + 1] == repo_clone.commit_ids
348
348
349 clone_path = str(tmpdir.join('git-clone-path-2'))
349 clone_path = str(tmpdir.join('git-clone-path-2'))
350 source_repo._local_clone(clone_path, 'master')
350 source_repo._local_clone(clone_path, 'master')
351 repo_clone = GitRepository(clone_path)
351 repo_clone = GitRepository(clone_path)
352
352
353 assert source_repo.commit_ids == repo_clone.commit_ids
353 assert source_repo.commit_ids == repo_clone.commit_ids
354
354
355 def test_local_clone_fails_if_target_exists(self):
355 def test_local_clone_fails_if_target_exists(self):
356 with pytest.raises(RepositoryError):
356 with pytest.raises(RepositoryError):
357 self.repo._local_clone(self.repo.path, 'master')
357 self.repo._local_clone(self.repo.path, 'master')
358
358
359 def test_local_fetch(self, tmpdir):
359 def test_local_fetch(self, tmpdir):
360 target_repo = self.get_empty_repo(tmpdir)
360 target_repo = self.get_empty_repo(tmpdir)
361 source_repo = self.get_clone_repo(tmpdir)
361 source_repo = self.get_clone_repo(tmpdir)
362
362
363 # Create a new branch in source repo
363 # Create a new branch in source repo
364 master_commit = source_repo.commit_ids[-1]
364 master_commit = source_repo.commit_ids[-1]
365 new_branch_commit = source_repo.commit_ids[-3]
365 new_branch_commit = source_repo.commit_ids[-3]
366 source_repo._checkout(new_branch_commit)
366 source_repo._checkout(new_branch_commit)
367 source_repo._checkout('new_branch', create=True)
367 source_repo._checkout('new_branch', create=True)
368
368
369 target_repo._local_fetch(source_repo.path, 'new_branch')
369 target_repo._local_fetch(source_repo.path, 'new_branch')
370 assert target_repo._last_fetch_heads() == [new_branch_commit]
370 assert target_repo._last_fetch_heads() == [new_branch_commit]
371
371
372 target_repo._local_fetch(source_repo.path, 'master')
372 target_repo._local_fetch(source_repo.path, 'master')
373 assert target_repo._last_fetch_heads() == [master_commit]
373 assert target_repo._last_fetch_heads() == [master_commit]
374
374
375 def test_local_fetch_from_bare_repo(self, tmpdir):
375 def test_local_fetch_from_bare_repo(self, tmpdir):
376 target_repo = self.get_empty_repo(tmpdir)
376 target_repo = self.get_empty_repo(tmpdir)
377 target_repo._local_fetch(self.repo.path, 'master')
377 target_repo._local_fetch(self.repo.path, 'master')
378
378
379 master_commit = self.repo.commit_ids[-1]
379 master_commit = self.repo.commit_ids[-1]
380 assert target_repo._last_fetch_heads() == [master_commit]
380 assert target_repo._last_fetch_heads() == [master_commit]
381
381
382 def test_local_fetch_from_same_repo(self):
382 def test_local_fetch_from_same_repo(self):
383 with pytest.raises(ValueError):
383 with pytest.raises(ValueError):
384 self.repo._local_fetch(self.repo.path, 'master')
384 self.repo._local_fetch(self.repo.path, 'master')
385
385
386 def test_local_fetch_branch_does_not_exist(self, tmpdir):
386 def test_local_fetch_branch_does_not_exist(self, tmpdir):
387 target_repo = self.get_empty_repo(tmpdir)
387 target_repo = self.get_empty_repo(tmpdir)
388
388
389 with pytest.raises(RepositoryError):
389 with pytest.raises(RepositoryError):
390 target_repo._local_fetch(self.repo.path, 'new_branch')
390 target_repo._local_fetch(self.repo.path, 'new_branch')
391
391
392 def test_local_pull(self, tmpdir):
392 def test_local_pull(self, tmpdir):
393 target_repo = self.get_empty_repo(tmpdir)
393 target_repo = self.get_empty_repo(tmpdir)
394 source_repo = self.get_clone_repo(tmpdir)
394 source_repo = self.get_clone_repo(tmpdir)
395
395
396 # Create a new branch in source repo
396 # Create a new branch in source repo
397 master_commit = source_repo.commit_ids[-1]
397 master_commit = source_repo.commit_ids[-1]
398 new_branch_commit = source_repo.commit_ids[-3]
398 new_branch_commit = source_repo.commit_ids[-3]
399 source_repo._checkout(new_branch_commit)
399 source_repo._checkout(new_branch_commit)
400 source_repo._checkout('new_branch', create=True)
400 source_repo._checkout('new_branch', create=True)
401
401
402 target_repo._local_pull(source_repo.path, 'new_branch')
402 target_repo._local_pull(source_repo.path, 'new_branch')
403 target_repo = GitRepository(target_repo.path)
403 target_repo = GitRepository(target_repo.path)
404 assert target_repo.head == new_branch_commit
404 assert target_repo.head == new_branch_commit
405
405
406 target_repo._local_pull(source_repo.path, 'master')
406 target_repo._local_pull(source_repo.path, 'master')
407 target_repo = GitRepository(target_repo.path)
407 target_repo = GitRepository(target_repo.path)
408 assert target_repo.head == master_commit
408 assert target_repo.head == master_commit
409
409
410 def test_local_pull_in_bare_repo(self):
410 def test_local_pull_in_bare_repo(self):
411 with pytest.raises(RepositoryError):
411 with pytest.raises(RepositoryError):
412 self.repo._local_pull(self.repo.path, 'master')
412 self.repo._local_pull(self.repo.path, 'master')
413
413
414 def test_local_merge(self, tmpdir):
414 def test_local_merge(self, tmpdir):
415 target_repo = self.get_empty_repo(tmpdir)
415 target_repo = self.get_empty_repo(tmpdir)
416 source_repo = self.get_clone_repo(tmpdir)
416 source_repo = self.get_clone_repo(tmpdir)
417
417
418 # Create a new branch in source repo
418 # Create a new branch in source repo
419 master_commit = source_repo.commit_ids[-1]
419 master_commit = source_repo.commit_ids[-1]
420 new_branch_commit = source_repo.commit_ids[-3]
420 new_branch_commit = source_repo.commit_ids[-3]
421 source_repo._checkout(new_branch_commit)
421 source_repo._checkout(new_branch_commit)
422 source_repo._checkout('new_branch', create=True)
422 source_repo._checkout('new_branch', create=True)
423
423
424 # This is required as one cannot do a -ff-only merge in an empty repo.
424 # This is required as one cannot do a -ff-only merge in an empty repo.
425 target_repo._local_pull(source_repo.path, 'new_branch')
425 target_repo._local_pull(source_repo.path, 'new_branch')
426
426
427 target_repo._local_fetch(source_repo.path, 'master')
427 target_repo._local_fetch(source_repo.path, 'master')
428 merge_message = 'Merge message\n\nDescription:...'
428 merge_message = 'Merge message\n\nDescription:...'
429 user_name = 'Albert Einstein'
429 user_name = 'Albert Einstein'
430 user_email = 'albert@einstein.com'
430 user_email = 'albert@einstein.com'
431 target_repo._local_merge(merge_message, user_name, user_email,
431 target_repo._local_merge(merge_message, user_name, user_email,
432 target_repo._last_fetch_heads())
432 target_repo._last_fetch_heads())
433
433
434 target_repo = GitRepository(target_repo.path)
434 target_repo = GitRepository(target_repo.path)
435 assert target_repo.commit_ids[-2] == master_commit
435 assert target_repo.commit_ids[-2] == master_commit
436 last_commit = target_repo.get_commit(target_repo.head)
436 last_commit = target_repo.get_commit(target_repo.head)
437 assert last_commit.message.strip() == merge_message
437 assert last_commit.message.strip() == merge_message
438 assert last_commit.author == '%s <%s>' % (user_name, user_email)
438 assert last_commit.author == '%s <%s>' % (user_name, user_email)
439
439
440 assert not os.path.exists(
440 assert not os.path.exists(
441 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
441 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
442
442
443 def test_local_merge_raises_exception_on_conflict(self, vcsbackend_git):
443 def test_local_merge_raises_exception_on_conflict(self, vcsbackend_git):
444 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
444 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
445 vcsbackend_git.ensure_file(b'README', b'I will conflict with you!!!')
445 vcsbackend_git.ensure_file(b'README', b'I will conflict with you!!!')
446
446
447 target_repo._local_fetch(self.repo.path, 'master')
447 target_repo._local_fetch(self.repo.path, 'master')
448 with pytest.raises(RepositoryError):
448 with pytest.raises(RepositoryError):
449 target_repo._local_merge(
449 target_repo._local_merge(
450 'merge_message', 'user name', 'user@name.com',
450 'merge_message', 'user name', 'user@name.com',
451 target_repo._last_fetch_heads())
451 target_repo._last_fetch_heads())
452
452
453 # Check we are not left in an intermediate merge state
453 # Check we are not left in an intermediate merge state
454 assert not os.path.exists(
454 assert not os.path.exists(
455 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
455 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
456
456
457 def test_local_merge_into_empty_repo(self, tmpdir):
457 def test_local_merge_into_empty_repo(self, tmpdir):
458 target_repo = self.get_empty_repo(tmpdir)
458 target_repo = self.get_empty_repo(tmpdir)
459
459
460 # This is required as one cannot do a -ff-only merge in an empty repo.
460 # This is required as one cannot do a -ff-only merge in an empty repo.
461 target_repo._local_fetch(self.repo.path, 'master')
461 target_repo._local_fetch(self.repo.path, 'master')
462 with pytest.raises(RepositoryError):
462 with pytest.raises(RepositoryError):
463 target_repo._local_merge(
463 target_repo._local_merge(
464 'merge_message', 'user name', 'user@name.com',
464 'merge_message', 'user name', 'user@name.com',
465 target_repo._last_fetch_heads())
465 target_repo._last_fetch_heads())
466
466
467 def test_local_merge_in_bare_repo(self):
467 def test_local_merge_in_bare_repo(self):
468 with pytest.raises(RepositoryError):
468 with pytest.raises(RepositoryError):
469 self.repo._local_merge(
469 self.repo._local_merge(
470 'merge_message', 'user name', 'user@name.com', None)
470 'merge_message', 'user name', 'user@name.com', None)
471
471
472 def test_local_push_non_bare(self, tmpdir):
472 def test_local_push_non_bare(self, tmpdir):
473 target_repo = self.get_empty_repo(tmpdir)
473 target_repo = self.get_empty_repo(tmpdir)
474
474
475 pushed_branch = 'pushed_branch'
475 pushed_branch = 'pushed_branch'
476 self.repo._local_push('master', target_repo.path, pushed_branch)
476 self.repo._local_push('master', target_repo.path, pushed_branch)
477 # Fix the HEAD of the target repo, or otherwise GitRepository won't
477 # Fix the HEAD of the target repo, or otherwise GitRepository won't
478 # report any branches.
478 # report any branches.
479 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
479 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
480 f.write('ref: refs/heads/%s' % pushed_branch)
480 f.write('ref: refs/heads/%s' % pushed_branch)
481
481
482 target_repo = GitRepository(target_repo.path)
482 target_repo = GitRepository(target_repo.path)
483
483
484 assert (target_repo.branches[pushed_branch] ==
484 assert (target_repo.branches[pushed_branch] ==
485 self.repo.branches['master'])
485 self.repo.branches['master'])
486
486
487 def test_local_push_bare(self, tmpdir):
487 def test_local_push_bare(self, tmpdir):
488 target_repo = self.get_empty_repo(tmpdir, bare=True)
488 target_repo = self.get_empty_repo(tmpdir, bare=True)
489
489
490 pushed_branch = 'pushed_branch'
490 pushed_branch = 'pushed_branch'
491 self.repo._local_push('master', target_repo.path, pushed_branch)
491 self.repo._local_push('master', target_repo.path, pushed_branch)
492 # Fix the HEAD of the target repo, or otherwise GitRepository won't
492 # Fix the HEAD of the target repo, or otherwise GitRepository won't
493 # report any branches.
493 # report any branches.
494 with open(os.path.join(target_repo.path, 'HEAD'), 'w') as f:
494 with open(os.path.join(target_repo.path, 'HEAD'), 'w') as f:
495 f.write('ref: refs/heads/%s' % pushed_branch)
495 f.write('ref: refs/heads/%s' % pushed_branch)
496
496
497 target_repo = GitRepository(target_repo.path)
497 target_repo = GitRepository(target_repo.path)
498
498
499 assert (target_repo.branches[pushed_branch] ==
499 assert (target_repo.branches[pushed_branch] ==
500 self.repo.branches['master'])
500 self.repo.branches['master'])
501
501
502 def test_local_push_non_bare_target_branch_is_checked_out(self, tmpdir):
502 def test_local_push_non_bare_target_branch_is_checked_out(self, tmpdir):
503 target_repo = self.get_clone_repo(tmpdir)
503 target_repo = self.get_clone_repo(tmpdir)
504
504
505 pushed_branch = 'pushed_branch'
505 pushed_branch = 'pushed_branch'
506 # Create a new branch in source repo
506 # Create a new branch in source repo
507 new_branch_commit = target_repo.commit_ids[-3]
507 new_branch_commit = target_repo.commit_ids[-3]
508 target_repo._checkout(new_branch_commit)
508 target_repo._checkout(new_branch_commit)
509 target_repo._checkout(pushed_branch, create=True)
509 target_repo._checkout(pushed_branch, create=True)
510
510
511 self.repo._local_push('master', target_repo.path, pushed_branch)
511 self.repo._local_push('master', target_repo.path, pushed_branch)
512
512
513 target_repo = GitRepository(target_repo.path)
513 target_repo = GitRepository(target_repo.path)
514
514
515 assert (target_repo.branches[pushed_branch] ==
515 assert (target_repo.branches[pushed_branch] ==
516 self.repo.branches['master'])
516 self.repo.branches['master'])
517
517
518 def test_local_push_raises_exception_on_conflict(self, vcsbackend_git):
518 def test_local_push_raises_exception_on_conflict(self, vcsbackend_git):
519 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
519 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
520 with pytest.raises(RepositoryError):
520 with pytest.raises(RepositoryError):
521 self.repo._local_push('master', target_repo.path, 'master')
521 self.repo._local_push('master', target_repo.path, 'master')
522
522
523 def test_hooks_can_be_enabled_via_env_variable_for_local_push(self, tmpdir):
523 def test_hooks_can_be_enabled_via_env_variable_for_local_push(self, tmpdir):
524 target_repo = self.get_empty_repo(tmpdir, bare=True)
524 target_repo = self.get_empty_repo(tmpdir, bare=True)
525
525
526 with mock.patch.object(self.repo, 'run_git_command') as run_mock:
526 with mock.patch.object(self.repo, 'run_git_command') as run_mock:
527 self.repo._local_push(
527 self.repo._local_push(
528 'master', target_repo.path, 'master', enable_hooks=True)
528 'master', target_repo.path, 'master', enable_hooks=True)
529 env = run_mock.call_args[1]['extra_env']
529 env = run_mock.call_args[1]['extra_env']
530 assert 'RC_SKIP_HOOKS' not in env
530 assert 'RC_SKIP_HOOKS' not in env
531
531
532 def _add_failing_hook(self, repo_path, hook_name, bare=False):
532 def _add_failing_hook(self, repo_path, hook_name, bare=False):
533 path_components = (
533 path_components = (
534 ['hooks', hook_name] if bare else ['.git', 'hooks', hook_name])
534 ['hooks', hook_name] if bare else ['.git', 'hooks', hook_name])
535 hook_path = os.path.join(repo_path, *path_components)
535 hook_path = os.path.join(repo_path, *path_components)
536 with open(hook_path, 'w') as f:
536 with open(hook_path, 'w') as f:
537 script_lines = [
537 script_lines = [
538 '#!%s' % sys.executable,
538 '#!%s' % sys.executable,
539 'import os',
539 'import os',
540 'import sys',
540 'import sys',
541 'if os.environ.get("RC_SKIP_HOOKS"):',
541 'if os.environ.get("RC_SKIP_HOOKS"):',
542 ' sys.exit(0)',
542 ' sys.exit(0)',
543 'sys.exit(1)',
543 'sys.exit(1)',
544 ]
544 ]
545 f.write('\n'.join(script_lines))
545 f.write('\n'.join(script_lines))
546 os.chmod(hook_path, 0o755)
546 os.chmod(hook_path, 0o755)
547
547
548 def test_local_push_does_not_execute_hook(self, tmpdir):
548 def test_local_push_does_not_execute_hook(self, tmpdir):
549 target_repo = self.get_empty_repo(tmpdir)
549 target_repo = self.get_empty_repo(tmpdir)
550
550
551 pushed_branch = 'pushed_branch'
551 pushed_branch = 'pushed_branch'
552 self._add_failing_hook(target_repo.path, 'pre-receive')
552 self._add_failing_hook(target_repo.path, 'pre-receive')
553 self.repo._local_push('master', target_repo.path, pushed_branch)
553 self.repo._local_push('master', target_repo.path, pushed_branch)
554 # Fix the HEAD of the target repo, or otherwise GitRepository won't
554 # Fix the HEAD of the target repo, or otherwise GitRepository won't
555 # report any branches.
555 # report any branches.
556 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
556 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
557 f.write('ref: refs/heads/%s' % pushed_branch)
557 f.write('ref: refs/heads/%s' % pushed_branch)
558
558
559 target_repo = GitRepository(target_repo.path)
559 target_repo = GitRepository(target_repo.path)
560
560
561 assert (target_repo.branches[pushed_branch] ==
561 assert (target_repo.branches[pushed_branch] ==
562 self.repo.branches['master'])
562 self.repo.branches['master'])
563
563
564 def test_local_push_executes_hook(self, tmpdir):
564 def test_local_push_executes_hook(self, tmpdir):
565 target_repo = self.get_empty_repo(tmpdir, bare=True)
565 target_repo = self.get_empty_repo(tmpdir, bare=True)
566 self._add_failing_hook(target_repo.path, 'pre-receive', bare=True)
566 self._add_failing_hook(target_repo.path, 'pre-receive', bare=True)
567 with pytest.raises(RepositoryError):
567 with pytest.raises(RepositoryError):
568 self.repo._local_push(
568 self.repo._local_push(
569 'master', target_repo.path, 'master', enable_hooks=True)
569 'master', target_repo.path, 'master', enable_hooks=True)
570
570
571 def test_maybe_prepare_merge_workspace(self):
571 def test_maybe_prepare_merge_workspace(self):
572 workspace = self.repo._maybe_prepare_merge_workspace(
572 workspace = self.repo._maybe_prepare_merge_workspace(
573 2, 'pr2', Reference('branch', 'master', 'unused'),
573 2, 'pr2', Reference('branch', 'master', 'unused'),
574 Reference('branch', 'master', 'unused'))
574 Reference('branch', 'master', 'unused'))
575
575
576 assert os.path.isdir(workspace)
576 assert os.path.isdir(workspace)
577 workspace_repo = GitRepository(workspace)
577 workspace_repo = GitRepository(workspace)
578 assert workspace_repo.branches == self.repo.branches
578 assert workspace_repo.branches == self.repo.branches
579
579
580 # Calling it a second time should also succeed
580 # Calling it a second time should also succeed
581 workspace = self.repo._maybe_prepare_merge_workspace(
581 workspace = self.repo._maybe_prepare_merge_workspace(
582 2, 'pr2', Reference('branch', 'master', 'unused'),
582 2, 'pr2', Reference('branch', 'master', 'unused'),
583 Reference('branch', 'master', 'unused'))
583 Reference('branch', 'master', 'unused'))
584 assert os.path.isdir(workspace)
584 assert os.path.isdir(workspace)
585
585
586 def test_maybe_prepare_merge_workspace_different_refs(self):
586 def test_maybe_prepare_merge_workspace_different_refs(self):
587 workspace = self.repo._maybe_prepare_merge_workspace(
587 workspace = self.repo._maybe_prepare_merge_workspace(
588 2, 'pr2', Reference('branch', 'master', 'unused'),
588 2, 'pr2', Reference('branch', 'master', 'unused'),
589 Reference('branch', 'develop', 'unused'))
589 Reference('branch', 'develop', 'unused'))
590
590
591 assert os.path.isdir(workspace)
591 assert os.path.isdir(workspace)
592 workspace_repo = GitRepository(workspace)
592 workspace_repo = GitRepository(workspace)
593 assert workspace_repo.branches == self.repo.branches
593 assert workspace_repo.branches == self.repo.branches
594
594
595 # Calling it a second time should also succeed
595 # Calling it a second time should also succeed
596 workspace = self.repo._maybe_prepare_merge_workspace(
596 workspace = self.repo._maybe_prepare_merge_workspace(
597 2, 'pr2', Reference('branch', 'master', 'unused'),
597 2, 'pr2', Reference('branch', 'master', 'unused'),
598 Reference('branch', 'develop', 'unused'))
598 Reference('branch', 'develop', 'unused'))
599 assert os.path.isdir(workspace)
599 assert os.path.isdir(workspace)
600
600
601 def test_cleanup_merge_workspace(self):
601 def test_cleanup_merge_workspace(self):
602 workspace = self.repo._maybe_prepare_merge_workspace(
602 workspace = self.repo._maybe_prepare_merge_workspace(
603 2, 'pr3', Reference('branch', 'master', 'unused'),
603 2, 'pr3', Reference('branch', 'master', 'unused'),
604 Reference('branch', 'master', 'unused'))
604 Reference('branch', 'master', 'unused'))
605 self.repo.cleanup_merge_workspace(2, 'pr3')
605 self.repo.cleanup_merge_workspace(2, 'pr3')
606
606
607 assert not os.path.exists(workspace)
607 assert not os.path.exists(workspace)
608
608
609 def test_cleanup_merge_workspace_invalid_workspace_id(self):
609 def test_cleanup_merge_workspace_invalid_workspace_id(self):
610 # No assert: because in case of an inexistent workspace this function
610 # No assert: because in case of an inexistent workspace this function
611 # should still succeed.
611 # should still succeed.
612 self.repo.cleanup_merge_workspace(1, 'pr4')
612 self.repo.cleanup_merge_workspace(1, 'pr4')
613
613
614 def test_set_refs(self):
614 def test_set_refs(self):
615 test_ref = 'refs/test-refs/abcde'
615 test_ref = 'refs/test-refs/abcde'
616 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
616 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
617
617
618 self.repo.set_refs(test_ref, test_commit_id)
618 self.repo.set_refs(test_ref, test_commit_id)
619 stdout, _ = self.repo.run_git_command(['show-ref'])
619 stdout, _ = self.repo.run_git_command(['show-ref'])
620 assert test_ref in stdout
620 assert test_ref in stdout
621 assert test_commit_id in stdout
621 assert test_commit_id in stdout
622
622
623 def test_remove_ref(self):
623 def test_remove_ref(self):
624 test_ref = 'refs/test-refs/abcde'
624 test_ref = 'refs/test-refs/abcde'
625 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
625 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
626 self.repo.set_refs(test_ref, test_commit_id)
626 self.repo.set_refs(test_ref, test_commit_id)
627 stdout, _ = self.repo.run_git_command(['show-ref'])
627 stdout, _ = self.repo.run_git_command(['show-ref'])
628 assert test_ref in stdout
628 assert test_ref in stdout
629 assert test_commit_id in stdout
629 assert test_commit_id in stdout
630
630
631 self.repo.remove_ref(test_ref)
631 self.repo.remove_ref(test_ref)
632 stdout, _ = self.repo.run_git_command(['show-ref'])
632 stdout, _ = self.repo.run_git_command(['show-ref'])
633 assert test_ref not in stdout
633 assert test_ref not in stdout
634 assert test_commit_id not in stdout
634 assert test_commit_id not in stdout
635
635
636
636
637 class TestGitCommit(object):
637 class TestGitCommit(object):
638
638
639 @pytest.fixture(autouse=True)
639 @pytest.fixture(autouse=True)
640 def prepare(self):
640 def prepare(self):
641 self.repo = GitRepository(TEST_GIT_REPO)
641 self.repo = GitRepository(TEST_GIT_REPO)
642
642
643 def test_default_commit(self):
643 def test_default_commit(self):
644 tip = self.repo.get_commit()
644 tip = self.repo.get_commit()
645 assert tip == self.repo.get_commit(None)
645 assert tip == self.repo.get_commit(None)
646 assert tip == self.repo.get_commit('tip')
646 assert tip == self.repo.get_commit('tip')
647
647
648 def test_root_node(self):
648 def test_root_node(self):
649 tip = self.repo.get_commit()
649 tip = self.repo.get_commit()
650 assert tip.root is tip.get_node('')
650 assert tip.root is tip.get_node('')
651
651
652 def test_lazy_fetch(self):
652 def test_lazy_fetch(self):
653 """
653 """
654 Test if commit's nodes expands and are cached as we walk through
654 Test if commit's nodes expands and are cached as we walk through
655 the commit. This test is somewhat hard to write as order of tests
655 the commit. This test is somewhat hard to write as order of tests
656 is a key here. Written by running command after command in a shell.
656 is a key here. Written by running command after command in a shell.
657 """
657 """
658 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
658 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
659 assert commit_id in self.repo.commit_ids
659 assert commit_id in self.repo.commit_ids
660 commit = self.repo.get_commit(commit_id)
660 commit = self.repo.get_commit(commit_id)
661 assert len(commit.nodes) == 0
661 assert len(commit.nodes) == 0
662 root = commit.root
662 root = commit.root
663 assert len(commit.nodes) == 1
663 assert len(commit.nodes) == 1
664 assert len(root.nodes) == 8
664 assert len(root.nodes) == 8
665 # accessing root.nodes updates commit.nodes
665 # accessing root.nodes updates commit.nodes
666 assert len(commit.nodes) == 9
666 assert len(commit.nodes) == 9
667
667
668 docs = root.get_node('docs')
668 docs = root.get_node('docs')
669 # we haven't yet accessed anything new as docs dir was already cached
669 # we haven't yet accessed anything new as docs dir was already cached
670 assert len(commit.nodes) == 9
670 assert len(commit.nodes) == 9
671 assert len(docs.nodes) == 8
671 assert len(docs.nodes) == 8
672 # accessing docs.nodes updates commit.nodes
672 # accessing docs.nodes updates commit.nodes
673 assert len(commit.nodes) == 17
673 assert len(commit.nodes) == 17
674
674
675 assert docs is commit.get_node('docs')
675 assert docs is commit.get_node('docs')
676 assert docs is root.nodes[0]
676 assert docs is root.nodes[0]
677 assert docs is root.dirs[0]
677 assert docs is root.dirs[0]
678 assert docs is commit.get_node('docs')
678 assert docs is commit.get_node('docs')
679
679
680 def test_nodes_with_commit(self):
680 def test_nodes_with_commit(self):
681 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
681 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
682 commit = self.repo.get_commit(commit_id)
682 commit = self.repo.get_commit(commit_id)
683 root = commit.root
683 root = commit.root
684 docs = root.get_node('docs')
684 docs = root.get_node('docs')
685 assert docs is commit.get_node('docs')
685 assert docs is commit.get_node('docs')
686 api = docs.get_node('api')
686 api = docs.get_node('api')
687 assert api is commit.get_node('docs/api')
687 assert api is commit.get_node('docs/api')
688 index = api.get_node('index.rst')
688 index = api.get_node('index.rst')
689 assert index is commit.get_node('docs/api/index.rst')
689 assert index is commit.get_node('docs/api/index.rst')
690 assert index is commit.get_node('docs')\
690 assert index is commit.get_node('docs')\
691 .get_node('api')\
691 .get_node('api')\
692 .get_node('index.rst')
692 .get_node('index.rst')
693
693
694 def test_branch_and_tags(self):
694 def test_branch_and_tags(self):
695 """
695 """
696 rev0 = self.repo.commit_ids[0]
696 rev0 = self.repo.commit_ids[0]
697 commit0 = self.repo.get_commit(rev0)
697 commit0 = self.repo.get_commit(rev0)
698 assert commit0.branch == 'master'
698 assert commit0.branch == 'master'
699 assert commit0.tags == []
699 assert commit0.tags == []
700
700
701 rev10 = self.repo.commit_ids[10]
701 rev10 = self.repo.commit_ids[10]
702 commit10 = self.repo.get_commit(rev10)
702 commit10 = self.repo.get_commit(rev10)
703 assert commit10.branch == 'master'
703 assert commit10.branch == 'master'
704 assert commit10.tags == []
704 assert commit10.tags == []
705
705
706 rev44 = self.repo.commit_ids[44]
706 rev44 = self.repo.commit_ids[44]
707 commit44 = self.repo.get_commit(rev44)
707 commit44 = self.repo.get_commit(rev44)
708 assert commit44.branch == 'web-branch'
708 assert commit44.branch == 'web-branch'
709
709
710 tip = self.repo.get_commit('tip')
710 tip = self.repo.get_commit('tip')
711 assert 'tip' in tip.tags
711 assert 'tip' in tip.tags
712 """
712 """
713 # Those tests would fail - branches are now going
713 # Those tests would fail - branches are now going
714 # to be changed at main API in order to support git backend
714 # to be changed at main API in order to support git backend
715 pass
715 pass
716
716
717 def test_file_size(self):
717 def test_file_size(self):
718 to_check = (
718 to_check = (
719 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
719 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
720 'vcs/backends/BaseRepository.py', 502),
720 'vcs/backends/BaseRepository.py', 502),
721 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
721 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
722 'vcs/backends/hg.py', 854),
722 'vcs/backends/hg.py', 854),
723 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
723 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
724 'setup.py', 1068),
724 'setup.py', 1068),
725
725
726 ('d955cd312c17b02143c04fa1099a352b04368118',
726 ('d955cd312c17b02143c04fa1099a352b04368118',
727 'vcs/backends/base.py', 2921),
727 'vcs/backends/base.py', 2921),
728 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
728 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
729 'vcs/backends/base.py', 3936),
729 'vcs/backends/base.py', 3936),
730 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
730 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
731 'vcs/backends/base.py', 6189),
731 'vcs/backends/base.py', 6189),
732 )
732 )
733 for commit_id, path, size in to_check:
733 for commit_id, path, size in to_check:
734 node = self.repo.get_commit(commit_id).get_node(path)
734 node = self.repo.get_commit(commit_id).get_node(path)
735 assert node.is_file()
735 assert node.is_file()
736 assert node.size == size
736 assert node.size == size
737
737
738 def test_file_history_from_commits(self):
738 def test_file_history_from_commits(self):
739 node = self.repo[10].get_node('setup.py')
739 node = self.repo[10].get_node('setup.py')
740 commit_ids = [commit.raw_id for commit in node.history]
740 commit_ids = [commit.raw_id for commit in node.history]
741 assert ['ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == commit_ids
741 assert ['ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == commit_ids
742
742
743 node = self.repo[20].get_node('setup.py')
743 node = self.repo[20].get_node('setup.py')
744 node_ids = [commit.raw_id for commit in node.history]
744 node_ids = [commit.raw_id for commit in node.history]
745 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
745 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
746 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
746 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
747
747
748 # special case we check history from commit that has this particular
748 # special case we check history from commit that has this particular
749 # file changed this means we check if it's included as well
749 # file changed this means we check if it's included as well
750 node = self.repo.get_commit('191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e') \
750 node = self.repo.get_commit('191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e') \
751 .get_node('setup.py')
751 .get_node('setup.py')
752 node_ids = [commit.raw_id for commit in node.history]
752 node_ids = [commit.raw_id for commit in node.history]
753 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
753 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
754 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
754 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
755
755
756 def test_file_history(self):
756 def test_file_history(self):
757 # we can only check if those commits are present in the history
757 # we can only check if those commits are present in the history
758 # as we cannot update this test every time file is changed
758 # as we cannot update this test every time file is changed
759 files = {
759 files = {
760 'setup.py': [
760 'setup.py': [
761 '54386793436c938cff89326944d4c2702340037d',
761 '54386793436c938cff89326944d4c2702340037d',
762 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
762 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
763 '998ed409c795fec2012b1c0ca054d99888b22090',
763 '998ed409c795fec2012b1c0ca054d99888b22090',
764 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
764 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
765 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
765 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
766 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
766 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
767 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
767 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
768 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
768 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
769 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
769 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
770 ],
770 ],
771 'vcs/nodes.py': [
771 'vcs/nodes.py': [
772 '33fa3223355104431402a888fa77a4e9956feb3e',
772 '33fa3223355104431402a888fa77a4e9956feb3e',
773 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
773 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
774 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
774 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
775 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
775 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
776 'c877b68d18e792a66b7f4c529ea02c8f80801542',
776 'c877b68d18e792a66b7f4c529ea02c8f80801542',
777 '4313566d2e417cb382948f8d9d7c765330356054',
777 '4313566d2e417cb382948f8d9d7c765330356054',
778 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
778 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
779 '54386793436c938cff89326944d4c2702340037d',
779 '54386793436c938cff89326944d4c2702340037d',
780 '54000345d2e78b03a99d561399e8e548de3f3203',
780 '54000345d2e78b03a99d561399e8e548de3f3203',
781 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
781 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
782 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
782 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
783 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
783 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
784 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
784 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
785 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
785 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
786 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
786 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
787 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
787 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
788 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
788 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
789 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
789 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
790 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
790 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
791 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
791 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
792 'f15c21f97864b4f071cddfbf2750ec2e23859414',
792 'f15c21f97864b4f071cddfbf2750ec2e23859414',
793 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
793 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
794 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
794 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
795 '84dec09632a4458f79f50ddbbd155506c460b4f9',
795 '84dec09632a4458f79f50ddbbd155506c460b4f9',
796 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
796 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
797 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
797 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
798 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
798 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
799 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
799 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
800 '6970b057cffe4aab0a792aa634c89f4bebf01441',
800 '6970b057cffe4aab0a792aa634c89f4bebf01441',
801 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
801 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
802 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
802 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
803 ],
803 ],
804 'vcs/backends/git.py': [
804 'vcs/backends/git.py': [
805 '4cf116ad5a457530381135e2f4c453e68a1b0105',
805 '4cf116ad5a457530381135e2f4c453e68a1b0105',
806 '9a751d84d8e9408e736329767387f41b36935153',
806 '9a751d84d8e9408e736329767387f41b36935153',
807 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
807 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
808 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
808 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
809 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
809 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
810 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
810 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
811 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
811 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
812 '54000345d2e78b03a99d561399e8e548de3f3203',
812 '54000345d2e78b03a99d561399e8e548de3f3203',
813 ],
813 ],
814 }
814 }
815 for path, commit_ids in files.items():
815 for path, commit_ids in files.items():
816 node = self.repo.get_commit(commit_ids[0]).get_node(path)
816 node = self.repo.get_commit(commit_ids[0]).get_node(path)
817 node_ids = [commit.raw_id for commit in node.history]
817 node_ids = [commit.raw_id for commit in node.history]
818 assert set(commit_ids).issubset(set(node_ids)), (
818 assert set(commit_ids).issubset(set(node_ids)), (
819 "We assumed that %s is subset of commit_ids for which file %s "
819 "We assumed that %s is subset of commit_ids for which file %s "
820 "has been changed, and history of that node returned: %s"
820 "has been changed, and history of that node returned: %s"
821 % (commit_ids, path, node_ids))
821 % (commit_ids, path, node_ids))
822
822
823 def test_file_annotate(self):
823 def test_file_annotate(self):
824 files = {
824 files = {
825 'vcs/backends/__init__.py': {
825 'vcs/backends/__init__.py': {
826 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
826 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
827 'lines_no': 1,
827 'lines_no': 1,
828 'commits': [
828 'commits': [
829 'c1214f7e79e02fc37156ff215cd71275450cffc3',
829 'c1214f7e79e02fc37156ff215cd71275450cffc3',
830 ],
830 ],
831 },
831 },
832 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
832 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
833 'lines_no': 21,
833 'lines_no': 21,
834 'commits': [
834 'commits': [
835 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
835 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
836 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
836 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
837 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
837 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
838 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
838 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
839 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
839 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
840 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
840 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
841 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
841 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
842 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
842 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
843 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
843 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
844 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
844 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
845 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
845 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
846 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
846 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
847 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
847 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
848 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
848 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
849 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
849 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
850 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
850 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
851 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
851 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
852 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
852 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
853 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
853 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
854 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
854 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
855 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
855 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
856 ],
856 ],
857 },
857 },
858 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
858 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
859 'lines_no': 32,
859 'lines_no': 32,
860 'commits': [
860 'commits': [
861 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
861 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
862 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
862 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
863 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
863 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
864 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
864 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
865 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
865 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
866 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
866 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
867 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
867 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
868 '54000345d2e78b03a99d561399e8e548de3f3203',
868 '54000345d2e78b03a99d561399e8e548de3f3203',
869 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
869 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
870 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
870 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
871 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
871 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
872 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
872 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
873 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
873 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
874 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
874 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
875 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
875 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
876 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
876 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
877 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
877 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
878 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
878 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
879 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
879 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
880 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
880 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
881 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
881 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
882 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
882 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
883 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
883 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
884 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
884 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
885 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
885 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
886 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
886 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
887 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
887 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
888 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
888 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
889 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
889 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
890 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
890 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
891 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
891 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
892 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
892 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
893 ],
893 ],
894 },
894 },
895 },
895 },
896 }
896 }
897
897
898 for fname, commit_dict in files.items():
898 for fname, commit_dict in files.items():
899 for commit_id, __ in commit_dict.items():
899 for commit_id, __ in commit_dict.items():
900 commit = self.repo.get_commit(commit_id)
900 commit = self.repo.get_commit(commit_id)
901
901
902 l1_1 = [x[1] for x in commit.get_file_annotate(fname)]
902 l1_1 = [x[1] for x in commit.get_file_annotate(fname)]
903 l1_2 = [x[2]().raw_id for x in commit.get_file_annotate(fname)]
903 l1_2 = [x[2]().raw_id for x in commit.get_file_annotate(fname)]
904 assert l1_1 == l1_2
904 assert l1_1 == l1_2
905 l1 = l1_1
905 l1 = l1_1
906 l2 = files[fname][commit_id]['commits']
906 l2 = files[fname][commit_id]['commits']
907 assert l1 == l2, (
907 assert l1 == l2, (
908 "The lists of commit_ids for %s@commit_id %s"
908 "The lists of commit_ids for %s@commit_id %s"
909 "from annotation list should match each other, "
909 "from annotation list should match each other, "
910 "got \n%s \nvs \n%s " % (fname, commit_id, l1, l2))
910 "got \n%s \nvs \n%s " % (fname, commit_id, l1, l2))
911
911
912 def test_files_state(self):
912 def test_files_state(self):
913 """
913 """
914 Tests state of FileNodes.
914 Tests state of FileNodes.
915 """
915 """
916 node = self.repo\
916 node = self.repo\
917 .get_commit('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
917 .get_commit('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
918 .get_node('vcs/utils/diffs.py')
918 .get_node('vcs/utils/diffs.py')
919 assert node.state, NodeState.ADDED
919 assert node.state, NodeState.ADDED
920 assert node.added
920 assert node.added
921 assert not node.changed
921 assert not node.changed
922 assert not node.not_changed
922 assert not node.not_changed
923 assert not node.removed
923 assert not node.removed
924
924
925 node = self.repo\
925 node = self.repo\
926 .get_commit('33fa3223355104431402a888fa77a4e9956feb3e')\
926 .get_commit('33fa3223355104431402a888fa77a4e9956feb3e')\
927 .get_node('.hgignore')
927 .get_node('.hgignore')
928 assert node.state, NodeState.CHANGED
928 assert node.state, NodeState.CHANGED
929 assert not node.added
929 assert not node.added
930 assert node.changed
930 assert node.changed
931 assert not node.not_changed
931 assert not node.not_changed
932 assert not node.removed
932 assert not node.removed
933
933
934 node = self.repo\
934 node = self.repo\
935 .get_commit('e29b67bd158580fc90fc5e9111240b90e6e86064')\
935 .get_commit('e29b67bd158580fc90fc5e9111240b90e6e86064')\
936 .get_node('setup.py')
936 .get_node('setup.py')
937 assert node.state, NodeState.NOT_CHANGED
937 assert node.state, NodeState.NOT_CHANGED
938 assert not node.added
938 assert not node.added
939 assert not node.changed
939 assert not node.changed
940 assert node.not_changed
940 assert node.not_changed
941 assert not node.removed
941 assert not node.removed
942
942
943 # If node has REMOVED state then trying to fetch it would raise
943 # If node has REMOVED state then trying to fetch it would raise
944 # CommitError exception
944 # CommitError exception
945 commit = self.repo.get_commit(
945 commit = self.repo.get_commit(
946 'fa6600f6848800641328adbf7811fd2372c02ab2')
946 'fa6600f6848800641328adbf7811fd2372c02ab2')
947 path = 'vcs/backends/BaseRepository.py'
947 path = 'vcs/backends/BaseRepository.py'
948 with pytest.raises(NodeDoesNotExistError):
948 with pytest.raises(NodeDoesNotExistError):
949 commit.get_node(path)
949 commit.get_node(path)
950 # but it would be one of ``removed`` (commit's attribute)
950 # but it would be one of ``removed`` (commit's attribute)
951 assert path in [rf.path for rf in commit.removed]
951 assert path in [rf.path for rf in commit.removed]
952
952
953 commit = self.repo.get_commit(
953 commit = self.repo.get_commit(
954 '54386793436c938cff89326944d4c2702340037d')
954 '54386793436c938cff89326944d4c2702340037d')
955 changed = [
955 changed = [
956 'setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
956 'setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
957 'vcs/nodes.py']
957 'vcs/nodes.py']
958 assert set(changed) == set([f.path for f in commit.changed])
958 assert set(changed) == set([f.path for f in commit.changed])
959
959
960 def test_unicode_branch_refs(self):
960 def test_unicode_branch_refs(self):
961 unicode_branches = {
961 unicode_branches = {
962 'refs/heads/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
962 'refs/heads/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
963 u'refs/heads/uniΓ§ΓΆβˆ‚e': 'ΓΌrl',
963 u'refs/heads/uniΓ§ΓΆβˆ‚e': 'ΓΌrl',
964 }
964 }
965 with mock.patch(
965 with mock.patch(
966 ("rhodecode.lib.vcs.backends.git.repository"
966 ("rhodecode.lib.vcs.backends.git.repository"
967 ".GitRepository._refs"),
967 ".GitRepository._refs"),
968 unicode_branches):
968 unicode_branches):
969 branches = self.repo.branches
969 branches = self.repo.branches
970
970
971 assert 'unicode' in branches
971 assert 'unicode' in branches
972 assert u'uniΓ§ΓΆβˆ‚e' in branches
972 assert 'uniΓ§ΓΆβˆ‚e' in branches
973
973
974 def test_unicode_tag_refs(self):
974 def test_unicode_tag_refs(self):
975 unicode_tags = {
975 unicode_tags = {
976 'refs/tags/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
976 'refs/tags/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
977 u'refs/tags/uniΓ§ΓΆβˆ‚e': '6c0ce52b229aa978889e91b38777f800e85f330b',
977 u'refs/tags/uniΓ§ΓΆβˆ‚e': '6c0ce52b229aa978889e91b38777f800e85f330b',
978 }
978 }
979 with mock.patch(
979 with mock.patch(
980 ("rhodecode.lib.vcs.backends.git.repository"
980 ("rhodecode.lib.vcs.backends.git.repository"
981 ".GitRepository._refs"),
981 ".GitRepository._refs"),
982 unicode_tags):
982 unicode_tags):
983 tags = self.repo.tags
983 tags = self.repo.tags
984
984
985 assert 'unicode' in tags
985 assert 'unicode' in tags
986 assert u'uniΓ§ΓΆβˆ‚e' in tags
986 assert 'uniΓ§ΓΆβˆ‚e' in tags
987
987
988 def test_commit_message_is_unicode(self):
988 def test_commit_message_is_unicode(self):
989 for commit in self.repo:
989 for commit in self.repo:
990 assert type(commit.message) == str
990 assert type(commit.message) == str
991
991
992 def test_commit_author_is_unicode(self):
992 def test_commit_author_is_unicode(self):
993 for commit in self.repo:
993 for commit in self.repo:
994 assert type(commit.author) == str
994 assert type(commit.author) == str
995
995
996 def test_repo_files_content_types(self):
996 def test_repo_files_content_types(self):
997 commit = self.repo.get_commit()
997 commit = self.repo.get_commit()
998 for node in commit.get_node('/'):
998 for node in commit.get_node('/'):
999 if node.is_file():
999 if node.is_file():
1000 assert type(node.content) == bytes
1000 assert type(node.content) == bytes
1001 assert type(node.str_content) == str
1001 assert type(node.str_content) == str
1002
1002
1003 def test_wrong_path(self):
1003 def test_wrong_path(self):
1004 # There is 'setup.py' in the root dir but not there:
1004 # There is 'setup.py' in the root dir but not there:
1005 path = 'foo/bar/setup.py'
1005 path = 'foo/bar/setup.py'
1006 tip = self.repo.get_commit()
1006 tip = self.repo.get_commit()
1007 with pytest.raises(VCSError):
1007 with pytest.raises(VCSError):
1008 tip.get_node(path)
1008 tip.get_node(path)
1009
1009
1010 @pytest.mark.parametrize("author_email, commit_id", [
1010 @pytest.mark.parametrize("author_email, commit_id", [
1011 ('marcin@python-blog.com', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1011 ('marcin@python-blog.com', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1012 ('lukasz.balcerzak@python-center.pl',
1012 ('lukasz.balcerzak@python-center.pl',
1013 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1013 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1014 ('none@none', '8430a588b43b5d6da365400117c89400326e7992'),
1014 ('none@none', '8430a588b43b5d6da365400117c89400326e7992'),
1015 ])
1015 ])
1016 def test_author_email(self, author_email, commit_id):
1016 def test_author_email(self, author_email, commit_id):
1017 commit = self.repo.get_commit(commit_id)
1017 commit = self.repo.get_commit(commit_id)
1018 assert author_email == commit.author_email
1018 assert author_email == commit.author_email
1019
1019
1020 @pytest.mark.parametrize("author, commit_id", [
1020 @pytest.mark.parametrize("author, commit_id", [
1021 ('Marcin Kuzminski', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1021 ('Marcin Kuzminski', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1022 ('Lukasz Balcerzak', 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1022 ('Lukasz Balcerzak', 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1023 ('marcink', '8430a588b43b5d6da365400117c89400326e7992'),
1023 ('marcink', '8430a588b43b5d6da365400117c89400326e7992'),
1024 ])
1024 ])
1025 def test_author_username(self, author, commit_id):
1025 def test_author_username(self, author, commit_id):
1026 commit = self.repo.get_commit(commit_id)
1026 commit = self.repo.get_commit(commit_id)
1027 assert author == commit.author_name
1027 assert author == commit.author_name
1028
1028
1029
1029
1030 class TestLargeFileRepo(object):
1030 class TestLargeFileRepo(object):
1031
1031
1032 def test_large_file(self, backend_git):
1032 def test_large_file(self, backend_git):
1033 conf = make_db_config()
1033 conf = make_db_config()
1034 repo = backend_git.create_test_repo('largefiles', conf)
1034 repo = backend_git.create_test_repo('largefiles', conf)
1035
1035
1036 tip = repo.scm_instance().get_commit()
1036 tip = repo.scm_instance().get_commit()
1037
1037
1038 # extract stored LF node into the origin cache
1038 # extract stored LF node into the origin cache
1039 lfs_store = os.path.join(repo.repo_path, repo.repo_name, 'lfs_store')
1039 lfs_store = os.path.join(repo.repo_path, repo.repo_name, 'lfs_store')
1040
1040
1041 oid = '7b331c02e313c7599d5a90212e17e6d3cb729bd2e1c9b873c302a63c95a2f9bf'
1041 oid = '7b331c02e313c7599d5a90212e17e6d3cb729bd2e1c9b873c302a63c95a2f9bf'
1042 oid_path = os.path.join(lfs_store, oid)
1042 oid_path = os.path.join(lfs_store, oid)
1043 # Todo: oid path depends on LFSOidStorage.store_suffix. Once it will be changed update below line accordingly
1043 # Todo: oid path depends on LFSOidStorage.store_suffix. Once it will be changed update below line accordingly
1044 oid_destination = os.path.join(
1044 oid_destination = os.path.join(
1045 conf.get('vcs_git_lfs', 'store_location'), f'objects/{oid[:2]}/{oid[2:4]}/{oid}')
1045 conf.get('vcs_git_lfs', 'store_location'), f'objects/{oid[:2]}/{oid[2:4]}/{oid}')
1046
1046
1047 os.makedirs(os.path.dirname(oid_destination))
1047 os.makedirs(os.path.dirname(oid_destination))
1048 shutil.copy(oid_path, oid_destination)
1048 shutil.copy(oid_path, oid_destination)
1049
1049
1050 node = tip.get_node('1MB.zip')
1050 node = tip.get_node('1MB.zip')
1051
1051
1052 lf_node = node.get_largefile_node()
1052 lf_node = node.get_largefile_node()
1053
1053
1054 assert lf_node.is_largefile() is True
1054 assert lf_node.is_largefile() is True
1055 assert lf_node.size == 1024000
1055 assert lf_node.size == 1024000
1056 assert lf_node.name == '1MB.zip'
1056 assert lf_node.name == '1MB.zip'
1057
1057
1058
1058
1059 @pytest.mark.usefixtures("vcs_repository_support")
1059 @pytest.mark.usefixtures("vcs_repository_support")
1060 class TestGitSpecificWithRepo(BackendTestMixin):
1060 class TestGitSpecificWithRepo(BackendTestMixin):
1061
1061
1062 @classmethod
1062 @classmethod
1063 def _get_commits(cls):
1063 def _get_commits(cls):
1064 return [
1064 return [
1065 {
1065 {
1066 'message': 'Initial',
1066 'message': 'Initial',
1067 'author': 'Joe Doe <joe.doe@example.com>',
1067 'author': 'Joe Doe <joe.doe@example.com>',
1068 'date': datetime.datetime(2010, 1, 1, 20),
1068 'date': datetime.datetime(2010, 1, 1, 20),
1069 'added': [
1069 'added': [
1070 FileNode(b'foobar/static/js/admin/base.js', content=b'base'),
1070 FileNode(b'foobar/static/js/admin/base.js', content=b'base'),
1071 FileNode(b'foobar/static/admin', content=b'admin', mode=0o120000), # this is a link
1071 FileNode(b'foobar/static/admin', content=b'admin', mode=0o120000), # this is a link
1072 FileNode(b'foo', content=b'foo'),
1072 FileNode(b'foo', content=b'foo'),
1073 ],
1073 ],
1074 },
1074 },
1075 {
1075 {
1076 'message': 'Second',
1076 'message': 'Second',
1077 'author': 'Joe Doe <joe.doe@example.com>',
1077 'author': 'Joe Doe <joe.doe@example.com>',
1078 'date': datetime.datetime(2010, 1, 1, 22),
1078 'date': datetime.datetime(2010, 1, 1, 22),
1079 'added': [
1079 'added': [
1080 FileNode(b'foo2', content=b'foo2'),
1080 FileNode(b'foo2', content=b'foo2'),
1081 ],
1081 ],
1082 },
1082 },
1083 ]
1083 ]
1084
1084
1085 def test_paths_slow_traversing(self):
1085 def test_paths_slow_traversing(self):
1086 commit = self.repo.get_commit()
1086 commit = self.repo.get_commit()
1087 assert commit.get_node('foobar').get_node('static').get_node('js')\
1087 assert commit.get_node('foobar').get_node('static').get_node('js')\
1088 .get_node('admin').get_node('base.js').content == b'base'
1088 .get_node('admin').get_node('base.js').content == b'base'
1089
1089
1090 def test_paths_fast_traversing(self):
1090 def test_paths_fast_traversing(self):
1091 commit = self.repo.get_commit()
1091 commit = self.repo.get_commit()
1092 assert commit.get_node('foobar/static/js/admin/base.js').content == b'base'
1092 assert commit.get_node('foobar/static/js/admin/base.js').content == b'base'
1093
1093
1094 def test_get_diff_runs_git_command_with_hashes(self):
1094 def test_get_diff_runs_git_command_with_hashes(self):
1095 comm1 = self.repo[0]
1095 comm1 = self.repo[0]
1096 comm2 = self.repo[1]
1096 comm2 = self.repo[1]
1097
1097
1098 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1098 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1099 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1099 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1100 self.repo.get_diff(comm1, comm2)
1100 self.repo.get_diff(comm1, comm2)
1101
1101
1102 remote_mock.diff.assert_called_once_with(
1102 remote_mock.diff.assert_called_once_with(
1103 comm1.raw_id, comm2.raw_id,
1103 comm1.raw_id, comm2.raw_id,
1104 file_filter=None, opt_ignorews=False, context=3)
1104 file_filter=None, opt_ignorews=False, context=3)
1105
1105
1106 def test_get_diff_runs_git_command_with_str_hashes(self):
1106 def test_get_diff_runs_git_command_with_str_hashes(self):
1107 comm2 = self.repo[1]
1107 comm2 = self.repo[1]
1108
1108
1109 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1109 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1110 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1110 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1111 self.repo.get_diff(self.repo.EMPTY_COMMIT, comm2)
1111 self.repo.get_diff(self.repo.EMPTY_COMMIT, comm2)
1112
1112
1113 remote_mock.diff.assert_called_once_with(
1113 remote_mock.diff.assert_called_once_with(
1114 self.repo.EMPTY_COMMIT.raw_id, comm2.raw_id,
1114 self.repo.EMPTY_COMMIT.raw_id, comm2.raw_id,
1115 file_filter=None, opt_ignorews=False, context=3)
1115 file_filter=None, opt_ignorews=False, context=3)
1116
1116
1117 def test_get_diff_runs_git_command_with_path_if_its_given(self):
1117 def test_get_diff_runs_git_command_with_path_if_its_given(self):
1118 comm1 = self.repo[0]
1118 comm1 = self.repo[0]
1119 comm2 = self.repo[1]
1119 comm2 = self.repo[1]
1120
1120
1121 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1121 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1122 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1122 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1123 self.repo.get_diff(comm1, comm2, 'foo')
1123 self.repo.get_diff(comm1, comm2, 'foo')
1124
1124
1125 remote_mock.diff.assert_called_once_with(
1125 remote_mock.diff.assert_called_once_with(
1126 self.repo._lookup_commit(0), comm2.raw_id,
1126 self.repo._lookup_commit(0), comm2.raw_id,
1127 file_filter='foo', opt_ignorews=False, context=3)
1127 file_filter='foo', opt_ignorews=False, context=3)
1128
1128
1129
1129
1130 @pytest.mark.usefixtures("vcs_repository_support")
1130 @pytest.mark.usefixtures("vcs_repository_support")
1131 class TestGitRegression(BackendTestMixin):
1131 class TestGitRegression(BackendTestMixin):
1132
1132
1133 @classmethod
1133 @classmethod
1134 def _get_commits(cls):
1134 def _get_commits(cls):
1135 return [
1135 return [
1136 {
1136 {
1137 'message': 'Initial',
1137 'message': 'Initial',
1138 'author': 'Joe Doe <joe.doe@example.com>',
1138 'author': 'Joe Doe <joe.doe@example.com>',
1139 'date': datetime.datetime(2010, 1, 1, 20),
1139 'date': datetime.datetime(2010, 1, 1, 20),
1140 'added': [
1140 'added': [
1141 FileNode(b'bot/__init__.py', content=b'base'),
1141 FileNode(b'bot/__init__.py', content=b'base'),
1142 FileNode(b'bot/templates/404.html', content=b'base'),
1142 FileNode(b'bot/templates/404.html', content=b'base'),
1143 FileNode(b'bot/templates/500.html', content=b'base'),
1143 FileNode(b'bot/templates/500.html', content=b'base'),
1144 ],
1144 ],
1145 },
1145 },
1146 {
1146 {
1147 'message': 'Second',
1147 'message': 'Second',
1148 'author': 'Joe Doe <joe.doe@example.com>',
1148 'author': 'Joe Doe <joe.doe@example.com>',
1149 'date': datetime.datetime(2010, 1, 1, 22),
1149 'date': datetime.datetime(2010, 1, 1, 22),
1150 'added': [
1150 'added': [
1151 FileNode(b'bot/build/migrations/1.py', content=b'foo2'),
1151 FileNode(b'bot/build/migrations/1.py', content=b'foo2'),
1152 FileNode(b'bot/build/migrations/2.py', content=b'foo2'),
1152 FileNode(b'bot/build/migrations/2.py', content=b'foo2'),
1153 FileNode(b'bot/build/static/templates/f.html', content=b'foo2'),
1153 FileNode(b'bot/build/static/templates/f.html', content=b'foo2'),
1154 FileNode(b'bot/build/static/templates/f1.html', content=b'foo2'),
1154 FileNode(b'bot/build/static/templates/f1.html', content=b'foo2'),
1155 FileNode(b'bot/build/templates/err.html', content=b'foo2'),
1155 FileNode(b'bot/build/templates/err.html', content=b'foo2'),
1156 FileNode(b'bot/build/templates/err2.html', content=b'foo2'),
1156 FileNode(b'bot/build/templates/err2.html', content=b'foo2'),
1157 ],
1157 ],
1158 },
1158 },
1159 ]
1159 ]
1160
1160
1161 @pytest.mark.parametrize("path, expected_paths", [
1161 @pytest.mark.parametrize("path, expected_paths", [
1162 ('bot', [
1162 ('bot', [
1163 'bot/build',
1163 'bot/build',
1164 'bot/templates',
1164 'bot/templates',
1165 'bot/__init__.py']),
1165 'bot/__init__.py']),
1166 ('bot/build', [
1166 ('bot/build', [
1167 'bot/build/migrations',
1167 'bot/build/migrations',
1168 'bot/build/static',
1168 'bot/build/static',
1169 'bot/build/templates']),
1169 'bot/build/templates']),
1170 ('bot/build/static', [
1170 ('bot/build/static', [
1171 'bot/build/static/templates']),
1171 'bot/build/static/templates']),
1172 ('bot/build/static/templates', [
1172 ('bot/build/static/templates', [
1173 'bot/build/static/templates/f.html',
1173 'bot/build/static/templates/f.html',
1174 'bot/build/static/templates/f1.html']),
1174 'bot/build/static/templates/f1.html']),
1175 ('bot/build/templates', [
1175 ('bot/build/templates', [
1176 'bot/build/templates/err.html',
1176 'bot/build/templates/err.html',
1177 'bot/build/templates/err2.html']),
1177 'bot/build/templates/err2.html']),
1178 ('bot/templates/', [
1178 ('bot/templates/', [
1179 'bot/templates/404.html',
1179 'bot/templates/404.html',
1180 'bot/templates/500.html']),
1180 'bot/templates/500.html']),
1181 ])
1181 ])
1182 def test_similar_paths(self, path, expected_paths):
1182 def test_similar_paths(self, path, expected_paths):
1183 commit = self.repo.get_commit()
1183 commit = self.repo.get_commit()
1184 paths = [n.path for n in commit.get_nodes(path)]
1184 paths = [n.path for n in commit.get_nodes(path)]
1185 assert paths == expected_paths
1185 assert paths == expected_paths
1186
1186
1187
1187
1188 class TestDiscoverGitVersion(object):
1188 class TestDiscoverGitVersion(object):
1189
1189
1190 def test_returns_git_version(self, baseapp):
1190 def test_returns_git_version(self, baseapp):
1191 version = discover_git_version()
1191 version = discover_git_version()
1192 assert version
1192 assert version
1193
1193
1194 def test_returns_empty_string_without_vcsserver(self):
1194 def test_returns_empty_string_without_vcsserver(self):
1195 mock_connection = mock.Mock()
1195 mock_connection = mock.Mock()
1196 mock_connection.discover_git_version = mock.Mock(
1196 mock_connection.discover_git_version = mock.Mock(
1197 side_effect=Exception)
1197 side_effect=Exception)
1198 with mock.patch('rhodecode.lib.vcs.connection.Git', mock_connection):
1198 with mock.patch('rhodecode.lib.vcs.connection.Git', mock_connection):
1199 version = discover_git_version()
1199 version = discover_git_version()
1200 assert version == ''
1200 assert version == ''
1201
1201
1202
1202
1203 class TestGetSubmoduleUrl(object):
1203 class TestGetSubmoduleUrl(object):
1204 def test_submodules_file_found(self):
1204 def test_submodules_file_found(self):
1205 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1205 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1206 node = mock.Mock()
1206 node = mock.Mock()
1207
1207
1208 with mock.patch.object(
1208 with mock.patch.object(
1209 commit, 'get_node', return_value=node) as get_node_mock:
1209 commit, 'get_node', return_value=node) as get_node_mock:
1210 node.str_content = (
1210 node.str_content = (
1211 '[submodule "subrepo1"]\n'
1211 '[submodule "subrepo1"]\n'
1212 '\tpath = subrepo1\n'
1212 '\tpath = subrepo1\n'
1213 '\turl = https://code.rhodecode.com/dulwich\n'
1213 '\turl = https://code.rhodecode.com/dulwich\n'
1214 )
1214 )
1215 result = commit._get_submodule_url('subrepo1')
1215 result = commit._get_submodule_url('subrepo1')
1216 get_node_mock.assert_called_once_with('.gitmodules')
1216 get_node_mock.assert_called_once_with('.gitmodules')
1217 assert result == 'https://code.rhodecode.com/dulwich'
1217 assert result == 'https://code.rhodecode.com/dulwich'
1218
1218
1219 def test_complex_submodule_path(self):
1219 def test_complex_submodule_path(self):
1220 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1220 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1221 node = mock.Mock()
1221 node = mock.Mock()
1222
1222
1223 with mock.patch.object(
1223 with mock.patch.object(
1224 commit, 'get_node', return_value=node) as get_node_mock:
1224 commit, 'get_node', return_value=node) as get_node_mock:
1225 node.str_content = (
1225 node.str_content = (
1226 '[submodule "complex/subrepo/path"]\n'
1226 '[submodule "complex/subrepo/path"]\n'
1227 '\tpath = complex/subrepo/path\n'
1227 '\tpath = complex/subrepo/path\n'
1228 '\turl = https://code.rhodecode.com/dulwich\n'
1228 '\turl = https://code.rhodecode.com/dulwich\n'
1229 )
1229 )
1230 result = commit._get_submodule_url('complex/subrepo/path')
1230 result = commit._get_submodule_url('complex/subrepo/path')
1231 get_node_mock.assert_called_once_with('.gitmodules')
1231 get_node_mock.assert_called_once_with('.gitmodules')
1232 assert result == 'https://code.rhodecode.com/dulwich'
1232 assert result == 'https://code.rhodecode.com/dulwich'
1233
1233
1234 def test_submodules_file_not_found(self):
1234 def test_submodules_file_not_found(self):
1235 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1235 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1236 with mock.patch.object(
1236 with mock.patch.object(
1237 commit, 'get_node', side_effect=NodeDoesNotExistError):
1237 commit, 'get_node', side_effect=NodeDoesNotExistError):
1238 result = commit._get_submodule_url('complex/subrepo/path')
1238 result = commit._get_submodule_url('complex/subrepo/path')
1239 assert result is None
1239 assert result is None
1240
1240
1241 def test_path_not_found(self):
1241 def test_path_not_found(self):
1242 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1242 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1243 node = mock.Mock()
1243 node = mock.Mock()
1244
1244
1245 with mock.patch.object(
1245 with mock.patch.object(
1246 commit, 'get_node', return_value=node) as get_node_mock:
1246 commit, 'get_node', return_value=node) as get_node_mock:
1247 node.str_content = (
1247 node.str_content = (
1248 '[submodule "subrepo1"]\n'
1248 '[submodule "subrepo1"]\n'
1249 '\tpath = subrepo1\n'
1249 '\tpath = subrepo1\n'
1250 '\turl = https://code.rhodecode.com/dulwich\n'
1250 '\turl = https://code.rhodecode.com/dulwich\n'
1251 )
1251 )
1252 result = commit._get_submodule_url('subrepo2')
1252 result = commit._get_submodule_url('subrepo2')
1253 get_node_mock.assert_called_once_with('.gitmodules')
1253 get_node_mock.assert_called_once_with('.gitmodules')
1254 assert result is None
1254 assert result is None
1255
1255
1256 def test_returns_cached_values(self):
1256 def test_returns_cached_values(self):
1257 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1257 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1258 node = mock.Mock()
1258 node = mock.Mock()
1259
1259
1260 with mock.patch.object(
1260 with mock.patch.object(
1261 commit, 'get_node', return_value=node) as get_node_mock:
1261 commit, 'get_node', return_value=node) as get_node_mock:
1262 node.str_content = (
1262 node.str_content = (
1263 '[submodule "subrepo1"]\n'
1263 '[submodule "subrepo1"]\n'
1264 '\tpath = subrepo1\n'
1264 '\tpath = subrepo1\n'
1265 '\turl = https://code.rhodecode.com/dulwich\n'
1265 '\turl = https://code.rhodecode.com/dulwich\n'
1266 )
1266 )
1267 for _ in range(3):
1267 for _ in range(3):
1268 commit._get_submodule_url('subrepo1')
1268 commit._get_submodule_url('subrepo1')
1269 get_node_mock.assert_called_once_with('.gitmodules')
1269 get_node_mock.assert_called_once_with('.gitmodules')
1270
1270
1271 def test_get_node_returns_a_link(self):
1271 def test_get_node_returns_a_link(self):
1272 repository = mock.Mock()
1272 repository = mock.Mock()
1273 repository.alias = 'git'
1273 repository.alias = 'git'
1274 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1274 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1275 submodule_url = 'https://code.rhodecode.com/dulwich'
1275 submodule_url = 'https://code.rhodecode.com/dulwich'
1276 get_id_patch = mock.patch.object(
1276 get_id_patch = mock.patch.object(
1277 commit, '_get_tree_id_for_path', return_value=(1, 'link'))
1277 commit, '_get_tree_id_for_path', return_value=(1, 'link'))
1278 get_submodule_patch = mock.patch.object(
1278 get_submodule_patch = mock.patch.object(
1279 commit, '_get_submodule_url', return_value=submodule_url)
1279 commit, '_get_submodule_url', return_value=submodule_url)
1280
1280
1281 with get_id_patch, get_submodule_patch as submodule_mock:
1281 with get_id_patch, get_submodule_patch as submodule_mock:
1282 node = commit.get_node('/abcde')
1282 node = commit.get_node('/abcde')
1283
1283
1284 submodule_mock.assert_called_once_with('/abcde')
1284 submodule_mock.assert_called_once_with('/abcde')
1285 assert type(node) == SubModuleNode
1285 assert type(node) == SubModuleNode
1286 assert node.url == submodule_url
1286 assert node.url == submodule_url
1287
1287
1288 def test_get_nodes_returns_links(self):
1288 def test_get_nodes_returns_links(self):
1289 repository = mock.MagicMock()
1289 repository = mock.MagicMock()
1290 repository.alias = 'git'
1290 repository.alias = 'git'
1291 repository._remote.tree_items.return_value = [
1291 repository._remote.tree_items.return_value = [
1292 ('subrepo', 'stat', 1, 'link')
1292 ('subrepo', 'stat', 1, 'link')
1293 ]
1293 ]
1294 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1294 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1295 submodule_url = 'https://code.rhodecode.com/dulwich'
1295 submodule_url = 'https://code.rhodecode.com/dulwich'
1296 get_id_patch = mock.patch.object(
1296 get_id_patch = mock.patch.object(
1297 commit, '_get_tree_id_for_path', return_value=(1, 'tree'))
1297 commit, '_get_tree_id_for_path', return_value=(1, 'tree'))
1298 get_submodule_patch = mock.patch.object(
1298 get_submodule_patch = mock.patch.object(
1299 commit, '_get_submodule_url', return_value=submodule_url)
1299 commit, '_get_submodule_url', return_value=submodule_url)
1300
1300
1301 with get_id_patch, get_submodule_patch as submodule_mock:
1301 with get_id_patch, get_submodule_patch as submodule_mock:
1302 nodes = commit.get_nodes('/abcde')
1302 nodes = commit.get_nodes('/abcde')
1303
1303
1304 submodule_mock.assert_called_once_with('/abcde/subrepo')
1304 submodule_mock.assert_called_once_with('/abcde/subrepo')
1305 assert len(nodes) == 1
1305 assert len(nodes) == 1
1306 assert type(nodes[0]) == SubModuleNode
1306 assert type(nodes[0]) == SubModuleNode
1307 assert nodes[0].url == submodule_url
1307 assert nodes[0].url == submodule_url
@@ -1,195 +1,195 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 import os
20 import os
21
21
22 import mock
22 import mock
23 import pytest
23 import pytest
24
24
25 from rhodecode.lib.str_utils import safe_bytes
25 from rhodecode.lib.str_utils import safe_bytes
26 from rhodecode.tests import SVN_REPO, TEST_DIR, TESTS_TMP_PATH
26 from rhodecode.tests import SVN_REPO, TEST_DIR, TESTS_TMP_PATH
27 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
27 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
28 from rhodecode.lib.vcs.conf import settings
28 from rhodecode.lib.vcs.conf import settings
29 from rhodecode.lib.vcs.exceptions import VCSError
29 from rhodecode.lib.vcs.exceptions import VCSError
30
30
31
31
32 pytestmark = [
32 pytestmark = [
33 pytest.mark.backends("svn"),
33 pytest.mark.backends("svn"),
34 pytest.mark.usefixtures("baseapp"),
34 pytest.mark.usefixtures("baseapp"),
35 ]
35 ]
36
36
37
37
38 @pytest.fixture()
38 @pytest.fixture()
39 def repo(baseapp):
39 def repo(baseapp):
40 repo = SubversionRepository(os.path.join(TESTS_TMP_PATH, SVN_REPO))
40 repo = SubversionRepository(os.path.join(TESTS_TMP_PATH, SVN_REPO))
41 return repo
41 return repo
42
42
43
43
44 @pytest.fixture()
44 @pytest.fixture()
45 def head(repo):
45 def head(repo):
46 return repo.get_commit()
46 return repo.get_commit()
47
47
48
48
49 def test_init_fails_if_path_does_not_exist():
49 def test_init_fails_if_path_does_not_exist():
50 path = os.path.join(TEST_DIR, 'i-do-not-exist')
50 path = os.path.join(TEST_DIR, 'i-do-not-exist')
51 with pytest.raises(VCSError):
51 with pytest.raises(VCSError):
52 SubversionRepository(path)
52 SubversionRepository(path)
53
53
54
54
55 def test_init_fails_if_path_is_not_a_valid_repository(tmpdir):
55 def test_init_fails_if_path_is_not_a_valid_repository(tmpdir):
56 path = str(tmpdir.mkdir('unicode Γ€'))
56 path = str(tmpdir.mkdir('unicode Γ€'))
57 with pytest.raises(VCSError):
57 with pytest.raises(VCSError):
58 SubversionRepository(path)
58 SubversionRepository(path)
59
59
60
60
61 def test_repo_clone(vcsbackend, reposerver):
61 def test_repo_clone(vcsbackend, reposerver):
62 source = vcsbackend.create_repo(number_of_commits=3)
62 source = vcsbackend.create_repo(number_of_commits=3)
63 reposerver.serve(source)
63 reposerver.serve(source)
64 repo = SubversionRepository(
64 repo = SubversionRepository(
65 vcsbackend.new_repo_path(),
65 vcsbackend.new_repo_path(),
66 create=True,
66 create=True,
67 src_url=reposerver.url)
67 src_url=reposerver.url)
68
68
69 assert source.commit_ids == repo.commit_ids
69 assert source.commit_ids == repo.commit_ids
70 assert source[0].message == repo[0].message
70 assert source[0].message == repo[0].message
71
71
72
72
73 def test_latest_commit(head):
73 def test_latest_commit(head):
74 assert head.raw_id == '393'
74 assert head.raw_id == '393'
75
75
76
76
77 def test_commit_description(head):
77 def test_commit_description(head):
78 assert head.message == """Added a symlink"""
78 assert head.message == """Added a symlink"""
79
79
80
80
81 def test_commit_author(head):
81 def test_commit_author(head):
82 assert head.author == 'marcin'
82 assert head.author == 'marcin'
83
83
84
84
85 @pytest.mark.parametrize("filename, content, mime_type", [
85 @pytest.mark.parametrize("filename, content, mime_type", [
86 (b'test.txt', b'Text content\n', None),
86 (b'test.txt', b'Text content\n', None),
87 (b'test.bin', b'\0 binary \0', 'application/octet-stream'),
87 (b'test.bin', b'\0 binary \0', 'application/octet-stream'),
88 ], ids=['text', 'binary'])
88 ], ids=['text', 'binary'])
89 def test_sets_mime_type_correctly(vcsbackend, filename, content, mime_type):
89 def test_sets_mime_type_correctly(vcsbackend, filename, content, mime_type):
90 repo = vcsbackend.create_repo()
90 repo = vcsbackend.create_repo()
91 vcsbackend.ensure_file(filename, content)
91 vcsbackend.ensure_file(filename, content)
92 file_properties = repo._remote.node_properties(filename, 1)
92 file_properties = repo._remote.node_properties(filename, 1)
93 assert file_properties.get('svn:mime-type') == mime_type
93 assert file_properties.get('svn:mime-type') == mime_type
94
94
95
95
96 def test_slice_access(repo):
96 def test_slice_access(repo):
97 page_size = 5
97 page_size = 5
98 page = 0
98 page = 0
99 start = page * page_size
99 start = page * page_size
100 end = start + page_size - 1
100 end = start + page_size - 1
101
101
102 commits = list(repo[start:end])
102 commits = list(repo[start:end])
103 assert [commit.raw_id for commit in commits] == ['1', '2', '3', '4']
103 assert [commit.raw_id for commit in commits] == ['1', '2', '3', '4']
104
104
105
105
106 def test_walk_changelog_page(repo):
106 def test_walk_changelog_page(repo):
107 page_size = 5
107 page_size = 5
108 page = 0
108 page = 0
109 start = page * page_size
109 start = page * page_size
110 end = start + page_size - 1
110 end = start + page_size - 1
111
111
112 commits = list(repo[start:end])
112 commits = list(repo[start:end])
113 changelog = [
113 changelog = [
114 'r%s, %s, %s' % (c.raw_id, c.author, c.message) for c in commits]
114 'r%s, %s, %s' % (c.raw_id, c.author, c.message) for c in commits]
115
115
116 expexted_messages = [
116 expexted_messages = [
117 'r1, marcin, initial import',
117 'r1, marcin, initial import',
118 'r2, marcin, hg ignore',
118 'r2, marcin, hg ignore',
119 'r3, marcin, Pip standards refactor',
119 'r3, marcin, Pip standards refactor',
120 'r4, marcin, Base repository few new functions added']
120 'r4, marcin, Base repository few new functions added']
121 assert changelog == expexted_messages
121 assert changelog == expexted_messages
122
122
123
123
124 def test_read_full_file_tree(head):
124 def test_read_full_file_tree(head):
125 for topnode, dirs, files in head.walk():
125 for topnode, dirs, files in head.walk():
126 for f in files:
126 for f in files:
127 len(f.content)
127 len(f.content)
128
128
129
129
130 def test_topnode_files_attribute(head):
130 def test_topnode_files_attribute(head):
131 topnode = head.get_node('')
131 topnode = head.get_node('')
132 topnode.files
132 topnode.files
133
133
134
134
135
135
136
136
137 @pytest.mark.parametrize("filename, content, branch, mime_type", [
137 @pytest.mark.parametrize("filename, content, branch, mime_type", [
138 ('branches/plain/test.txt', b'Text content\n', 'plain', None),
138 ('branches/plain/test.txt', b'Text content\n', 'plain', None),
139 ('branches/uniΓ§ΓΆβˆ‚e/test.bin', b'\0 binary \0', 'uniΓ§ΓΆβˆ‚e', 'application/octet-stream'),
139 ('branches/uniΓ§ΓΆβˆ‚e/test.bin', b'\0 binary \0', 'uniΓ§ΓΆβˆ‚e', 'application/octet-stream'),
140 ], ids=['text', 'binary'])
140 ], ids=['text', 'binary'])
141 def test_unicode_refs(vcsbackend, filename, content, branch, mime_type):
141 def test_unicode_refs(vcsbackend, filename, content, branch, mime_type):
142 filename = safe_bytes(filename)
142 filename = safe_bytes(filename)
143 repo = vcsbackend.create_repo()
143 repo = vcsbackend.create_repo()
144 vcsbackend.ensure_file(filename, content)
144 vcsbackend.ensure_file(filename, content)
145 with mock.patch(("rhodecode.lib.vcs.backends.svn.repository"
145 with mock.patch(("rhodecode.lib.vcs.backends.svn.repository"
146 ".SubversionRepository._patterns_from_section"),
146 ".SubversionRepository._patterns_from_section"),
147 return_value=['branches/*']):
147 return_value=['branches/*']):
148 assert u'branches/{0}'.format(branch) in repo.branches
148 assert f'branches/{branch}' in repo.branches
149
149
150
150
151 def test_compatible_version(monkeypatch, vcsbackend):
151 def test_compatible_version(monkeypatch, vcsbackend):
152 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'pre-1.8-compatible')
152 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'pre-1.8-compatible')
153 path = vcsbackend.new_repo_path()
153 path = vcsbackend.new_repo_path()
154 SubversionRepository(path, create=True)
154 SubversionRepository(path, create=True)
155 with open('{}/db/format'.format(path)) as f:
155 with open(f'{path}/db/format') as f:
156 first_line = f.readline().strip()
156 first_line = f.readline().strip()
157 assert first_line == '4'
157 assert first_line == '4'
158
158
159
159
160 def test_invalid_compatible_version(monkeypatch, vcsbackend):
160 def test_invalid_compatible_version(monkeypatch, vcsbackend):
161 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'i-am-an-invalid-setting')
161 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'i-am-an-invalid-setting')
162 path = vcsbackend.new_repo_path()
162 path = vcsbackend.new_repo_path()
163 with pytest.raises(Exception):
163 with pytest.raises(Exception):
164 SubversionRepository(path, create=True)
164 SubversionRepository(path, create=True)
165
165
166
166
167 class TestSVNCommit(object):
167 class TestSVNCommit(object):
168
168
169 @pytest.fixture(autouse=True)
169 @pytest.fixture(autouse=True)
170 def prepare(self, repo):
170 def prepare(self, repo):
171 self.repo = repo
171 self.repo = repo
172
172
173 def test_file_history_from_commits(self):
173 def test_file_history_from_commits(self):
174 node = self.repo[10].get_node('setup.py')
174 node = self.repo[10].get_node('setup.py')
175 commit_ids = [commit.raw_id for commit in node.history]
175 commit_ids = [commit.raw_id for commit in node.history]
176 assert ['8'] == commit_ids
176 assert ['8'] == commit_ids
177
177
178 node = self.repo[20].get_node('setup.py')
178 node = self.repo[20].get_node('setup.py')
179 node_ids = [commit.raw_id for commit in node.history]
179 node_ids = [commit.raw_id for commit in node.history]
180 assert ['18',
180 assert ['18',
181 '8'] == node_ids
181 '8'] == node_ids
182
182
183 # special case we check history from commit that has this particular
183 # special case we check history from commit that has this particular
184 # file changed this means we check if it's included as well
184 # file changed this means we check if it's included as well
185 node = self.repo.get_commit('18').get_node('setup.py')
185 node = self.repo.get_commit('18').get_node('setup.py')
186 node_ids = [commit.raw_id for commit in node.history]
186 node_ids = [commit.raw_id for commit in node.history]
187 assert ['18',
187 assert ['18',
188 '8'] == node_ids
188 '8'] == node_ids
189
189
190 def test_repo_files_content_type(self):
190 def test_repo_files_content_type(self):
191 test_commit = self.repo.get_commit(commit_idx=100)
191 test_commit = self.repo.get_commit(commit_idx=100)
192 for node in test_commit.get_node('/'):
192 for node in test_commit.get_node('/'):
193 if node.is_file():
193 if node.is_file():
194 assert type(node.content) == bytes
194 assert type(node.content) == bytes
195 assert type(node.str_content) == str
195 assert type(node.str_content) == str
General Comments 0
You need to be logged in to leave comments. Login now