##// END OF EJS Templates
chore(cleanups): cleanup unicode usage...
super-admin -
r5453:c08e0f36 default
parent child Browse files
Show More
@@ -1,348 +1,348 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.lib.vcs import settings
24 24 from rhodecode.model.meta import Session
25 25 from rhodecode.model.repo import RepoModel
26 26 from rhodecode.model.user import UserModel
27 27 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
28 28 from rhodecode.api.tests.utils import (
29 29 build_data, api_call, assert_ok, assert_error, crash)
30 30 from rhodecode.tests.fixture import Fixture
31 31 from rhodecode.lib.ext_json import json
32 32 from rhodecode.lib.str_utils import safe_str
33 33
34 34
35 35 fixture = Fixture()
36 36
37 37
38 38 @pytest.mark.usefixtures("testuser_api", "app")
39 39 class TestCreateRepo(object):
40 40
41 41 @pytest.mark.parametrize('given, expected_name, expected_exc', [
42 42 ('api repo-1', 'api-repo-1', False),
43 43 ('api-repo 1-Δ…Δ‡', 'api-repo-1-Δ…Δ‡', False),
44 (u'unicode-Δ…Δ‡', u'unicode-Δ…Δ‡', False),
44 ('unicode-Δ…Δ‡', u'unicode-Δ…Δ‡', False),
45 45 ('some repo v1.2', 'some-repo-v1.2', False),
46 46 ('v2.0', 'v2.0', False),
47 47 ])
48 48 def test_api_create_repo(self, backend, given, expected_name, expected_exc):
49 49
50 50 id_, params = build_data(
51 51 self.apikey,
52 52 'create_repo',
53 53 repo_name=given,
54 54 owner=TEST_USER_ADMIN_LOGIN,
55 55 repo_type=backend.alias,
56 56 )
57 57 response = api_call(self.app, params)
58 58
59 59 ret = {
60 60 'msg': 'Created new repository `%s`' % (expected_name,),
61 61 'success': True,
62 62 'task': None,
63 63 }
64 64 expected = ret
65 65 assert_ok(id_, expected, given=response.body)
66 66
67 67 repo = RepoModel().get_by_repo_name(safe_str(expected_name))
68 68 assert repo is not None
69 69
70 70 id_, params = build_data(self.apikey, 'get_repo', repoid=expected_name)
71 71 response = api_call(self.app, params)
72 72 body = json.loads(response.body)
73 73
74 74 assert body['result']['enable_downloads'] is False
75 75 assert body['result']['enable_locking'] is False
76 76 assert body['result']['enable_statistics'] is False
77 77
78 78 fixture.destroy_repo(safe_str(expected_name))
79 79
80 80 def test_api_create_restricted_repo_type(self, backend):
81 81 repo_name = 'api-repo-type-{0}'.format(backend.alias)
82 82 id_, params = build_data(
83 83 self.apikey,
84 84 'create_repo',
85 85 repo_name=repo_name,
86 86 owner=TEST_USER_ADMIN_LOGIN,
87 87 repo_type=backend.alias,
88 88 )
89 89 git_backend = settings.BACKENDS['git']
90 90 with mock.patch(
91 91 'rhodecode.lib.vcs.settings.BACKENDS', {'git': git_backend}):
92 92 response = api_call(self.app, params)
93 93
94 94 repo = RepoModel().get_by_repo_name(repo_name)
95 95
96 96 if backend.alias == 'git':
97 97 assert repo is not None
98 98 expected = {
99 99 'msg': 'Created new repository `{0}`'.format(repo_name,),
100 100 'success': True,
101 101 'task': None,
102 102 }
103 103 assert_ok(id_, expected, given=response.body)
104 104 else:
105 105 assert repo is None
106 106
107 107 fixture.destroy_repo(repo_name)
108 108
109 109 def test_api_create_repo_with_booleans(self, backend):
110 110 repo_name = 'api-repo-2'
111 111 id_, params = build_data(
112 112 self.apikey,
113 113 'create_repo',
114 114 repo_name=repo_name,
115 115 owner=TEST_USER_ADMIN_LOGIN,
116 116 repo_type=backend.alias,
117 117 enable_statistics=True,
118 118 enable_locking=True,
119 119 enable_downloads=True
120 120 )
121 121 response = api_call(self.app, params)
122 122
123 123 repo = RepoModel().get_by_repo_name(repo_name)
124 124
125 125 assert repo is not None
126 126 ret = {
127 127 'msg': 'Created new repository `%s`' % (repo_name,),
128 128 'success': True,
129 129 'task': None,
130 130 }
131 131 expected = ret
132 132 assert_ok(id_, expected, given=response.body)
133 133
134 134 id_, params = build_data(self.apikey, 'get_repo', repoid=repo_name)
135 135 response = api_call(self.app, params)
136 136 body = json.loads(response.body)
137 137
138 138 assert body['result']['enable_downloads'] is True
139 139 assert body['result']['enable_locking'] is True
140 140 assert body['result']['enable_statistics'] is True
141 141
142 142 fixture.destroy_repo(repo_name)
143 143
144 144 def test_api_create_repo_in_group(self, backend):
145 145 repo_group_name = 'my_gr'
146 146 # create the parent
147 147 fixture.create_repo_group(repo_group_name)
148 148
149 149 repo_name = '%s/api-repo-gr' % (repo_group_name,)
150 150 id_, params = build_data(
151 151 self.apikey, 'create_repo',
152 152 repo_name=repo_name,
153 153 owner=TEST_USER_ADMIN_LOGIN,
154 154 repo_type=backend.alias,)
155 155 response = api_call(self.app, params)
156 156 repo = RepoModel().get_by_repo_name(repo_name)
157 157 assert repo is not None
158 158 assert repo.group is not None
159 159
160 160 ret = {
161 161 'msg': 'Created new repository `%s`' % (repo_name,),
162 162 'success': True,
163 163 'task': None,
164 164 }
165 165 expected = ret
166 166 assert_ok(id_, expected, given=response.body)
167 167 fixture.destroy_repo(repo_name)
168 168 fixture.destroy_repo_group(repo_group_name)
169 169
170 170 def test_create_repo_in_group_that_doesnt_exist(self, backend, user_util):
171 171 repo_group_name = 'fake_group'
172 172
173 173 repo_name = '%s/api-repo-gr' % (repo_group_name,)
174 174 id_, params = build_data(
175 175 self.apikey, 'create_repo',
176 176 repo_name=repo_name,
177 177 owner=TEST_USER_ADMIN_LOGIN,
178 178 repo_type=backend.alias,)
179 179 response = api_call(self.app, params)
180 180
181 181 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
182 182 repo_group_name)}
183 183 assert_error(id_, expected, given=response.body)
184 184
185 185 def test_api_create_repo_unknown_owner(self, backend):
186 186 repo_name = 'api-repo-2'
187 187 owner = 'i-dont-exist'
188 188 id_, params = build_data(
189 189 self.apikey, 'create_repo',
190 190 repo_name=repo_name,
191 191 owner=owner,
192 192 repo_type=backend.alias)
193 193 response = api_call(self.app, params)
194 194 expected = 'user `%s` does not exist' % (owner,)
195 195 assert_error(id_, expected, given=response.body)
196 196
197 197 def test_api_create_repo_dont_specify_owner(self, backend):
198 198 repo_name = 'api-repo-3'
199 199 id_, params = build_data(
200 200 self.apikey, 'create_repo',
201 201 repo_name=repo_name,
202 202 repo_type=backend.alias)
203 203 response = api_call(self.app, params)
204 204
205 205 repo = RepoModel().get_by_repo_name(repo_name)
206 206 assert repo is not None
207 207 ret = {
208 208 'msg': 'Created new repository `%s`' % (repo_name,),
209 209 'success': True,
210 210 'task': None,
211 211 }
212 212 expected = ret
213 213 assert_ok(id_, expected, given=response.body)
214 214 fixture.destroy_repo(repo_name)
215 215
216 216 def test_api_create_repo_by_non_admin(self, backend):
217 217 repo_name = 'api-repo-4'
218 218 id_, params = build_data(
219 219 self.apikey_regular, 'create_repo',
220 220 repo_name=repo_name,
221 221 repo_type=backend.alias)
222 222 response = api_call(self.app, params)
223 223
224 224 repo = RepoModel().get_by_repo_name(repo_name)
225 225 assert repo is not None
226 226 ret = {
227 227 'msg': 'Created new repository `%s`' % (repo_name,),
228 228 'success': True,
229 229 'task': None,
230 230 }
231 231 expected = ret
232 232 assert_ok(id_, expected, given=response.body)
233 233 fixture.destroy_repo(repo_name)
234 234
235 235 def test_api_create_repo_by_non_admin_specify_owner(self, backend):
236 236 repo_name = 'api-repo-5'
237 237 owner = 'i-dont-exist'
238 238 id_, params = build_data(
239 239 self.apikey_regular, 'create_repo',
240 240 repo_name=repo_name,
241 241 repo_type=backend.alias,
242 242 owner=owner)
243 243 response = api_call(self.app, params)
244 244
245 245 expected = 'Only RhodeCode super-admin can specify `owner` param'
246 246 assert_error(id_, expected, given=response.body)
247 247 fixture.destroy_repo(repo_name)
248 248
249 249 def test_api_create_repo_by_non_admin_no_parent_group_perms(self, backend):
250 250 repo_group_name = 'no-access'
251 251 fixture.create_repo_group(repo_group_name)
252 252 repo_name = 'no-access/api-repo'
253 253
254 254 id_, params = build_data(
255 255 self.apikey_regular, 'create_repo',
256 256 repo_name=repo_name,
257 257 repo_type=backend.alias)
258 258 response = api_call(self.app, params)
259 259
260 260 expected = {'repo_group': 'Repository group `{}` does not exist'.format(
261 261 repo_group_name)}
262 262 assert_error(id_, expected, given=response.body)
263 263 fixture.destroy_repo_group(repo_group_name)
264 264 fixture.destroy_repo(repo_name)
265 265
266 266 def test_api_create_repo_non_admin_no_permission_to_create_to_root_level(
267 267 self, backend, user_util):
268 268
269 269 regular_user = user_util.create_user()
270 270 regular_user_api_key = regular_user.api_key
271 271
272 272 usr = UserModel().get_by_username(regular_user.username)
273 273 usr.inherit_default_permissions = False
274 274 Session().add(usr)
275 275
276 276 repo_name = backend.new_repo_name()
277 277 id_, params = build_data(
278 278 regular_user_api_key, 'create_repo',
279 279 repo_name=repo_name,
280 280 repo_type=backend.alias)
281 281 response = api_call(self.app, params)
282 282 expected = {
283 283 "repo_name": "You do not have the permission to "
284 284 "store repositories in the root location."}
285 285 assert_error(id_, expected, given=response.body)
286 286
287 287 def test_api_create_repo_exists(self, backend):
288 288 repo_name = backend.repo_name
289 289 id_, params = build_data(
290 290 self.apikey, 'create_repo',
291 291 repo_name=repo_name,
292 292 owner=TEST_USER_ADMIN_LOGIN,
293 293 repo_type=backend.alias,)
294 294 response = api_call(self.app, params)
295 295 expected = {
296 296 'unique_repo_name': 'Repository with name `{}` already exists'.format(
297 297 repo_name)}
298 298 assert_error(id_, expected, given=response.body)
299 299
300 300 @mock.patch.object(RepoModel, 'create', crash)
301 301 def test_api_create_repo_exception_occurred(self, backend):
302 302 repo_name = 'api-repo-6'
303 303 id_, params = build_data(
304 304 self.apikey, 'create_repo',
305 305 repo_name=repo_name,
306 306 owner=TEST_USER_ADMIN_LOGIN,
307 307 repo_type=backend.alias,)
308 308 response = api_call(self.app, params)
309 309 expected = 'failed to create repository `%s`' % (repo_name,)
310 310 assert_error(id_, expected, given=response.body)
311 311
312 312 @pytest.mark.parametrize('parent_group, dirty_name, expected_name', [
313 313 (None, 'foo bar x', 'foo-bar-x'),
314 314 ('foo', '/foo//bar x', 'foo/bar-x'),
315 315 ('foo-bar', 'foo-bar //bar x', 'foo-bar/bar-x'),
316 316 ])
317 317 def test_create_repo_with_extra_slashes_in_name(
318 318 self, backend, parent_group, dirty_name, expected_name):
319 319
320 320 if parent_group:
321 321 gr = fixture.create_repo_group(parent_group)
322 322 assert gr.group_name == parent_group
323 323
324 324 id_, params = build_data(
325 325 self.apikey, 'create_repo',
326 326 repo_name=dirty_name,
327 327 repo_type=backend.alias,
328 328 owner=TEST_USER_ADMIN_LOGIN,)
329 329 response = api_call(self.app, params)
330 330 expected ={
331 331 "msg": "Created new repository `{}`".format(expected_name),
332 332 "task": None,
333 333 "success": True
334 334 }
335 335 assert_ok(id_, expected, response.body)
336 336
337 337 repo = RepoModel().get_by_repo_name(expected_name)
338 338 assert repo is not None
339 339
340 340 expected = {
341 341 'msg': 'Created new repository `%s`' % (expected_name,),
342 342 'success': True,
343 343 'task': None,
344 344 }
345 345 assert_ok(id_, expected, given=response.body)
346 346 fixture.destroy_repo(expected_name)
347 347 if parent_group:
348 348 fixture.destroy_repo_group(parent_group)
@@ -1,288 +1,288 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.model.meta import Session
24 24 from rhodecode.model.repo_group import RepoGroupModel
25 25 from rhodecode.model.user import UserModel
26 26 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
27 27 from rhodecode.api.tests.utils import (
28 28 build_data, api_call, assert_ok, assert_error, crash)
29 29 from rhodecode.tests.fixture import Fixture
30 30
31 31
32 32 fixture = Fixture()
33 33
34 34
35 35 @pytest.mark.usefixtures("testuser_api", "app")
36 36 class TestCreateRepoGroup(object):
37 37 def test_api_create_repo_group(self):
38 38 repo_group_name = 'api-repo-group'
39 39
40 40 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
41 41 assert repo_group is None
42 42
43 43 id_, params = build_data(
44 44 self.apikey, 'create_repo_group',
45 45 group_name=repo_group_name,
46 46 owner=TEST_USER_ADMIN_LOGIN,)
47 47 response = api_call(self.app, params)
48 48
49 49 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
50 50 assert repo_group is not None
51 51 ret = {
52 52 'msg': 'Created new repo group `%s`' % (repo_group_name,),
53 53 'repo_group': repo_group.get_api_data()
54 54 }
55 55 expected = ret
56 56 try:
57 57 assert_ok(id_, expected, given=response.body)
58 58 finally:
59 59 fixture.destroy_repo_group(repo_group_name)
60 60
61 61 def test_api_create_repo_group_in_another_group(self):
62 62 repo_group_name = 'api-repo-group'
63 63
64 64 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
65 65 assert repo_group is None
66 66 # create the parent
67 67 fixture.create_repo_group(repo_group_name)
68 68
69 69 full_repo_group_name = repo_group_name+'/'+repo_group_name
70 70 id_, params = build_data(
71 71 self.apikey, 'create_repo_group',
72 72 group_name=full_repo_group_name,
73 73 owner=TEST_USER_ADMIN_LOGIN,
74 74 copy_permissions=True)
75 75 response = api_call(self.app, params)
76 76
77 77 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
78 78 assert repo_group is not None
79 79 ret = {
80 80 'msg': 'Created new repo group `%s`' % (full_repo_group_name,),
81 81 'repo_group': repo_group.get_api_data()
82 82 }
83 83 expected = ret
84 84 try:
85 85 assert_ok(id_, expected, given=response.body)
86 86 finally:
87 87 fixture.destroy_repo_group(full_repo_group_name)
88 88 fixture.destroy_repo_group(repo_group_name)
89 89
90 90 def test_api_create_repo_group_in_another_group_not_existing(self):
91 91 repo_group_name = 'api-repo-group-no'
92 92
93 93 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
94 94 assert repo_group is None
95 95
96 96 full_repo_group_name = repo_group_name+'/'+repo_group_name
97 97 id_, params = build_data(
98 98 self.apikey, 'create_repo_group',
99 99 group_name=full_repo_group_name,
100 100 owner=TEST_USER_ADMIN_LOGIN,
101 101 copy_permissions=True)
102 102 response = api_call(self.app, params)
103 103 expected = {
104 104 'repo_group':
105 105 'Parent repository group `{}` does not exist'.format(
106 106 repo_group_name)}
107 107 assert_error(id_, expected, given=response.body)
108 108
109 109 def test_api_create_repo_group_that_exists(self):
110 110 repo_group_name = 'api-repo-group'
111 111
112 112 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
113 113 assert repo_group is None
114 114
115 115 fixture.create_repo_group(repo_group_name)
116 116 id_, params = build_data(
117 117 self.apikey, 'create_repo_group',
118 118 group_name=repo_group_name,
119 119 owner=TEST_USER_ADMIN_LOGIN,)
120 120 response = api_call(self.app, params)
121 121 expected = {
122 122 'unique_repo_group_name':
123 123 'Repository group with name `{}` already exists'.format(
124 124 repo_group_name)}
125 125 try:
126 126 assert_error(id_, expected, given=response.body)
127 127 finally:
128 128 fixture.destroy_repo_group(repo_group_name)
129 129
130 130 def test_api_create_repo_group_regular_user_wit_root_location_perms(
131 131 self, user_util):
132 132 regular_user = user_util.create_user()
133 133 regular_user_api_key = regular_user.api_key
134 134
135 135 repo_group_name = 'api-repo-group-by-regular-user'
136 136
137 137 usr = UserModel().get_by_username(regular_user.username)
138 138 usr.inherit_default_permissions = False
139 139 Session().add(usr)
140 140
141 141 UserModel().grant_perm(
142 142 regular_user.username, 'hg.repogroup.create.true')
143 143 Session().commit()
144 144
145 145 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
146 146 assert repo_group is None
147 147
148 148 id_, params = build_data(
149 149 regular_user_api_key, 'create_repo_group',
150 150 group_name=repo_group_name)
151 151 response = api_call(self.app, params)
152 152
153 153 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
154 154 assert repo_group is not None
155 155 expected = {
156 156 'msg': 'Created new repo group `%s`' % (repo_group_name,),
157 157 'repo_group': repo_group.get_api_data()
158 158 }
159 159 try:
160 160 assert_ok(id_, expected, given=response.body)
161 161 finally:
162 162 fixture.destroy_repo_group(repo_group_name)
163 163
164 164 def test_api_create_repo_group_regular_user_with_admin_perms_to_parent(
165 165 self, user_util):
166 166
167 167 repo_group_name = 'api-repo-group-parent'
168 168
169 169 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
170 170 assert repo_group is None
171 171 # create the parent
172 172 fixture.create_repo_group(repo_group_name)
173 173
174 174 # user perms
175 175 regular_user = user_util.create_user()
176 176 regular_user_api_key = regular_user.api_key
177 177
178 178 usr = UserModel().get_by_username(regular_user.username)
179 179 usr.inherit_default_permissions = False
180 180 Session().add(usr)
181 181
182 182 RepoGroupModel().grant_user_permission(
183 183 repo_group_name, regular_user.username, 'group.admin')
184 184 Session().commit()
185 185
186 186 full_repo_group_name = repo_group_name + '/' + repo_group_name
187 187 id_, params = build_data(
188 188 regular_user_api_key, 'create_repo_group',
189 189 group_name=full_repo_group_name)
190 190 response = api_call(self.app, params)
191 191
192 192 repo_group = RepoGroupModel.cls.get_by_group_name(full_repo_group_name)
193 193 assert repo_group is not None
194 194 expected = {
195 195 'msg': 'Created new repo group `{}`'.format(full_repo_group_name),
196 196 'repo_group': repo_group.get_api_data()
197 197 }
198 198 try:
199 199 assert_ok(id_, expected, given=response.body)
200 200 finally:
201 201 fixture.destroy_repo_group(full_repo_group_name)
202 202 fixture.destroy_repo_group(repo_group_name)
203 203
204 204 def test_api_create_repo_group_regular_user_no_permission_to_create_to_root_level(self):
205 205 repo_group_name = 'api-repo-group'
206 206
207 207 id_, params = build_data(
208 208 self.apikey_regular, 'create_repo_group',
209 209 group_name=repo_group_name)
210 210 response = api_call(self.app, params)
211 211
212 212 expected = {
213 213 'repo_group':
214 u'You do not have the permission to store '
215 u'repository groups in the root location.'}
214 'You do not have the permission to store '
215 'repository groups in the root location.'}
216 216 assert_error(id_, expected, given=response.body)
217 217
218 218 def test_api_create_repo_group_regular_user_no_parent_group_perms(self):
219 219 repo_group_name = 'api-repo-group-regular-user'
220 220
221 221 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
222 222 assert repo_group is None
223 223 # create the parent
224 224 fixture.create_repo_group(repo_group_name)
225 225
226 226 full_repo_group_name = repo_group_name+'/'+repo_group_name
227 227
228 228 id_, params = build_data(
229 229 self.apikey_regular, 'create_repo_group',
230 230 group_name=full_repo_group_name)
231 231 response = api_call(self.app, params)
232 232
233 233 expected = {
234 234 'repo_group':
235 u"You do not have the permissions to store "
236 u"repository groups inside repository group `{}`".format(repo_group_name)}
235 "You do not have the permissions to store "
236 "repository groups inside repository group `{}`".format(repo_group_name)}
237 237 try:
238 238 assert_error(id_, expected, given=response.body)
239 239 finally:
240 240 fixture.destroy_repo_group(repo_group_name)
241 241
242 242 def test_api_create_repo_group_regular_user_no_permission_to_specify_owner(
243 243 self):
244 244 repo_group_name = 'api-repo-group'
245 245
246 246 id_, params = build_data(
247 247 self.apikey_regular, 'create_repo_group',
248 248 group_name=repo_group_name,
249 249 owner=TEST_USER_ADMIN_LOGIN,)
250 250 response = api_call(self.app, params)
251 251
252 252 expected = "Only RhodeCode super-admin can specify `owner` param"
253 253 assert_error(id_, expected, given=response.body)
254 254
255 255 @mock.patch.object(RepoGroupModel, 'create', crash)
256 256 def test_api_create_repo_group_exception_occurred(self):
257 257 repo_group_name = 'api-repo-group'
258 258
259 259 repo_group = RepoGroupModel.cls.get_by_group_name(repo_group_name)
260 260 assert repo_group is None
261 261
262 262 id_, params = build_data(
263 263 self.apikey, 'create_repo_group',
264 264 group_name=repo_group_name,
265 265 owner=TEST_USER_ADMIN_LOGIN,)
266 266 response = api_call(self.app, params)
267 267 expected = 'failed to create repo group `%s`' % (repo_group_name,)
268 268 assert_error(id_, expected, given=response.body)
269 269
270 270 def test_create_group_with_extra_slashes_in_name(self, user_util):
271 271 existing_repo_group = user_util.create_repo_group()
272 272 dirty_group_name = '//{}//group2//'.format(
273 273 existing_repo_group.group_name)
274 274 cleaned_group_name = '{}/group2'.format(
275 275 existing_repo_group.group_name)
276 276
277 277 id_, params = build_data(
278 278 self.apikey, 'create_repo_group',
279 279 group_name=dirty_group_name,
280 280 owner=TEST_USER_ADMIN_LOGIN,)
281 281 response = api_call(self.app, params)
282 282 repo_group = RepoGroupModel.cls.get_by_group_name(cleaned_group_name)
283 283 expected = {
284 284 'msg': 'Created new repo group `%s`' % (cleaned_group_name,),
285 285 'repo_group': repo_group.get_api_data()
286 286 }
287 287 assert_ok(id_, expected, given=response.body)
288 288 fixture.destroy_repo_group(cleaned_group_name)
@@ -1,101 +1,101 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20
21 21 import pytest
22 22
23 23 from rhodecode.lib.str_utils import safe_bytes
24 24 from rhodecode.model.db import Gist
25 25 from rhodecode.api.tests.utils import (
26 26 build_data, api_call, assert_error, assert_ok)
27 27
28 28
29 29 @pytest.mark.usefixtures("testuser_api", "app")
30 30 class TestApiGetGist(object):
31 31 def test_api_get_gist(self, gist_util, http_host_only_stub):
32 32 gist = gist_util.create_gist()
33 33 gist_id = gist.gist_access_id
34 34 gist_created_on = gist.created_on
35 35 gist_modified_at = gist.modified_at
36 36 id_, params = build_data(
37 37 self.apikey, 'get_gist', gistid=gist_id, )
38 38 response = api_call(self.app, params)
39 39
40 40 expected = {
41 41 'access_id': gist_id,
42 42 'created_on': gist_created_on,
43 43 'modified_at': gist_modified_at,
44 44 'description': 'new-gist',
45 45 'expires': -1.0,
46 46 'gist_id': int(gist_id),
47 47 'type': 'public',
48 48 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
49 49 'acl_level': Gist.ACL_LEVEL_PUBLIC,
50 50 'content': None,
51 51 }
52 52
53 53 assert_ok(id_, expected, given=response.body)
54 54
55 55 def test_api_get_gist_with_content(self, gist_util, http_host_only_stub):
56 56 mapping = {
57 57 b'filename1.txt': {'content': b'hello world'},
58 58 safe_bytes('filename1Δ….txt'): {'content': safe_bytes('hello worldΔ™')}
59 59 }
60 60 gist = gist_util.create_gist(gist_mapping=mapping)
61 61 gist_id = gist.gist_access_id
62 62 gist_created_on = gist.created_on
63 63 gist_modified_at = gist.modified_at
64 64 id_, params = build_data(
65 65 self.apikey, 'get_gist', gistid=gist_id, content=True)
66 66 response = api_call(self.app, params)
67 67
68 68 expected = {
69 69 'access_id': gist_id,
70 70 'created_on': gist_created_on,
71 71 'modified_at': gist_modified_at,
72 72 'description': 'new-gist',
73 73 'expires': -1.0,
74 74 'gist_id': int(gist_id),
75 75 'type': 'public',
76 76 'url': 'http://%s/_admin/gists/%s' % (http_host_only_stub, gist_id,),
77 77 'acl_level': Gist.ACL_LEVEL_PUBLIC,
78 78 'content': {
79 u'filename1.txt': u'hello world',
80 u'filename1Δ….txt': u'hello worldΔ™'
79 'filename1.txt': 'hello world',
80 'filename1Δ….txt': 'hello worldΔ™'
81 81 },
82 82 }
83 83
84 84 assert_ok(id_, expected, given=response.body)
85 85
86 86 def test_api_get_gist_not_existing(self):
87 87 id_, params = build_data(
88 88 self.apikey_regular, 'get_gist', gistid='12345', )
89 89 response = api_call(self.app, params)
90 90 expected = 'gist `%s` does not exist' % ('12345',)
91 91 assert_error(id_, expected, given=response.body)
92 92
93 93 def test_api_get_gist_private_gist_without_permission(self, gist_util):
94 94 gist = gist_util.create_gist()
95 95 gist_id = gist.gist_access_id
96 96 id_, params = build_data(
97 97 self.apikey_regular, 'get_gist', gistid=gist_id, )
98 98 response = api_call(self.app, params)
99 99
100 100 expected = 'gist `%s` does not exist' % (gist_id,)
101 101 assert_error(id_, expected, given=response.body)
@@ -1,497 +1,497 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import urllib.request
21 21 import urllib.parse
22 22 import urllib.error
23 23
24 24 import mock
25 25 import pytest
26 26
27 27 from rhodecode.apps._base import ADMIN_PREFIX
28 28 from rhodecode.lib import auth
29 29 from rhodecode.lib.utils2 import safe_str
30 30 from rhodecode.lib import helpers as h
31 31 from rhodecode.model.db import (
32 32 Repository, RepoGroup, UserRepoToPerm, User, Permission)
33 33 from rhodecode.model.meta import Session
34 34 from rhodecode.model.repo import RepoModel
35 35 from rhodecode.model.repo_group import RepoGroupModel
36 36 from rhodecode.model.user import UserModel
37 37 from rhodecode.tests import (
38 38 login_user_session, assert_session_flash, TEST_USER_ADMIN_LOGIN,
39 39 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
40 40 from rhodecode.tests.fixture import Fixture, error_function
41 41 from rhodecode.tests.utils import repo_on_filesystem
42 42 from rhodecode.tests.routes import route_path
43 43
44 44 fixture = Fixture()
45 45
46 46
47 47 def _get_permission_for_user(user, repo):
48 48 perm = UserRepoToPerm.query()\
49 49 .filter(UserRepoToPerm.repository ==
50 50 Repository.get_by_repo_name(repo))\
51 51 .filter(UserRepoToPerm.user == User.get_by_username(user))\
52 52 .all()
53 53 return perm
54 54
55 55
56 56 @pytest.mark.usefixtures("app")
57 57 class TestAdminRepos(object):
58 58
59 59 def test_repo_list(self, autologin_user, user_util, xhr_header):
60 60 repo = user_util.create_repo()
61 61 repo_name = repo.repo_name
62 62 response = self.app.get(
63 63 route_path('repos_data'), status=200,
64 64 extra_environ=xhr_header)
65 65
66 66 response.mustcontain(repo_name)
67 67
68 68 def test_create_page_restricted_to_single_backend(self, autologin_user, backend):
69 69 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
70 70 response = self.app.get(route_path('repo_new'), status=200)
71 71 assert_response = response.assert_response()
72 72 element = assert_response.get_element('[name=repo_type]')
73 73 assert element.get('value') == 'git'
74 74
75 75 def test_create_page_non_restricted_backends(self, autologin_user, backend):
76 76 response = self.app.get(route_path('repo_new'), status=200)
77 77 assert_response = response.assert_response()
78 78 assert ['hg', 'git', 'svn'] == [x.get('value') for x in assert_response.get_elements('[name=repo_type]')]
79 79
80 80 @pytest.mark.parametrize(
81 81 "suffix", ['', 'xxa'], ids=['', 'non-ascii'])
82 82 def test_create(self, autologin_user, backend, suffix, csrf_token):
83 83 repo_name_unicode = backend.new_repo_name(suffix=suffix)
84 84 repo_name = repo_name_unicode
85 85
86 86 description_unicode = 'description for newly created repo' + suffix
87 87 description = description_unicode
88 88
89 89 response = self.app.post(
90 90 route_path('repo_create'),
91 91 fixture._get_repo_create_params(
92 92 repo_private=False,
93 93 repo_name=repo_name,
94 94 repo_type=backend.alias,
95 95 repo_description=description,
96 96 csrf_token=csrf_token),
97 97 status=302)
98 98
99 99 self.assert_repository_is_created_correctly(
100 100 repo_name, description, backend)
101 101
102 102 def test_create_numeric_name(self, autologin_user, backend, csrf_token):
103 103 numeric_repo = '1234'
104 104 repo_name = numeric_repo
105 105 description = 'description for newly created repo' + numeric_repo
106 106 self.app.post(
107 107 route_path('repo_create'),
108 108 fixture._get_repo_create_params(
109 109 repo_private=False,
110 110 repo_name=repo_name,
111 111 repo_type=backend.alias,
112 112 repo_description=description,
113 113 csrf_token=csrf_token))
114 114
115 115 self.assert_repository_is_created_correctly(
116 116 repo_name, description, backend)
117 117
118 118 @pytest.mark.parametrize("suffix", ['', '_Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
119 119 def test_create_in_group(
120 120 self, autologin_user, backend, suffix, csrf_token):
121 121 # create GROUP
122 122 group_name = f'sometest_{backend.alias}'
123 123 gr = RepoGroupModel().create(group_name=group_name,
124 124 group_description='test',
125 125 owner=TEST_USER_ADMIN_LOGIN)
126 126 Session().commit()
127 127
128 128 repo_name = f'ingroup{suffix}'
129 129 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
130 130 description = 'description for newly created repo'
131 131
132 132 self.app.post(
133 133 route_path('repo_create'),
134 134 fixture._get_repo_create_params(
135 135 repo_private=False,
136 136 repo_name=safe_str(repo_name),
137 137 repo_type=backend.alias,
138 138 repo_description=description,
139 139 repo_group=gr.group_id,
140 140 csrf_token=csrf_token))
141 141
142 142 # TODO: johbo: Cleanup work to fixture
143 143 try:
144 144 self.assert_repository_is_created_correctly(
145 145 repo_name_full, description, backend)
146 146
147 147 new_repo = RepoModel().get_by_repo_name(repo_name_full)
148 148 inherited_perms = UserRepoToPerm.query().filter(
149 149 UserRepoToPerm.repository_id == new_repo.repo_id).all()
150 150 assert len(inherited_perms) == 1
151 151 finally:
152 152 RepoModel().delete(repo_name_full)
153 153 RepoGroupModel().delete(group_name)
154 154 Session().commit()
155 155
156 156 def test_create_in_group_numeric_name(
157 157 self, autologin_user, backend, csrf_token):
158 158 # create GROUP
159 159 group_name = 'sometest_%s' % backend.alias
160 160 gr = RepoGroupModel().create(group_name=group_name,
161 161 group_description='test',
162 162 owner=TEST_USER_ADMIN_LOGIN)
163 163 Session().commit()
164 164
165 165 repo_name = '12345'
166 166 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
167 167 description = 'description for newly created repo'
168 168 self.app.post(
169 169 route_path('repo_create'),
170 170 fixture._get_repo_create_params(
171 171 repo_private=False,
172 172 repo_name=repo_name,
173 173 repo_type=backend.alias,
174 174 repo_description=description,
175 175 repo_group=gr.group_id,
176 176 csrf_token=csrf_token))
177 177
178 178 # TODO: johbo: Cleanup work to fixture
179 179 try:
180 180 self.assert_repository_is_created_correctly(
181 181 repo_name_full, description, backend)
182 182
183 183 new_repo = RepoModel().get_by_repo_name(repo_name_full)
184 184 inherited_perms = UserRepoToPerm.query()\
185 185 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
186 186 assert len(inherited_perms) == 1
187 187 finally:
188 188 RepoModel().delete(repo_name_full)
189 189 RepoGroupModel().delete(group_name)
190 190 Session().commit()
191 191
192 192 def test_create_in_group_without_needed_permissions(self, backend):
193 193 session = login_user_session(
194 194 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
195 195 csrf_token = auth.get_csrf_token(session)
196 196 # revoke
197 197 user_model = UserModel()
198 198 # disable fork and create on default user
199 199 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
200 200 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
201 201 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
202 202 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
203 203
204 204 # disable on regular user
205 205 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
206 206 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
207 207 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
208 208 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
209 209 Session().commit()
210 210
211 211 # create GROUP
212 212 group_name = 'reg_sometest_%s' % backend.alias
213 213 gr = RepoGroupModel().create(group_name=group_name,
214 214 group_description='test',
215 215 owner=TEST_USER_ADMIN_LOGIN)
216 216 Session().commit()
217 217 repo_group_id = gr.group_id
218 218
219 219 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
220 220 gr_allowed = RepoGroupModel().create(
221 221 group_name=group_name_allowed,
222 222 group_description='test',
223 223 owner=TEST_USER_REGULAR_LOGIN)
224 224 allowed_repo_group_id = gr_allowed.group_id
225 225 Session().commit()
226 226
227 227 repo_name = 'ingroup'
228 228 description = 'description for newly created repo'
229 229 response = self.app.post(
230 230 route_path('repo_create'),
231 231 fixture._get_repo_create_params(
232 232 repo_private=False,
233 233 repo_name=repo_name,
234 234 repo_type=backend.alias,
235 235 repo_description=description,
236 236 repo_group=repo_group_id,
237 237 csrf_token=csrf_token))
238 238
239 239 response.mustcontain('Invalid value')
240 240
241 241 # user is allowed to create in this group
242 242 repo_name = 'ingroup'
243 243 repo_name_full = RepoGroup.url_sep().join(
244 244 [group_name_allowed, repo_name])
245 245 description = 'description for newly created repo'
246 246 response = self.app.post(
247 247 route_path('repo_create'),
248 248 fixture._get_repo_create_params(
249 249 repo_private=False,
250 250 repo_name=repo_name,
251 251 repo_type=backend.alias,
252 252 repo_description=description,
253 253 repo_group=allowed_repo_group_id,
254 254 csrf_token=csrf_token))
255 255
256 256 # TODO: johbo: Cleanup in pytest fixture
257 257 try:
258 258 self.assert_repository_is_created_correctly(
259 259 repo_name_full, description, backend)
260 260
261 261 new_repo = RepoModel().get_by_repo_name(repo_name_full)
262 262 inherited_perms = UserRepoToPerm.query().filter(
263 263 UserRepoToPerm.repository_id == new_repo.repo_id).all()
264 264 assert len(inherited_perms) == 1
265 265
266 266 assert repo_on_filesystem(repo_name_full)
267 267 finally:
268 268 RepoModel().delete(repo_name_full)
269 269 RepoGroupModel().delete(group_name)
270 270 RepoGroupModel().delete(group_name_allowed)
271 271 Session().commit()
272 272
273 273 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
274 274 csrf_token):
275 275 # create GROUP
276 276 group_name = 'sometest_%s' % backend.alias
277 277 gr = RepoGroupModel().create(group_name=group_name,
278 278 group_description='test',
279 279 owner=TEST_USER_ADMIN_LOGIN)
280 280 perm = Permission.get_by_key('repository.write')
281 281 RepoGroupModel().grant_user_permission(
282 282 gr, TEST_USER_REGULAR_LOGIN, perm)
283 283
284 284 # add repo permissions
285 285 Session().commit()
286 286 repo_group_id = gr.group_id
287 287 repo_name = 'ingroup_inherited_%s' % backend.alias
288 288 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
289 289 description = 'description for newly created repo'
290 290 self.app.post(
291 291 route_path('repo_create'),
292 292 fixture._get_repo_create_params(
293 293 repo_private=False,
294 294 repo_name=repo_name,
295 295 repo_type=backend.alias,
296 296 repo_description=description,
297 297 repo_group=repo_group_id,
298 298 repo_copy_permissions=True,
299 299 csrf_token=csrf_token))
300 300
301 301 # TODO: johbo: Cleanup to pytest fixture
302 302 try:
303 303 self.assert_repository_is_created_correctly(
304 304 repo_name_full, description, backend)
305 305 except Exception:
306 306 RepoGroupModel().delete(group_name)
307 307 Session().commit()
308 308 raise
309 309
310 310 # check if inherited permissions are applied
311 311 new_repo = RepoModel().get_by_repo_name(repo_name_full)
312 312 inherited_perms = UserRepoToPerm.query().filter(
313 313 UserRepoToPerm.repository_id == new_repo.repo_id).all()
314 314 assert len(inherited_perms) == 2
315 315
316 316 assert TEST_USER_REGULAR_LOGIN in [
317 317 x.user.username for x in inherited_perms]
318 318 assert 'repository.write' in [
319 319 x.permission.permission_name for x in inherited_perms]
320 320
321 321 RepoModel().delete(repo_name_full)
322 322 RepoGroupModel().delete(group_name)
323 323 Session().commit()
324 324
325 325 @pytest.mark.xfail_backends(
326 326 "git", "hg", reason="Missing reposerver support")
327 327 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
328 328 csrf_token):
329 329 source_repo = backend.create_repo(number_of_commits=2)
330 330 source_repo_name = source_repo.repo_name
331 331 reposerver.serve(source_repo.scm_instance())
332 332
333 333 repo_name = backend.new_repo_name()
334 334 response = self.app.post(
335 335 route_path('repo_create'),
336 336 fixture._get_repo_create_params(
337 337 repo_private=False,
338 338 repo_name=repo_name,
339 339 repo_type=backend.alias,
340 340 repo_description='',
341 341 clone_uri=reposerver.url,
342 342 csrf_token=csrf_token),
343 343 status=302)
344 344
345 345 # Should be redirected to the creating page
346 346 response.mustcontain('repo_creating')
347 347
348 348 # Expecting that both repositories have same history
349 349 source_repo = RepoModel().get_by_repo_name(source_repo_name)
350 350 source_vcs = source_repo.scm_instance()
351 351 repo = RepoModel().get_by_repo_name(repo_name)
352 352 repo_vcs = repo.scm_instance()
353 353 assert source_vcs[0].message == repo_vcs[0].message
354 354 assert source_vcs.count() == repo_vcs.count()
355 355 assert source_vcs.commit_ids == repo_vcs.commit_ids
356 356
357 357 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
358 358 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
359 359 csrf_token):
360 360 repo_name = backend.new_repo_name()
361 361 description = 'description for newly created repo'
362 362 response = self.app.post(
363 363 route_path('repo_create'),
364 364 fixture._get_repo_create_params(
365 365 repo_private=False,
366 366 repo_name=repo_name,
367 367 repo_type=backend.alias,
368 368 repo_description=description,
369 369 clone_uri='http://repo.invalid/repo',
370 370 csrf_token=csrf_token))
371 371 response.mustcontain('invalid clone url')
372 372
373 373 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
374 374 def test_create_remote_repo_wrong_clone_uri_hg_svn(
375 375 self, autologin_user, backend, csrf_token):
376 376 repo_name = backend.new_repo_name()
377 377 description = 'description for newly created repo'
378 378 response = self.app.post(
379 379 route_path('repo_create'),
380 380 fixture._get_repo_create_params(
381 381 repo_private=False,
382 382 repo_name=repo_name,
383 383 repo_type=backend.alias,
384 384 repo_description=description,
385 385 clone_uri='svn+http://svn.invalid/repo',
386 386 csrf_token=csrf_token))
387 387 response.mustcontain('invalid clone url')
388 388
389 389 def test_create_with_git_suffix(
390 390 self, autologin_user, backend, csrf_token):
391 391 repo_name = backend.new_repo_name() + ".git"
392 392 description = 'description for newly created repo'
393 393 response = self.app.post(
394 394 route_path('repo_create'),
395 395 fixture._get_repo_create_params(
396 396 repo_private=False,
397 397 repo_name=repo_name,
398 398 repo_type=backend.alias,
399 399 repo_description=description,
400 400 csrf_token=csrf_token))
401 401 response.mustcontain('Repository name cannot end with .git')
402 402
403 403 def test_default_user_cannot_access_private_repo_in_a_group(
404 404 self, autologin_user, user_util, backend):
405 405
406 406 group = user_util.create_repo_group()
407 407
408 408 repo = backend.create_repo(
409 409 repo_private=True, repo_group=group, repo_copy_permissions=True)
410 410
411 411 permissions = _get_permission_for_user(
412 412 user='default', repo=repo.repo_name)
413 413 assert len(permissions) == 1
414 414 assert permissions[0].permission.permission_name == 'repository.none'
415 415 assert permissions[0].repository.private is True
416 416
417 417 def test_create_on_top_level_without_permissions(self, backend):
418 418 session = login_user_session(
419 419 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
420 420 csrf_token = auth.get_csrf_token(session)
421 421
422 422 # revoke
423 423 user_model = UserModel()
424 424 # disable fork and create on default user
425 425 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
426 426 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
427 427 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
428 428 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
429 429
430 430 # disable on regular user
431 431 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
432 432 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
433 433 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
434 434 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
435 435 Session().commit()
436 436
437 437 repo_name = backend.new_repo_name()
438 438 description = 'description for newly created repo'
439 439 response = self.app.post(
440 440 route_path('repo_create'),
441 441 fixture._get_repo_create_params(
442 442 repo_private=False,
443 443 repo_name=repo_name,
444 444 repo_type=backend.alias,
445 445 repo_description=description,
446 446 csrf_token=csrf_token))
447 447
448 448 response.mustcontain(
449 u"You do not have the permission to store repositories in "
450 u"the root location.")
449 "You do not have the permission to store repositories in "
450 "the root location.")
451 451
452 452 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
453 453 def test_create_repo_when_filesystem_op_fails(
454 454 self, autologin_user, backend, csrf_token):
455 455 repo_name = backend.new_repo_name()
456 456 description = 'description for newly created repo'
457 457
458 458 response = self.app.post(
459 459 route_path('repo_create'),
460 460 fixture._get_repo_create_params(
461 461 repo_private=False,
462 462 repo_name=repo_name,
463 463 repo_type=backend.alias,
464 464 repo_description=description,
465 465 csrf_token=csrf_token))
466 466
467 467 assert_session_flash(
468 468 response, 'Error creating repository %s' % repo_name)
469 469 # repo must not be in db
470 470 assert backend.repo is None
471 471 # repo must not be in filesystem !
472 472 assert not repo_on_filesystem(repo_name)
473 473
474 474 def assert_repository_is_created_correctly(self, repo_name, description, backend):
475 475 url_quoted_repo_name = urllib.parse.quote(repo_name)
476 476
477 477 # run the check page that triggers the flash message
478 478 response = self.app.get(
479 479 route_path('repo_creating_check', repo_name=repo_name))
480 480 assert response.json == {'result': True}
481 481
482 482 flash_msg = 'Created repository <a href="/{}">{}</a>'.format(url_quoted_repo_name, repo_name)
483 483 assert_session_flash(response, flash_msg)
484 484
485 485 # test if the repo was created in the database
486 486 new_repo = RepoModel().get_by_repo_name(repo_name)
487 487
488 488 assert new_repo.repo_name == repo_name
489 489 assert new_repo.description == description
490 490
491 491 # test if the repository is visible in the list ?
492 492 response = self.app.get(
493 493 h.route_path('repo_summary', repo_name=repo_name))
494 494 response.mustcontain(repo_name)
495 495 response.mustcontain(backend.alias)
496 496
497 497 assert repo_on_filesystem(repo_name)
@@ -1,593 +1,593 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import urllib.parse
20 20
21 21 import mock
22 22 import pytest
23 23
24 24
25 25 from rhodecode.lib.auth import check_password
26 26 from rhodecode.lib import helpers as h
27 27 from rhodecode.model.auth_token import AuthTokenModel
28 28 from rhodecode.model.db import User, Notification, UserApiKeys
29 29 from rhodecode.model.meta import Session
30 30
31 31 from rhodecode.tests import (
32 32 assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN,
33 33 no_newline_id_generator)
34 34 from rhodecode.tests.fixture import Fixture
35 35 from rhodecode.tests.routes import route_path
36 36
37 37 fixture = Fixture()
38 38
39 39 whitelist_view = ['RepoCommitsView:repo_commit_raw']
40 40
41 41
42 42 @pytest.mark.usefixtures('app')
43 43 class TestLoginController(object):
44 44 destroy_users = set()
45 45
46 46 @classmethod
47 47 def teardown_class(cls):
48 48 fixture.destroy_users(cls.destroy_users)
49 49
50 50 def teardown_method(self, method):
51 51 for n in Notification.query().all():
52 52 Session().delete(n)
53 53
54 54 Session().commit()
55 55 assert Notification.query().all() == []
56 56
57 57 def test_index(self):
58 58 response = self.app.get(route_path('login'))
59 59 assert response.status == '200 OK'
60 60 # Test response...
61 61
62 62 def test_login_admin_ok(self):
63 63 response = self.app.post(route_path('login'),
64 64 {'username': 'test_admin',
65 65 'password': 'test12'}, status=302)
66 66 response = response.follow()
67 67 session = response.get_session_from_response()
68 68 username = session['rhodecode_user'].get('username')
69 69 assert username == 'test_admin'
70 70 response.mustcontain('logout')
71 71
72 72 def test_login_regular_ok(self):
73 73 response = self.app.post(route_path('login'),
74 74 {'username': 'test_regular',
75 75 'password': 'test12'}, status=302)
76 76
77 77 response = response.follow()
78 78 session = response.get_session_from_response()
79 79 username = session['rhodecode_user'].get('username')
80 80 assert username == 'test_regular'
81 81 response.mustcontain('logout')
82 82
83 83 def test_login_with_primary_email(self):
84 84 user_email = 'test_regular@mail.com'
85 85 response = self.app.post(route_path('login'),
86 86 {'username': user_email,
87 87 'password': 'test12'}, status=302)
88 88 response = response.follow()
89 89 session = response.get_session_from_response()
90 90 user = session['rhodecode_user']
91 91 assert user['username'] == user_email.split('@')[0]
92 92 assert user['is_authenticated']
93 93 response.mustcontain('logout')
94 94
95 95 def test_login_regular_forbidden_when_super_admin_restriction(self):
96 96 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
97 97 with fixture.auth_restriction(self.app._pyramid_registry,
98 98 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN):
99 99 response = self.app.post(route_path('login'),
100 100 {'username': 'test_regular',
101 101 'password': 'test12'})
102 102
103 103 response.mustcontain('invalid user name')
104 104 response.mustcontain('invalid password')
105 105
106 106 def test_login_regular_forbidden_when_scope_restriction(self):
107 107 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
108 108 with fixture.scope_restriction(self.app._pyramid_registry,
109 109 RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS):
110 110 response = self.app.post(route_path('login'),
111 111 {'username': 'test_regular',
112 112 'password': 'test12'})
113 113
114 114 response.mustcontain('invalid user name')
115 115 response.mustcontain('invalid password')
116 116
117 117 def test_login_ok_came_from(self):
118 118 test_came_from = '/_admin/users?branch=stable'
119 119 _url = '{}?came_from={}'.format(route_path('login'), test_came_from)
120 120 response = self.app.post(
121 121 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
122 122
123 123 assert 'branch=stable' in response.location
124 124 response = response.follow()
125 125
126 126 assert response.status == '200 OK'
127 127 response.mustcontain('Users administration')
128 128
129 129 def test_redirect_to_login_with_get_args(self):
130 130 with fixture.anon_access(False):
131 131 kwargs = {'branch': 'stable'}
132 132 response = self.app.get(
133 133 h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs),
134 134 status=302)
135 135
136 136 response_query = urllib.parse.parse_qsl(response.location)
137 137 assert 'branch=stable' in response_query[0][1]
138 138
139 139 def test_login_form_with_get_args(self):
140 140 _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login'))
141 141 response = self.app.get(_url)
142 142 assert 'branch%3Dstable' in response.form.action
143 143
144 144 @pytest.mark.parametrize("url_came_from", [
145 145 'data:text/html,<script>window.alert("xss")</script>',
146 146 'mailto:test@rhodecode.org',
147 147 'file:///etc/passwd',
148 148 'ftp://some.ftp.server',
149 149 'http://other.domain',
150 150 ], ids=no_newline_id_generator)
151 151 def test_login_bad_came_froms(self, url_came_from):
152 152 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
153 153 response = self.app.post(
154 154 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
155 155 assert response.status == '302 Found'
156 156 response = response.follow()
157 157 assert response.status == '200 OK'
158 158 assert response.request.path == '/'
159 159
160 160 @pytest.mark.xfail(reason="newline params changed behaviour in python3")
161 161 @pytest.mark.parametrize("url_came_from", [
162 162 '/\r\nX-Forwarded-Host: \rhttp://example.org',
163 163 ], ids=no_newline_id_generator)
164 164 def test_login_bad_came_froms_404(self, url_came_from):
165 165 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
166 166 response = self.app.post(
167 167 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
168 168
169 169 response = response.follow()
170 170 assert response.status == '404 Not Found'
171 171
172 172 def test_login_short_password(self):
173 173 response = self.app.post(route_path('login'),
174 174 {'username': 'test_admin',
175 175 'password': 'as'})
176 176 assert response.status == '200 OK'
177 177
178 178 response.mustcontain('Enter 3 characters or more')
179 179
180 180 def test_login_wrong_non_ascii_password(self, user_regular):
181 181 response = self.app.post(
182 182 route_path('login'),
183 183 {'username': user_regular.username,
184 184 'password': 'invalid-non-asci\xe4'.encode('utf8')})
185 185
186 186 response.mustcontain('invalid user name')
187 187 response.mustcontain('invalid password')
188 188
189 189 def test_login_with_non_ascii_password(self, user_util):
190 190 password = u'valid-non-ascii\xe4'
191 191 user = user_util.create_user(password=password)
192 192 response = self.app.post(
193 193 route_path('login'),
194 194 {'username': user.username,
195 195 'password': password})
196 196 assert response.status_code == 302
197 197
198 198 def test_login_wrong_username_password(self):
199 199 response = self.app.post(route_path('login'),
200 200 {'username': 'error',
201 201 'password': 'test12'})
202 202
203 203 response.mustcontain('invalid user name')
204 204 response.mustcontain('invalid password')
205 205
206 206 def test_login_admin_ok_password_migration(self, real_crypto_backend):
207 207 from rhodecode.lib import auth
208 208
209 209 # create new user, with sha256 password
210 210 temp_user = 'test_admin_sha256'
211 211 user = fixture.create_user(temp_user)
212 212 user.password = auth._RhodeCodeCryptoSha256().hash_create(
213 213 b'test123')
214 214 Session().add(user)
215 215 Session().commit()
216 216 self.destroy_users.add(temp_user)
217 217 response = self.app.post(route_path('login'),
218 218 {'username': temp_user,
219 219 'password': 'test123'}, status=302)
220 220
221 221 response = response.follow()
222 222 session = response.get_session_from_response()
223 223 username = session['rhodecode_user'].get('username')
224 224 assert username == temp_user
225 225 response.mustcontain('logout')
226 226
227 227 # new password should be bcrypted, after log-in and transfer
228 228 user = User.get_by_username(temp_user)
229 229 assert user.password.startswith('$')
230 230
231 231 # REGISTRATIONS
232 232 def test_register(self):
233 233 response = self.app.get(route_path('register'))
234 234 response.mustcontain('Create an Account')
235 235
236 236 def test_register_err_same_username(self):
237 237 uname = 'test_admin'
238 238 response = self.app.post(
239 239 route_path('register'),
240 240 {
241 241 'username': uname,
242 242 'password': 'test12',
243 243 'password_confirmation': 'test12',
244 244 'email': 'goodmail@domain.com',
245 245 'firstname': 'test',
246 246 'lastname': 'test'
247 247 }
248 248 )
249 249
250 250 assertr = response.assert_response()
251 251 msg = 'Username "%(username)s" already exists'
252 252 msg = msg % {'username': uname}
253 253 assertr.element_contains('#username+.error-message', msg)
254 254
255 255 def test_register_err_same_email(self):
256 256 response = self.app.post(
257 257 route_path('register'),
258 258 {
259 259 'username': 'test_admin_0',
260 260 'password': 'test12',
261 261 'password_confirmation': 'test12',
262 262 'email': 'test_admin@mail.com',
263 263 'firstname': 'test',
264 264 'lastname': 'test'
265 265 }
266 266 )
267 267
268 268 assertr = response.assert_response()
269 msg = u'This e-mail address is already taken'
269 msg = 'This e-mail address is already taken'
270 270 assertr.element_contains('#email+.error-message', msg)
271 271
272 272 def test_register_err_same_email_case_sensitive(self):
273 273 response = self.app.post(
274 274 route_path('register'),
275 275 {
276 276 'username': 'test_admin_1',
277 277 'password': 'test12',
278 278 'password_confirmation': 'test12',
279 279 'email': 'TesT_Admin@mail.COM',
280 280 'firstname': 'test',
281 281 'lastname': 'test'
282 282 }
283 283 )
284 284 assertr = response.assert_response()
285 msg = u'This e-mail address is already taken'
285 msg = 'This e-mail address is already taken'
286 286 assertr.element_contains('#email+.error-message', msg)
287 287
288 288 def test_register_err_wrong_data(self):
289 289 response = self.app.post(
290 290 route_path('register'),
291 291 {
292 292 'username': 'xs',
293 293 'password': 'test',
294 294 'password_confirmation': 'test',
295 295 'email': 'goodmailm',
296 296 'firstname': 'test',
297 297 'lastname': 'test'
298 298 }
299 299 )
300 300 assert response.status == '200 OK'
301 301 response.mustcontain('An email address must contain a single @')
302 302 response.mustcontain('Enter a value 6 characters long or more')
303 303
304 304 def test_register_err_username(self):
305 305 response = self.app.post(
306 306 route_path('register'),
307 307 {
308 308 'username': 'error user',
309 309 'password': 'test12',
310 310 'password_confirmation': 'test12',
311 311 'email': 'goodmailm',
312 312 'firstname': 'test',
313 313 'lastname': 'test'
314 314 }
315 315 )
316 316
317 317 response.mustcontain('An email address must contain a single @')
318 318 response.mustcontain(
319 319 'Username may only contain '
320 320 'alphanumeric characters underscores, '
321 321 'periods or dashes and must begin with '
322 322 'alphanumeric character')
323 323
324 324 def test_register_err_case_sensitive(self):
325 325 usr = 'Test_Admin'
326 326 response = self.app.post(
327 327 route_path('register'),
328 328 {
329 329 'username': usr,
330 330 'password': 'test12',
331 331 'password_confirmation': 'test12',
332 332 'email': 'goodmailm',
333 333 'firstname': 'test',
334 334 'lastname': 'test'
335 335 }
336 336 )
337 337
338 338 assertr = response.assert_response()
339 339 msg = u'Username "%(username)s" already exists'
340 340 msg = msg % {'username': usr}
341 341 assertr.element_contains('#username+.error-message', msg)
342 342
343 343 def test_register_special_chars(self):
344 344 response = self.app.post(
345 345 route_path('register'),
346 346 {
347 347 'username': 'xxxaxn',
348 348 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
349 349 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
350 350 'email': 'goodmailm@test.plx',
351 351 'firstname': 'test',
352 352 'lastname': 'test'
353 353 }
354 354 )
355 355
356 356 msg = u'Invalid characters (non-ascii) in password'
357 357 response.mustcontain(msg)
358 358
359 359 def test_register_password_mismatch(self):
360 360 response = self.app.post(
361 361 route_path('register'),
362 362 {
363 363 'username': 'xs',
364 364 'password': '123qwe',
365 365 'password_confirmation': 'qwe123',
366 366 'email': 'goodmailm@test.plxa',
367 367 'firstname': 'test',
368 368 'lastname': 'test'
369 369 }
370 370 )
371 371 msg = u'Passwords do not match'
372 372 response.mustcontain(msg)
373 373
374 374 def test_register_ok(self):
375 375 username = 'test_regular4'
376 376 password = 'qweqwe'
377 377 email = 'marcin@test.com'
378 378 name = 'testname'
379 379 lastname = 'testlastname'
380 380
381 381 # this initializes a session
382 382 response = self.app.get(route_path('register'))
383 383 response.mustcontain('Create an Account')
384 384
385 385
386 386 response = self.app.post(
387 387 route_path('register'),
388 388 {
389 389 'username': username,
390 390 'password': password,
391 391 'password_confirmation': password,
392 392 'email': email,
393 393 'firstname': name,
394 394 'lastname': lastname,
395 395 'admin': True
396 396 },
397 397 status=302
398 398 ) # This should be overridden
399 399
400 400 assert_session_flash(
401 401 response, 'You have successfully registered with RhodeCode. You can log-in now.')
402 402
403 403 ret = Session().query(User).filter(
404 404 User.username == 'test_regular4').one()
405 405 assert ret.username == username
406 406 assert check_password(password, ret.password)
407 407 assert ret.email == email
408 408 assert ret.name == name
409 409 assert ret.lastname == lastname
410 410 assert ret.auth_tokens is not None
411 411 assert not ret.admin
412 412
413 413 def test_forgot_password_wrong_mail(self):
414 414 bad_email = 'marcin@wrongmail.org'
415 415 # this initializes a session
416 416 self.app.get(route_path('reset_password'))
417 417
418 418 response = self.app.post(
419 419 route_path('reset_password'), {'email': bad_email, }
420 420 )
421 421 assert_session_flash(response,
422 422 'If such email exists, a password reset link was sent to it.')
423 423
424 424 def test_forgot_password(self, user_util):
425 425 # this initializes a session
426 426 self.app.get(route_path('reset_password'))
427 427
428 428 user = user_util.create_user()
429 429 user_id = user.user_id
430 430 email = user.email
431 431
432 432 response = self.app.post(route_path('reset_password'), {'email': email, })
433 433
434 434 assert_session_flash(response,
435 435 'If such email exists, a password reset link was sent to it.')
436 436
437 437 # BAD KEY
438 438 confirm_url = route_path('reset_password_confirmation', params={'key': 'badkey'})
439 439 response = self.app.get(confirm_url, status=302)
440 440 assert response.location.endswith(route_path('reset_password'))
441 441 assert_session_flash(response, 'Given reset token is invalid')
442 442
443 443 response.follow() # cleanup flash
444 444
445 445 # GOOD KEY
446 446 key = UserApiKeys.query()\
447 447 .filter(UserApiKeys.user_id == user_id)\
448 448 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
449 449 .first()
450 450
451 451 assert key
452 452
453 453 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key)
454 454 response = self.app.get(confirm_url)
455 455 assert response.status == '302 Found'
456 456 assert response.location.endswith(route_path('login'))
457 457
458 458 assert_session_flash(
459 459 response,
460 460 'Your password reset was successful, '
461 461 'a new password has been sent to your email')
462 462
463 463 response.follow()
464 464
465 465 def _get_api_whitelist(self, values=None):
466 466 config = {'api_access_controllers_whitelist': values or []}
467 467 return config
468 468
469 469 @pytest.mark.parametrize("test_name, auth_token", [
470 470 ('none', None),
471 471 ('empty_string', ''),
472 472 ('fake_number', '123456'),
473 473 ('proper_auth_token', None)
474 474 ])
475 475 def test_access_not_whitelisted_page_via_auth_token(
476 476 self, test_name, auth_token, user_admin):
477 477
478 478 whitelist = self._get_api_whitelist([])
479 479 with mock.patch.dict('rhodecode.CONFIG', whitelist):
480 480 assert [] == whitelist['api_access_controllers_whitelist']
481 481 if test_name == 'proper_auth_token':
482 482 # use builtin if api_key is None
483 483 auth_token = user_admin.api_key
484 484
485 485 with fixture.anon_access(False):
486 486 # webtest uses linter to check if response is bytes,
487 487 # and we use memoryview here as a wrapper, quick turn-off
488 488 self.app.lint = False
489 489
490 490 self.app.get(
491 491 route_path('repo_commit_raw',
492 492 repo_name=HG_REPO, commit_id='tip',
493 493 params=dict(api_key=auth_token)),
494 494 status=302)
495 495
496 496 @pytest.mark.parametrize("test_name, auth_token, code", [
497 497 ('none', None, 302),
498 498 ('empty_string', '', 302),
499 499 ('fake_number', '123456', 302),
500 500 ('proper_auth_token', None, 200)
501 501 ])
502 502 def test_access_whitelisted_page_via_auth_token(
503 503 self, test_name, auth_token, code, user_admin):
504 504
505 505 whitelist = self._get_api_whitelist(whitelist_view)
506 506
507 507 with mock.patch.dict('rhodecode.CONFIG', whitelist):
508 508 assert whitelist_view == whitelist['api_access_controllers_whitelist']
509 509
510 510 if test_name == 'proper_auth_token':
511 511 auth_token = user_admin.api_key
512 512 assert auth_token
513 513
514 514 with fixture.anon_access(False):
515 515 # webtest uses linter to check if response is bytes,
516 516 # and we use memoryview here as a wrapper, quick turn-off
517 517 self.app.lint = False
518 518 self.app.get(
519 519 route_path('repo_commit_raw',
520 520 repo_name=HG_REPO, commit_id='tip',
521 521 params=dict(api_key=auth_token)),
522 522 status=code)
523 523
524 524 @pytest.mark.parametrize("test_name, auth_token, code", [
525 525 ('proper_auth_token', None, 200),
526 526 ('wrong_auth_token', '123456', 302),
527 527 ])
528 528 def test_access_whitelisted_page_via_auth_token_bound_to_token(
529 529 self, test_name, auth_token, code, user_admin):
530 530
531 531 expected_token = auth_token
532 532 if test_name == 'proper_auth_token':
533 533 auth_token = user_admin.api_key
534 534 expected_token = auth_token
535 535 assert auth_token
536 536
537 537 whitelist = self._get_api_whitelist([
538 538 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)])
539 539
540 540 with mock.patch.dict('rhodecode.CONFIG', whitelist):
541 541
542 542 with fixture.anon_access(False):
543 543 # webtest uses linter to check if response is bytes,
544 544 # and we use memoryview here as a wrapper, quick turn-off
545 545 self.app.lint = False
546 546
547 547 self.app.get(
548 548 route_path('repo_commit_raw',
549 549 repo_name=HG_REPO, commit_id='tip',
550 550 params=dict(api_key=auth_token)),
551 551 status=code)
552 552
553 553 def test_access_page_via_extra_auth_token(self):
554 554 whitelist = self._get_api_whitelist(whitelist_view)
555 555 with mock.patch.dict('rhodecode.CONFIG', whitelist):
556 556 assert whitelist_view == \
557 557 whitelist['api_access_controllers_whitelist']
558 558
559 559 new_auth_token = AuthTokenModel().create(
560 560 TEST_USER_ADMIN_LOGIN, 'test')
561 561 Session().commit()
562 562 with fixture.anon_access(False):
563 563 # webtest uses linter to check if response is bytes,
564 564 # and we use memoryview here as a wrapper, quick turn-off
565 565 self.app.lint = False
566 566 self.app.get(
567 567 route_path('repo_commit_raw',
568 568 repo_name=HG_REPO, commit_id='tip',
569 569 params=dict(api_key=new_auth_token.api_key)),
570 570 status=200)
571 571
572 572 def test_access_page_via_expired_auth_token(self):
573 573 whitelist = self._get_api_whitelist(whitelist_view)
574 574 with mock.patch.dict('rhodecode.CONFIG', whitelist):
575 575 assert whitelist_view == \
576 576 whitelist['api_access_controllers_whitelist']
577 577
578 578 new_auth_token = AuthTokenModel().create(
579 579 TEST_USER_ADMIN_LOGIN, 'test')
580 580 Session().commit()
581 581 # patch the api key and make it expired
582 582 new_auth_token.expires = 0
583 583 Session().add(new_auth_token)
584 584 Session().commit()
585 585 with fixture.anon_access(False):
586 586 # webtest uses linter to check if response is bytes,
587 587 # and we use memoryview here as a wrapper, quick turn-off
588 588 self.app.lint = False
589 589 self.app.get(
590 590 route_path('repo_commit_raw',
591 591 repo_name=HG_REPO, commit_id='tip',
592 592 params=dict(api_key=new_auth_token.api_key)),
593 593 status=302)
@@ -1,108 +1,108 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 """
21 21 External module for testing plugins
22 22
23 23 rhodecode.tests.auth_external_test
24 24
25 25 """
26 26 import logging
27 27 import traceback
28 28
29 29 from rhodecode.authentication.base import (
30 30 RhodeCodeExternalAuthPlugin, hybrid_property)
31 31 from rhodecode.model.db import User
32 32 from rhodecode.lib.ext_json import formatted_json
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 class RhodeCodeAuthPlugin(RhodeCodeExternalAuthPlugin):
38 38 def __init__(self):
39 39 self._logger = logging.getLogger(__name__)
40 40
41 41 @hybrid_property
42 42 def allows_creating_users(self):
43 43 return True
44 44
45 45 @hybrid_property
46 46 def name(self):
47 return u"external_test"
47 return "external_test"
48 48
49 49 def settings(self):
50 50 settings = [
51 51 ]
52 52 return settings
53 53
54 54 def use_fake_password(self):
55 55 return True
56 56
57 57 def user_activation_state(self):
58 58 def_user_perms = User.get_default_user().AuthUser().permissions['global']
59 59 return 'hg.extern_activate.auto' in def_user_perms
60 60
61 61 def auth(self, userobj, username, password, settings, **kwargs):
62 62 """
63 63 Given a user object (which may be null), username, a plaintext password,
64 64 and a settings object (containing all the keys needed as listed in settings()),
65 65 authenticate this user's login attempt.
66 66
67 67 Return None on failure. On success, return a dictionary of the form:
68 68
69 69 see: RhodeCodeAuthPluginBase.auth_func_attrs
70 70 This is later validated for correctness
71 71 """
72 72
73 73 if not username or not password:
74 74 log.debug('Empty username or password skipping...')
75 75 return None
76 76
77 77 try:
78 78 user_dn = username
79 79
80 80 # # old attrs fetched from RhodeCode database
81 81 admin = getattr(userobj, 'admin', False)
82 82 active = getattr(userobj, 'active', True)
83 83 email = getattr(userobj, 'email', '')
84 84 firstname = getattr(userobj, 'firstname', '')
85 85 lastname = getattr(userobj, 'lastname', '')
86 86 extern_type = getattr(userobj, 'extern_type', '')
87 87 #
88 88 user_attrs = {
89 89 'username': username,
90 90 'firstname': firstname,
91 91 'lastname': lastname,
92 92 'groups': [],
93 93 'email': '%s@rhodecode.com' % username,
94 94 'admin': admin,
95 95 'active': active,
96 96 "active_from_extern": None,
97 97 'extern_name': user_dn,
98 98 'extern_type': extern_type,
99 99 }
100 100
101 101 log.debug('EXTERNAL user: \n%s', formatted_json(user_attrs))
102 102 log.info('user `%s` authenticated correctly', user_attrs['username'])
103 103
104 104 return user_attrs
105 105
106 106 except (Exception,):
107 107 log.error(traceback.format_exc())
108 108 return None
@@ -1,188 +1,188 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.lib.auth import _RhodeCodeCryptoBCrypt
24 24 from rhodecode.authentication.base import RhodeCodeAuthPluginBase
25 25 from rhodecode.authentication.plugins.auth_ldap import RhodeCodeAuthPlugin
26 26 from rhodecode.model import db
27 27
28 28
29 29 class RcTestAuthPlugin(RhodeCodeAuthPluginBase):
30 30
31 31 def name(self):
32 return u'stub_auth'
32 return 'stub_auth'
33 33
34 34
35 35 def test_authenticate_returns_from_auth(stub_auth_data):
36 36 plugin = RcTestAuthPlugin('stub_id')
37 37 with mock.patch.object(plugin, 'auth') as auth_mock:
38 38 auth_mock.return_value = stub_auth_data
39 39 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
40 40 assert stub_auth_data == result
41 41
42 42
43 43 def test_authenticate_returns_empty_auth_data():
44 44 auth_data = {}
45 45 plugin = RcTestAuthPlugin('stub_id')
46 46 with mock.patch.object(plugin, 'auth') as auth_mock:
47 47 auth_mock.return_value = auth_data
48 48 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
49 49 assert auth_data == result
50 50
51 51
52 52 def test_authenticate_skips_hash_migration_if_mismatch(stub_auth_data):
53 53 stub_auth_data['_hash_migrate'] = 'new-hash'
54 54 plugin = RcTestAuthPlugin('stub_id')
55 55 with mock.patch.object(plugin, 'auth') as auth_mock:
56 56 auth_mock.return_value = stub_auth_data
57 57 result = plugin._authenticate(mock.Mock(), 'test', 'password', {})
58 58
59 59 user = db.User.get_by_username(stub_auth_data['username'])
60 60 assert user.password != 'new-hash'
61 61 assert result == stub_auth_data
62 62
63 63
64 64 def test_authenticate_migrates_to_new_hash(stub_auth_data):
65 65 new_password = b'new-password'
66 66 new_hash = _RhodeCodeCryptoBCrypt().hash_create(new_password)
67 67 stub_auth_data['_hash_migrate'] = new_hash
68 68 plugin = RcTestAuthPlugin('stub_id')
69 69 with mock.patch.object(plugin, 'auth') as auth_mock:
70 70 auth_mock.return_value = stub_auth_data
71 71 result = plugin._authenticate(
72 72 mock.Mock(), stub_auth_data['username'], new_password, {})
73 73
74 74 user = db.User.get_by_username(stub_auth_data['username'])
75 75 assert user.password == new_hash
76 76 assert result == stub_auth_data
77 77
78 78
79 79 @pytest.fixture()
80 80 def stub_auth_data(user_util):
81 81 user = user_util.create_user()
82 82 data = {
83 83 'username': user.username,
84 84 'password': 'password',
85 85 'email': 'test@example.org',
86 86 'firstname': 'John',
87 87 'lastname': 'Smith',
88 88 'groups': [],
89 89 'active': True,
90 90 'admin': False,
91 91 'extern_name': 'test',
92 92 'extern_type': 'ldap',
93 93 'active_from_extern': True
94 94 }
95 95 return data
96 96
97 97
98 98 class TestRhodeCodeAuthPlugin(object):
99 99 def setup_method(self, method):
100 100 self.finalizers = []
101 101 self.user = mock.Mock()
102 102 self.user.username = 'test'
103 103 self.user.password = 'old-password'
104 104 self.fake_auth = {
105 105 'username': 'test',
106 106 'password': 'test',
107 107 'email': 'test@example.org',
108 108 'firstname': 'John',
109 109 'lastname': 'Smith',
110 110 'groups': [],
111 111 'active': True,
112 112 'admin': False,
113 113 'extern_name': 'test',
114 114 'extern_type': 'ldap',
115 115 'active_from_extern': True
116 116 }
117 117
118 118 def teardown_method(self, method):
119 119 if self.finalizers:
120 120 for finalizer in self.finalizers:
121 121 finalizer()
122 122 self.finalizers = []
123 123
124 124 def test_fake_password_is_created_for_the_new_user(self):
125 125 self._patch()
126 126 auth_plugin = RhodeCodeAuthPlugin('stub_id')
127 127 auth_plugin._authenticate(self.user, 'test', 'test', [])
128 128 self.password_generator_mock.assert_called_once_with(length=16)
129 129 create_user_kwargs = self.create_user_mock.call_args[1]
130 130 assert create_user_kwargs['password'] == 'new-password'
131 131
132 132 def test_fake_password_is_not_created_for_the_existing_user(self):
133 133 self._patch()
134 134 self.get_user_mock.return_value = self.user
135 135 auth_plugin = RhodeCodeAuthPlugin('stub_id')
136 136 auth_plugin._authenticate(self.user, 'test', 'test', [])
137 137 assert self.password_generator_mock.called is False
138 138 create_user_kwargs = self.create_user_mock.call_args[1]
139 139 assert create_user_kwargs['password'] == self.user.password
140 140
141 141 def _patch(self):
142 142 get_user_patch = mock.patch('rhodecode.model.db.User.get_by_username')
143 143 self.get_user_mock = get_user_patch.start()
144 144 self.get_user_mock.return_value = None
145 145 self.finalizers.append(get_user_patch.stop)
146 146
147 147 create_user_patch = mock.patch(
148 148 'rhodecode.model.user.UserModel.create_or_update')
149 149 self.create_user_mock = create_user_patch.start()
150 150 self.create_user_mock.return_value = None
151 151 self.finalizers.append(create_user_patch.stop)
152 152
153 153 auth_patch = mock.patch.object(RhodeCodeAuthPlugin, 'auth')
154 154 self.auth_mock = auth_patch.start()
155 155 self.auth_mock.return_value = self.fake_auth
156 156 self.finalizers.append(auth_patch.stop)
157 157
158 158 password_generator_patch = mock.patch(
159 159 'rhodecode.lib.auth.PasswordGenerator.gen_password')
160 160 self.password_generator_mock = password_generator_patch.start()
161 161 self.password_generator_mock.return_value = 'new-password'
162 162 self.finalizers.append(password_generator_patch.stop)
163 163
164 164
165 165 def test_missing_ldap():
166 166 from rhodecode.model.validators import Missing
167 167
168 168 try:
169 169 import ldap_not_existing
170 170 except ImportError:
171 171 # means that python-ldap is not installed
172 172 ldap_not_existing = Missing
173 173
174 174 # missing is singleton
175 175 assert ldap_not_existing == Missing
176 176
177 177
178 178 def test_import_ldap():
179 179 from rhodecode.model.validators import Missing
180 180
181 181 try:
182 182 import ldap
183 183 except ImportError:
184 184 # means that python-ldap is not installed
185 185 ldap = Missing
186 186
187 187 # missing is singleton
188 188 assert False is (ldap == Missing)
@@ -1,1307 +1,1307 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import datetime
21 21 import mock
22 22 import os
23 23 import sys
24 24 import shutil
25 25
26 26 import pytest
27 27
28 28 from rhodecode.lib.utils import make_db_config
29 29 from rhodecode.lib.vcs.backends.base import Reference
30 30 from rhodecode.lib.vcs.backends.git import (
31 31 GitRepository, GitCommit, discover_git_version)
32 32 from rhodecode.lib.vcs.exceptions import (
33 33 RepositoryError, VCSError, NodeDoesNotExistError)
34 34 from rhodecode.lib.vcs.nodes import (
35 35 NodeKind, FileNode, DirNode, NodeState, SubModuleNode)
36 36 from rhodecode.tests import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
37 37 from rhodecode.tests.vcs.conftest import BackendTestMixin
38 38
39 39
40 40 pytestmark = pytest.mark.backends("git")
41 41
42 42
43 43 DIFF_FROM_REMOTE = br"""diff --git a/foobar b/foobar
44 44 new file mode 100644
45 45 index 0000000..f6ea049
46 46 --- /dev/null
47 47 +++ b/foobar
48 48 @@ -0,0 +1 @@
49 49 +foobar
50 50 \ No newline at end of file
51 51 diff --git a/foobar2 b/foobar2
52 52 new file mode 100644
53 53 index 0000000..e8c9d6b
54 54 --- /dev/null
55 55 +++ b/foobar2
56 56 @@ -0,0 +1 @@
57 57 +foobar2
58 58 \ No newline at end of file
59 59 """
60 60
61 61
62 62 def callable_get_diff(*args, **kwargs):
63 63 return DIFF_FROM_REMOTE
64 64
65 65
66 66 class TestGitRepository(object):
67 67
68 68 @pytest.fixture(autouse=True)
69 69 def prepare(self, request, baseapp):
70 70 self.repo = GitRepository(TEST_GIT_REPO, bare=True)
71 71 self.repo.count()
72 72
73 73 def get_clone_repo(self, tmpdir):
74 74 """
75 75 Return a non bare clone of the base repo.
76 76 """
77 77 clone_path = str(tmpdir.join('clone-repo'))
78 78 repo_clone = GitRepository(
79 79 clone_path, create=True, src_url=self.repo.path, bare=False)
80 80
81 81 return repo_clone
82 82
83 83 def get_empty_repo(self, tmpdir, bare=False):
84 84 """
85 85 Return a non bare empty repo.
86 86 """
87 87 clone_path = str(tmpdir.join('empty-repo'))
88 88 return GitRepository(clone_path, create=True, bare=bare)
89 89
90 90 def test_wrong_repo_path(self):
91 91 wrong_repo_path = '/tmp/errorrepo_git'
92 92 with pytest.raises(RepositoryError):
93 93 GitRepository(wrong_repo_path)
94 94
95 95 def test_repo_clone(self, tmp_path_factory):
96 96 repo = GitRepository(TEST_GIT_REPO)
97 97 clone_path = '{}_{}'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
98 98 repo_clone = GitRepository(
99 99 clone_path,
100 100 src_url=TEST_GIT_REPO, create=True, do_workspace_checkout=True)
101 101
102 102 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
103 103 # Checking hashes of commits should be enough
104 104 for commit in repo.get_commits():
105 105 raw_id = commit.raw_id
106 106 assert raw_id == repo_clone.get_commit(raw_id).raw_id
107 107
108 108 def test_repo_clone_without_create(self):
109 109 with pytest.raises(RepositoryError):
110 110 GitRepository(
111 111 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
112 112
113 113 def test_repo_clone_with_update(self, tmp_path_factory):
114 114 repo = GitRepository(TEST_GIT_REPO)
115 115 clone_path = '{}_{}_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
116 116
117 117 repo_clone = GitRepository(
118 118 clone_path,
119 119 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=True)
120 120 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
121 121
122 122 # check if current workdir was updated
123 123 fpath = os.path.join(clone_path, 'MANIFEST.in')
124 124 assert os.path.isfile(fpath)
125 125
126 126 def test_repo_clone_without_update(self, tmp_path_factory):
127 127 repo = GitRepository(TEST_GIT_REPO)
128 128 clone_path = '{}_{}_without_update'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
129 129 repo_clone = GitRepository(
130 130 clone_path,
131 131 create=True, src_url=TEST_GIT_REPO, do_workspace_checkout=False)
132 132 assert len(repo.commit_ids) == len(repo_clone.commit_ids)
133 133 # check if current workdir was *NOT* updated
134 134 fpath = os.path.join(clone_path, 'MANIFEST.in')
135 135 # Make sure it's not bare repo
136 136 assert not repo_clone.bare
137 137 assert not os.path.isfile(fpath)
138 138
139 139 def test_repo_clone_into_bare_repo(self, tmp_path_factory):
140 140 repo = GitRepository(TEST_GIT_REPO)
141 141 clone_path = '{}_{}_bare.git'.format(tmp_path_factory.mktemp('_'), TEST_GIT_REPO_CLONE)
142 142 repo_clone = GitRepository(
143 143 clone_path, create=True, src_url=repo.path, bare=True)
144 144 assert repo_clone.bare
145 145
146 146 def test_create_repo_is_not_bare_by_default(self):
147 147 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
148 148 assert not repo.bare
149 149
150 150 def test_create_bare_repo(self):
151 151 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
152 152 assert repo.bare
153 153
154 154 def test_update_server_info(self):
155 155 self.repo._update_server_info()
156 156
157 157 def test_fetch(self, vcsbackend_git):
158 158 # Note: This is a git specific part of the API, it's only implemented
159 159 # by the git backend.
160 160 source_repo = vcsbackend_git.repo
161 161 target_repo = vcsbackend_git.create_repo(bare=True)
162 162 target_repo.fetch(source_repo.path)
163 163 # Note: Get a fresh instance, avoids caching trouble
164 164 target_repo = vcsbackend_git.backend(target_repo.path)
165 165 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
166 166
167 167 def test_commit_ids(self):
168 168 # there are 112 commits (by now)
169 169 # so we can assume they would be available from now on
170 170 subset = {'c1214f7e79e02fc37156ff215cd71275450cffc3',
171 171 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
172 172 'fa6600f6848800641328adbf7811fd2372c02ab2',
173 173 '102607b09cdd60e2793929c4f90478be29f85a17',
174 174 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
175 175 '2d1028c054665b962fa3d307adfc923ddd528038',
176 176 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
177 177 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
178 178 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
179 179 '8430a588b43b5d6da365400117c89400326e7992',
180 180 'd955cd312c17b02143c04fa1099a352b04368118',
181 181 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
182 182 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
183 183 'f298fe1189f1b69779a4423f40b48edf92a703fc',
184 184 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
185 185 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
186 186 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
187 187 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
188 188 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
189 189 '45223f8f114c64bf4d6f853e3c35a369a6305520',
190 190 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
191 191 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
192 192 '27d48942240f5b91dfda77accd2caac94708cc7d',
193 193 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
194 194 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'}
195 195 assert subset.issubset(set(self.repo.commit_ids))
196 196
197 197 def test_slicing(self):
198 198 # 4 1 5 10 95
199 199 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
200 200 (10, 20, 10), (5, 100, 95)]:
201 201 commit_ids = list(self.repo[sfrom:sto])
202 202 assert len(commit_ids) == size
203 203 assert commit_ids[0] == self.repo.get_commit(commit_idx=sfrom)
204 204 assert commit_ids[-1] == self.repo.get_commit(commit_idx=sto - 1)
205 205
206 206 def test_branches(self):
207 207 # TODO: Need more tests here
208 208 # Removed (those are 'remotes' branches for cloned repo)
209 209 # assert 'master' in self.repo.branches
210 210 # assert 'gittree' in self.repo.branches
211 211 # assert 'web-branch' in self.repo.branches
212 212 for __, commit_id in self.repo.branches.items():
213 213 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
214 214
215 215 def test_tags(self):
216 216 # TODO: Need more tests here
217 217 assert 'v0.1.1' in self.repo.tags
218 218 assert 'v0.1.2' in self.repo.tags
219 219 for __, commit_id in self.repo.tags.items():
220 220 assert isinstance(self.repo.get_commit(commit_id), GitCommit)
221 221
222 222 def _test_single_commit_cache(self, commit_id):
223 223 commit = self.repo.get_commit(commit_id)
224 224 assert commit_id in self.repo.commits
225 225 assert commit is self.repo.commits[commit_id]
226 226
227 227 def test_initial_commit(self):
228 228 commit_id = self.repo.commit_ids[0]
229 229 init_commit = self.repo.get_commit(commit_id)
230 230 init_author = init_commit.author
231 231
232 232 assert init_commit.message == 'initial import\n'
233 233 assert init_author == 'Marcin Kuzminski <marcin@python-blog.com>'
234 234 assert init_author == init_commit.committer
235 235 for path in ('vcs/__init__.py',
236 236 'vcs/backends/BaseRepository.py',
237 237 'vcs/backends/__init__.py'):
238 238 assert isinstance(init_commit.get_node(path), FileNode)
239 239 for path in ('', 'vcs', 'vcs/backends'):
240 240 assert isinstance(init_commit.get_node(path), DirNode)
241 241
242 242 with pytest.raises(NodeDoesNotExistError):
243 243 init_commit.get_node(path='foobar')
244 244
245 245 node = init_commit.get_node('vcs/')
246 246 assert hasattr(node, 'kind')
247 247 assert node.kind == NodeKind.DIR
248 248
249 249 node = init_commit.get_node('vcs')
250 250 assert hasattr(node, 'kind')
251 251 assert node.kind == NodeKind.DIR
252 252
253 253 node = init_commit.get_node('vcs/__init__.py')
254 254 assert hasattr(node, 'kind')
255 255 assert node.kind == NodeKind.FILE
256 256
257 257 def test_not_existing_commit(self):
258 258 with pytest.raises(RepositoryError):
259 259 self.repo.get_commit('f' * 40)
260 260
261 261 def test_commit10(self):
262 262
263 263 commit10 = self.repo.get_commit(self.repo.commit_ids[9])
264 264 README = """===
265 265 VCS
266 266 ===
267 267
268 268 Various Version Control System management abstraction layer for Python.
269 269
270 270 Introduction
271 271 ------------
272 272
273 273 TODO: To be written...
274 274
275 275 """
276 276 node = commit10.get_node('README.rst')
277 277 assert node.kind == NodeKind.FILE
278 278 assert node.str_content == README
279 279
280 280 def test_head(self):
281 281 assert self.repo.head == self.repo.get_commit().raw_id
282 282
283 283 def test_checkout_with_create(self, tmpdir):
284 284 repo_clone = self.get_clone_repo(tmpdir)
285 285
286 286 new_branch = 'new_branch'
287 287 assert repo_clone._current_branch() == 'master'
288 288 assert set(repo_clone.branches) == {'master'}
289 289 repo_clone._checkout(new_branch, create=True)
290 290
291 291 # Branches is a lazy property so we need to recrete the Repo object.
292 292 repo_clone = GitRepository(repo_clone.path)
293 293 assert set(repo_clone.branches) == {'master', new_branch}
294 294 assert repo_clone._current_branch() == new_branch
295 295
296 296 def test_checkout(self, tmpdir):
297 297 repo_clone = self.get_clone_repo(tmpdir)
298 298
299 299 repo_clone._checkout('new_branch', create=True)
300 300 repo_clone._checkout('master')
301 301
302 302 assert repo_clone._current_branch() == 'master'
303 303
304 304 def test_checkout_same_branch(self, tmpdir):
305 305 repo_clone = self.get_clone_repo(tmpdir)
306 306
307 307 repo_clone._checkout('master')
308 308 assert repo_clone._current_branch() == 'master'
309 309
310 310 def test_checkout_branch_already_exists(self, tmpdir):
311 311 repo_clone = self.get_clone_repo(tmpdir)
312 312
313 313 with pytest.raises(RepositoryError):
314 314 repo_clone._checkout('master', create=True)
315 315
316 316 def test_checkout_bare_repo(self):
317 317 with pytest.raises(RepositoryError):
318 318 self.repo._checkout('master')
319 319
320 320 def test_current_branch_bare_repo(self):
321 321 with pytest.raises(RepositoryError):
322 322 self.repo._current_branch()
323 323
324 324 def test_current_branch_empty_repo(self, tmpdir):
325 325 repo = self.get_empty_repo(tmpdir)
326 326 assert repo._current_branch() is None
327 327
328 328 def test_local_clone(self, tmp_path_factory):
329 329 clone_path = str(tmp_path_factory.mktemp('test-local-clone'))
330 330 self.repo._local_clone(clone_path, 'master')
331 331 repo_clone = GitRepository(clone_path)
332 332
333 333 assert self.repo.commit_ids == repo_clone.commit_ids
334 334
335 335 def test_local_clone_with_specific_branch(self, tmpdir):
336 336 source_repo = self.get_clone_repo(tmpdir)
337 337
338 338 # Create a new branch in source repo
339 339 new_branch_commit = source_repo.commit_ids[-3]
340 340 source_repo._checkout(new_branch_commit)
341 341 source_repo._checkout('new_branch', create=True)
342 342
343 343 clone_path = str(tmpdir.join('git-clone-path-1'))
344 344 source_repo._local_clone(clone_path, 'new_branch')
345 345 repo_clone = GitRepository(clone_path)
346 346
347 347 assert source_repo.commit_ids[:-3 + 1] == repo_clone.commit_ids
348 348
349 349 clone_path = str(tmpdir.join('git-clone-path-2'))
350 350 source_repo._local_clone(clone_path, 'master')
351 351 repo_clone = GitRepository(clone_path)
352 352
353 353 assert source_repo.commit_ids == repo_clone.commit_ids
354 354
355 355 def test_local_clone_fails_if_target_exists(self):
356 356 with pytest.raises(RepositoryError):
357 357 self.repo._local_clone(self.repo.path, 'master')
358 358
359 359 def test_local_fetch(self, tmpdir):
360 360 target_repo = self.get_empty_repo(tmpdir)
361 361 source_repo = self.get_clone_repo(tmpdir)
362 362
363 363 # Create a new branch in source repo
364 364 master_commit = source_repo.commit_ids[-1]
365 365 new_branch_commit = source_repo.commit_ids[-3]
366 366 source_repo._checkout(new_branch_commit)
367 367 source_repo._checkout('new_branch', create=True)
368 368
369 369 target_repo._local_fetch(source_repo.path, 'new_branch')
370 370 assert target_repo._last_fetch_heads() == [new_branch_commit]
371 371
372 372 target_repo._local_fetch(source_repo.path, 'master')
373 373 assert target_repo._last_fetch_heads() == [master_commit]
374 374
375 375 def test_local_fetch_from_bare_repo(self, tmpdir):
376 376 target_repo = self.get_empty_repo(tmpdir)
377 377 target_repo._local_fetch(self.repo.path, 'master')
378 378
379 379 master_commit = self.repo.commit_ids[-1]
380 380 assert target_repo._last_fetch_heads() == [master_commit]
381 381
382 382 def test_local_fetch_from_same_repo(self):
383 383 with pytest.raises(ValueError):
384 384 self.repo._local_fetch(self.repo.path, 'master')
385 385
386 386 def test_local_fetch_branch_does_not_exist(self, tmpdir):
387 387 target_repo = self.get_empty_repo(tmpdir)
388 388
389 389 with pytest.raises(RepositoryError):
390 390 target_repo._local_fetch(self.repo.path, 'new_branch')
391 391
392 392 def test_local_pull(self, tmpdir):
393 393 target_repo = self.get_empty_repo(tmpdir)
394 394 source_repo = self.get_clone_repo(tmpdir)
395 395
396 396 # Create a new branch in source repo
397 397 master_commit = source_repo.commit_ids[-1]
398 398 new_branch_commit = source_repo.commit_ids[-3]
399 399 source_repo._checkout(new_branch_commit)
400 400 source_repo._checkout('new_branch', create=True)
401 401
402 402 target_repo._local_pull(source_repo.path, 'new_branch')
403 403 target_repo = GitRepository(target_repo.path)
404 404 assert target_repo.head == new_branch_commit
405 405
406 406 target_repo._local_pull(source_repo.path, 'master')
407 407 target_repo = GitRepository(target_repo.path)
408 408 assert target_repo.head == master_commit
409 409
410 410 def test_local_pull_in_bare_repo(self):
411 411 with pytest.raises(RepositoryError):
412 412 self.repo._local_pull(self.repo.path, 'master')
413 413
414 414 def test_local_merge(self, tmpdir):
415 415 target_repo = self.get_empty_repo(tmpdir)
416 416 source_repo = self.get_clone_repo(tmpdir)
417 417
418 418 # Create a new branch in source repo
419 419 master_commit = source_repo.commit_ids[-1]
420 420 new_branch_commit = source_repo.commit_ids[-3]
421 421 source_repo._checkout(new_branch_commit)
422 422 source_repo._checkout('new_branch', create=True)
423 423
424 424 # This is required as one cannot do a -ff-only merge in an empty repo.
425 425 target_repo._local_pull(source_repo.path, 'new_branch')
426 426
427 427 target_repo._local_fetch(source_repo.path, 'master')
428 428 merge_message = 'Merge message\n\nDescription:...'
429 429 user_name = 'Albert Einstein'
430 430 user_email = 'albert@einstein.com'
431 431 target_repo._local_merge(merge_message, user_name, user_email,
432 432 target_repo._last_fetch_heads())
433 433
434 434 target_repo = GitRepository(target_repo.path)
435 435 assert target_repo.commit_ids[-2] == master_commit
436 436 last_commit = target_repo.get_commit(target_repo.head)
437 437 assert last_commit.message.strip() == merge_message
438 438 assert last_commit.author == '%s <%s>' % (user_name, user_email)
439 439
440 440 assert not os.path.exists(
441 441 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
442 442
443 443 def test_local_merge_raises_exception_on_conflict(self, vcsbackend_git):
444 444 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
445 445 vcsbackend_git.ensure_file(b'README', b'I will conflict with you!!!')
446 446
447 447 target_repo._local_fetch(self.repo.path, 'master')
448 448 with pytest.raises(RepositoryError):
449 449 target_repo._local_merge(
450 450 'merge_message', 'user name', 'user@name.com',
451 451 target_repo._last_fetch_heads())
452 452
453 453 # Check we are not left in an intermediate merge state
454 454 assert not os.path.exists(
455 455 os.path.join(target_repo.path, '.git', 'MERGE_HEAD'))
456 456
457 457 def test_local_merge_into_empty_repo(self, tmpdir):
458 458 target_repo = self.get_empty_repo(tmpdir)
459 459
460 460 # This is required as one cannot do a -ff-only merge in an empty repo.
461 461 target_repo._local_fetch(self.repo.path, 'master')
462 462 with pytest.raises(RepositoryError):
463 463 target_repo._local_merge(
464 464 'merge_message', 'user name', 'user@name.com',
465 465 target_repo._last_fetch_heads())
466 466
467 467 def test_local_merge_in_bare_repo(self):
468 468 with pytest.raises(RepositoryError):
469 469 self.repo._local_merge(
470 470 'merge_message', 'user name', 'user@name.com', None)
471 471
472 472 def test_local_push_non_bare(self, tmpdir):
473 473 target_repo = self.get_empty_repo(tmpdir)
474 474
475 475 pushed_branch = 'pushed_branch'
476 476 self.repo._local_push('master', target_repo.path, pushed_branch)
477 477 # Fix the HEAD of the target repo, or otherwise GitRepository won't
478 478 # report any branches.
479 479 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
480 480 f.write('ref: refs/heads/%s' % pushed_branch)
481 481
482 482 target_repo = GitRepository(target_repo.path)
483 483
484 484 assert (target_repo.branches[pushed_branch] ==
485 485 self.repo.branches['master'])
486 486
487 487 def test_local_push_bare(self, tmpdir):
488 488 target_repo = self.get_empty_repo(tmpdir, bare=True)
489 489
490 490 pushed_branch = 'pushed_branch'
491 491 self.repo._local_push('master', target_repo.path, pushed_branch)
492 492 # Fix the HEAD of the target repo, or otherwise GitRepository won't
493 493 # report any branches.
494 494 with open(os.path.join(target_repo.path, 'HEAD'), 'w') as f:
495 495 f.write('ref: refs/heads/%s' % pushed_branch)
496 496
497 497 target_repo = GitRepository(target_repo.path)
498 498
499 499 assert (target_repo.branches[pushed_branch] ==
500 500 self.repo.branches['master'])
501 501
502 502 def test_local_push_non_bare_target_branch_is_checked_out(self, tmpdir):
503 503 target_repo = self.get_clone_repo(tmpdir)
504 504
505 505 pushed_branch = 'pushed_branch'
506 506 # Create a new branch in source repo
507 507 new_branch_commit = target_repo.commit_ids[-3]
508 508 target_repo._checkout(new_branch_commit)
509 509 target_repo._checkout(pushed_branch, create=True)
510 510
511 511 self.repo._local_push('master', target_repo.path, pushed_branch)
512 512
513 513 target_repo = GitRepository(target_repo.path)
514 514
515 515 assert (target_repo.branches[pushed_branch] ==
516 516 self.repo.branches['master'])
517 517
518 518 def test_local_push_raises_exception_on_conflict(self, vcsbackend_git):
519 519 target_repo = vcsbackend_git.create_repo(number_of_commits=1)
520 520 with pytest.raises(RepositoryError):
521 521 self.repo._local_push('master', target_repo.path, 'master')
522 522
523 523 def test_hooks_can_be_enabled_via_env_variable_for_local_push(self, tmpdir):
524 524 target_repo = self.get_empty_repo(tmpdir, bare=True)
525 525
526 526 with mock.patch.object(self.repo, 'run_git_command') as run_mock:
527 527 self.repo._local_push(
528 528 'master', target_repo.path, 'master', enable_hooks=True)
529 529 env = run_mock.call_args[1]['extra_env']
530 530 assert 'RC_SKIP_HOOKS' not in env
531 531
532 532 def _add_failing_hook(self, repo_path, hook_name, bare=False):
533 533 path_components = (
534 534 ['hooks', hook_name] if bare else ['.git', 'hooks', hook_name])
535 535 hook_path = os.path.join(repo_path, *path_components)
536 536 with open(hook_path, 'w') as f:
537 537 script_lines = [
538 538 '#!%s' % sys.executable,
539 539 'import os',
540 540 'import sys',
541 541 'if os.environ.get("RC_SKIP_HOOKS"):',
542 542 ' sys.exit(0)',
543 543 'sys.exit(1)',
544 544 ]
545 545 f.write('\n'.join(script_lines))
546 546 os.chmod(hook_path, 0o755)
547 547
548 548 def test_local_push_does_not_execute_hook(self, tmpdir):
549 549 target_repo = self.get_empty_repo(tmpdir)
550 550
551 551 pushed_branch = 'pushed_branch'
552 552 self._add_failing_hook(target_repo.path, 'pre-receive')
553 553 self.repo._local_push('master', target_repo.path, pushed_branch)
554 554 # Fix the HEAD of the target repo, or otherwise GitRepository won't
555 555 # report any branches.
556 556 with open(os.path.join(target_repo.path, '.git', 'HEAD'), 'w') as f:
557 557 f.write('ref: refs/heads/%s' % pushed_branch)
558 558
559 559 target_repo = GitRepository(target_repo.path)
560 560
561 561 assert (target_repo.branches[pushed_branch] ==
562 562 self.repo.branches['master'])
563 563
564 564 def test_local_push_executes_hook(self, tmpdir):
565 565 target_repo = self.get_empty_repo(tmpdir, bare=True)
566 566 self._add_failing_hook(target_repo.path, 'pre-receive', bare=True)
567 567 with pytest.raises(RepositoryError):
568 568 self.repo._local_push(
569 569 'master', target_repo.path, 'master', enable_hooks=True)
570 570
571 571 def test_maybe_prepare_merge_workspace(self):
572 572 workspace = self.repo._maybe_prepare_merge_workspace(
573 573 2, 'pr2', Reference('branch', 'master', 'unused'),
574 574 Reference('branch', 'master', 'unused'))
575 575
576 576 assert os.path.isdir(workspace)
577 577 workspace_repo = GitRepository(workspace)
578 578 assert workspace_repo.branches == self.repo.branches
579 579
580 580 # Calling it a second time should also succeed
581 581 workspace = self.repo._maybe_prepare_merge_workspace(
582 582 2, 'pr2', Reference('branch', 'master', 'unused'),
583 583 Reference('branch', 'master', 'unused'))
584 584 assert os.path.isdir(workspace)
585 585
586 586 def test_maybe_prepare_merge_workspace_different_refs(self):
587 587 workspace = self.repo._maybe_prepare_merge_workspace(
588 588 2, 'pr2', Reference('branch', 'master', 'unused'),
589 589 Reference('branch', 'develop', 'unused'))
590 590
591 591 assert os.path.isdir(workspace)
592 592 workspace_repo = GitRepository(workspace)
593 593 assert workspace_repo.branches == self.repo.branches
594 594
595 595 # Calling it a second time should also succeed
596 596 workspace = self.repo._maybe_prepare_merge_workspace(
597 597 2, 'pr2', Reference('branch', 'master', 'unused'),
598 598 Reference('branch', 'develop', 'unused'))
599 599 assert os.path.isdir(workspace)
600 600
601 601 def test_cleanup_merge_workspace(self):
602 602 workspace = self.repo._maybe_prepare_merge_workspace(
603 603 2, 'pr3', Reference('branch', 'master', 'unused'),
604 604 Reference('branch', 'master', 'unused'))
605 605 self.repo.cleanup_merge_workspace(2, 'pr3')
606 606
607 607 assert not os.path.exists(workspace)
608 608
609 609 def test_cleanup_merge_workspace_invalid_workspace_id(self):
610 610 # No assert: because in case of an inexistent workspace this function
611 611 # should still succeed.
612 612 self.repo.cleanup_merge_workspace(1, 'pr4')
613 613
614 614 def test_set_refs(self):
615 615 test_ref = 'refs/test-refs/abcde'
616 616 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
617 617
618 618 self.repo.set_refs(test_ref, test_commit_id)
619 619 stdout, _ = self.repo.run_git_command(['show-ref'])
620 620 assert test_ref in stdout
621 621 assert test_commit_id in stdout
622 622
623 623 def test_remove_ref(self):
624 624 test_ref = 'refs/test-refs/abcde'
625 625 test_commit_id = 'ecb86e1f424f2608262b130db174a7dfd25a6623'
626 626 self.repo.set_refs(test_ref, test_commit_id)
627 627 stdout, _ = self.repo.run_git_command(['show-ref'])
628 628 assert test_ref in stdout
629 629 assert test_commit_id in stdout
630 630
631 631 self.repo.remove_ref(test_ref)
632 632 stdout, _ = self.repo.run_git_command(['show-ref'])
633 633 assert test_ref not in stdout
634 634 assert test_commit_id not in stdout
635 635
636 636
637 637 class TestGitCommit(object):
638 638
639 639 @pytest.fixture(autouse=True)
640 640 def prepare(self):
641 641 self.repo = GitRepository(TEST_GIT_REPO)
642 642
643 643 def test_default_commit(self):
644 644 tip = self.repo.get_commit()
645 645 assert tip == self.repo.get_commit(None)
646 646 assert tip == self.repo.get_commit('tip')
647 647
648 648 def test_root_node(self):
649 649 tip = self.repo.get_commit()
650 650 assert tip.root is tip.get_node('')
651 651
652 652 def test_lazy_fetch(self):
653 653 """
654 654 Test if commit's nodes expands and are cached as we walk through
655 655 the commit. This test is somewhat hard to write as order of tests
656 656 is a key here. Written by running command after command in a shell.
657 657 """
658 658 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
659 659 assert commit_id in self.repo.commit_ids
660 660 commit = self.repo.get_commit(commit_id)
661 661 assert len(commit.nodes) == 0
662 662 root = commit.root
663 663 assert len(commit.nodes) == 1
664 664 assert len(root.nodes) == 8
665 665 # accessing root.nodes updates commit.nodes
666 666 assert len(commit.nodes) == 9
667 667
668 668 docs = root.get_node('docs')
669 669 # we haven't yet accessed anything new as docs dir was already cached
670 670 assert len(commit.nodes) == 9
671 671 assert len(docs.nodes) == 8
672 672 # accessing docs.nodes updates commit.nodes
673 673 assert len(commit.nodes) == 17
674 674
675 675 assert docs is commit.get_node('docs')
676 676 assert docs is root.nodes[0]
677 677 assert docs is root.dirs[0]
678 678 assert docs is commit.get_node('docs')
679 679
680 680 def test_nodes_with_commit(self):
681 681 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
682 682 commit = self.repo.get_commit(commit_id)
683 683 root = commit.root
684 684 docs = root.get_node('docs')
685 685 assert docs is commit.get_node('docs')
686 686 api = docs.get_node('api')
687 687 assert api is commit.get_node('docs/api')
688 688 index = api.get_node('index.rst')
689 689 assert index is commit.get_node('docs/api/index.rst')
690 690 assert index is commit.get_node('docs')\
691 691 .get_node('api')\
692 692 .get_node('index.rst')
693 693
694 694 def test_branch_and_tags(self):
695 695 """
696 696 rev0 = self.repo.commit_ids[0]
697 697 commit0 = self.repo.get_commit(rev0)
698 698 assert commit0.branch == 'master'
699 699 assert commit0.tags == []
700 700
701 701 rev10 = self.repo.commit_ids[10]
702 702 commit10 = self.repo.get_commit(rev10)
703 703 assert commit10.branch == 'master'
704 704 assert commit10.tags == []
705 705
706 706 rev44 = self.repo.commit_ids[44]
707 707 commit44 = self.repo.get_commit(rev44)
708 708 assert commit44.branch == 'web-branch'
709 709
710 710 tip = self.repo.get_commit('tip')
711 711 assert 'tip' in tip.tags
712 712 """
713 713 # Those tests would fail - branches are now going
714 714 # to be changed at main API in order to support git backend
715 715 pass
716 716
717 717 def test_file_size(self):
718 718 to_check = (
719 719 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
720 720 'vcs/backends/BaseRepository.py', 502),
721 721 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
722 722 'vcs/backends/hg.py', 854),
723 723 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
724 724 'setup.py', 1068),
725 725
726 726 ('d955cd312c17b02143c04fa1099a352b04368118',
727 727 'vcs/backends/base.py', 2921),
728 728 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
729 729 'vcs/backends/base.py', 3936),
730 730 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
731 731 'vcs/backends/base.py', 6189),
732 732 )
733 733 for commit_id, path, size in to_check:
734 734 node = self.repo.get_commit(commit_id).get_node(path)
735 735 assert node.is_file()
736 736 assert node.size == size
737 737
738 738 def test_file_history_from_commits(self):
739 739 node = self.repo[10].get_node('setup.py')
740 740 commit_ids = [commit.raw_id for commit in node.history]
741 741 assert ['ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == commit_ids
742 742
743 743 node = self.repo[20].get_node('setup.py')
744 744 node_ids = [commit.raw_id for commit in node.history]
745 745 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
746 746 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
747 747
748 748 # special case we check history from commit that has this particular
749 749 # file changed this means we check if it's included as well
750 750 node = self.repo.get_commit('191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e') \
751 751 .get_node('setup.py')
752 752 node_ids = [commit.raw_id for commit in node.history]
753 753 assert ['191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
754 754 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'] == node_ids
755 755
756 756 def test_file_history(self):
757 757 # we can only check if those commits are present in the history
758 758 # as we cannot update this test every time file is changed
759 759 files = {
760 760 'setup.py': [
761 761 '54386793436c938cff89326944d4c2702340037d',
762 762 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
763 763 '998ed409c795fec2012b1c0ca054d99888b22090',
764 764 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
765 765 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
766 766 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
767 767 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
768 768 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
769 769 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
770 770 ],
771 771 'vcs/nodes.py': [
772 772 '33fa3223355104431402a888fa77a4e9956feb3e',
773 773 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
774 774 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
775 775 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
776 776 'c877b68d18e792a66b7f4c529ea02c8f80801542',
777 777 '4313566d2e417cb382948f8d9d7c765330356054',
778 778 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
779 779 '54386793436c938cff89326944d4c2702340037d',
780 780 '54000345d2e78b03a99d561399e8e548de3f3203',
781 781 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
782 782 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
783 783 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
784 784 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
785 785 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
786 786 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
787 787 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
788 788 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
789 789 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
790 790 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
791 791 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
792 792 'f15c21f97864b4f071cddfbf2750ec2e23859414',
793 793 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
794 794 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
795 795 '84dec09632a4458f79f50ddbbd155506c460b4f9',
796 796 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
797 797 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
798 798 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
799 799 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
800 800 '6970b057cffe4aab0a792aa634c89f4bebf01441',
801 801 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
802 802 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
803 803 ],
804 804 'vcs/backends/git.py': [
805 805 '4cf116ad5a457530381135e2f4c453e68a1b0105',
806 806 '9a751d84d8e9408e736329767387f41b36935153',
807 807 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
808 808 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
809 809 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
810 810 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
811 811 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
812 812 '54000345d2e78b03a99d561399e8e548de3f3203',
813 813 ],
814 814 }
815 815 for path, commit_ids in files.items():
816 816 node = self.repo.get_commit(commit_ids[0]).get_node(path)
817 817 node_ids = [commit.raw_id for commit in node.history]
818 818 assert set(commit_ids).issubset(set(node_ids)), (
819 819 "We assumed that %s is subset of commit_ids for which file %s "
820 820 "has been changed, and history of that node returned: %s"
821 821 % (commit_ids, path, node_ids))
822 822
823 823 def test_file_annotate(self):
824 824 files = {
825 825 'vcs/backends/__init__.py': {
826 826 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
827 827 'lines_no': 1,
828 828 'commits': [
829 829 'c1214f7e79e02fc37156ff215cd71275450cffc3',
830 830 ],
831 831 },
832 832 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
833 833 'lines_no': 21,
834 834 'commits': [
835 835 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
836 836 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
837 837 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
838 838 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
839 839 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
840 840 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
841 841 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
842 842 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
843 843 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
844 844 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
845 845 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
846 846 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
847 847 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
848 848 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
849 849 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
850 850 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
851 851 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
852 852 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
853 853 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
854 854 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
855 855 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
856 856 ],
857 857 },
858 858 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
859 859 'lines_no': 32,
860 860 'commits': [
861 861 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
862 862 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
863 863 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
864 864 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
865 865 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
866 866 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
867 867 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
868 868 '54000345d2e78b03a99d561399e8e548de3f3203',
869 869 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
870 870 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
871 871 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
872 872 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
873 873 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
874 874 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
875 875 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
876 876 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
877 877 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
878 878 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
879 879 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
880 880 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
881 881 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
882 882 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
883 883 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
884 884 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
885 885 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
886 886 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
887 887 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
888 888 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
889 889 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
890 890 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
891 891 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
892 892 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
893 893 ],
894 894 },
895 895 },
896 896 }
897 897
898 898 for fname, commit_dict in files.items():
899 899 for commit_id, __ in commit_dict.items():
900 900 commit = self.repo.get_commit(commit_id)
901 901
902 902 l1_1 = [x[1] for x in commit.get_file_annotate(fname)]
903 903 l1_2 = [x[2]().raw_id for x in commit.get_file_annotate(fname)]
904 904 assert l1_1 == l1_2
905 905 l1 = l1_1
906 906 l2 = files[fname][commit_id]['commits']
907 907 assert l1 == l2, (
908 908 "The lists of commit_ids for %s@commit_id %s"
909 909 "from annotation list should match each other, "
910 910 "got \n%s \nvs \n%s " % (fname, commit_id, l1, l2))
911 911
912 912 def test_files_state(self):
913 913 """
914 914 Tests state of FileNodes.
915 915 """
916 916 node = self.repo\
917 917 .get_commit('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
918 918 .get_node('vcs/utils/diffs.py')
919 919 assert node.state, NodeState.ADDED
920 920 assert node.added
921 921 assert not node.changed
922 922 assert not node.not_changed
923 923 assert not node.removed
924 924
925 925 node = self.repo\
926 926 .get_commit('33fa3223355104431402a888fa77a4e9956feb3e')\
927 927 .get_node('.hgignore')
928 928 assert node.state, NodeState.CHANGED
929 929 assert not node.added
930 930 assert node.changed
931 931 assert not node.not_changed
932 932 assert not node.removed
933 933
934 934 node = self.repo\
935 935 .get_commit('e29b67bd158580fc90fc5e9111240b90e6e86064')\
936 936 .get_node('setup.py')
937 937 assert node.state, NodeState.NOT_CHANGED
938 938 assert not node.added
939 939 assert not node.changed
940 940 assert node.not_changed
941 941 assert not node.removed
942 942
943 943 # If node has REMOVED state then trying to fetch it would raise
944 944 # CommitError exception
945 945 commit = self.repo.get_commit(
946 946 'fa6600f6848800641328adbf7811fd2372c02ab2')
947 947 path = 'vcs/backends/BaseRepository.py'
948 948 with pytest.raises(NodeDoesNotExistError):
949 949 commit.get_node(path)
950 950 # but it would be one of ``removed`` (commit's attribute)
951 951 assert path in [rf.path for rf in commit.removed]
952 952
953 953 commit = self.repo.get_commit(
954 954 '54386793436c938cff89326944d4c2702340037d')
955 955 changed = [
956 956 'setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
957 957 'vcs/nodes.py']
958 958 assert set(changed) == set([f.path for f in commit.changed])
959 959
960 960 def test_unicode_branch_refs(self):
961 961 unicode_branches = {
962 962 'refs/heads/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
963 963 u'refs/heads/uniΓ§ΓΆβˆ‚e': 'ΓΌrl',
964 964 }
965 965 with mock.patch(
966 966 ("rhodecode.lib.vcs.backends.git.repository"
967 967 ".GitRepository._refs"),
968 968 unicode_branches):
969 969 branches = self.repo.branches
970 970
971 971 assert 'unicode' in branches
972 assert u'uniΓ§ΓΆβˆ‚e' in branches
972 assert 'uniΓ§ΓΆβˆ‚e' in branches
973 973
974 974 def test_unicode_tag_refs(self):
975 975 unicode_tags = {
976 976 'refs/tags/unicode': '6c0ce52b229aa978889e91b38777f800e85f330b',
977 977 u'refs/tags/uniΓ§ΓΆβˆ‚e': '6c0ce52b229aa978889e91b38777f800e85f330b',
978 978 }
979 979 with mock.patch(
980 980 ("rhodecode.lib.vcs.backends.git.repository"
981 981 ".GitRepository._refs"),
982 982 unicode_tags):
983 983 tags = self.repo.tags
984 984
985 985 assert 'unicode' in tags
986 assert u'uniΓ§ΓΆβˆ‚e' in tags
986 assert 'uniΓ§ΓΆβˆ‚e' in tags
987 987
988 988 def test_commit_message_is_unicode(self):
989 989 for commit in self.repo:
990 990 assert type(commit.message) == str
991 991
992 992 def test_commit_author_is_unicode(self):
993 993 for commit in self.repo:
994 994 assert type(commit.author) == str
995 995
996 996 def test_repo_files_content_types(self):
997 997 commit = self.repo.get_commit()
998 998 for node in commit.get_node('/'):
999 999 if node.is_file():
1000 1000 assert type(node.content) == bytes
1001 1001 assert type(node.str_content) == str
1002 1002
1003 1003 def test_wrong_path(self):
1004 1004 # There is 'setup.py' in the root dir but not there:
1005 1005 path = 'foo/bar/setup.py'
1006 1006 tip = self.repo.get_commit()
1007 1007 with pytest.raises(VCSError):
1008 1008 tip.get_node(path)
1009 1009
1010 1010 @pytest.mark.parametrize("author_email, commit_id", [
1011 1011 ('marcin@python-blog.com', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1012 1012 ('lukasz.balcerzak@python-center.pl',
1013 1013 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1014 1014 ('none@none', '8430a588b43b5d6da365400117c89400326e7992'),
1015 1015 ])
1016 1016 def test_author_email(self, author_email, commit_id):
1017 1017 commit = self.repo.get_commit(commit_id)
1018 1018 assert author_email == commit.author_email
1019 1019
1020 1020 @pytest.mark.parametrize("author, commit_id", [
1021 1021 ('Marcin Kuzminski', 'c1214f7e79e02fc37156ff215cd71275450cffc3'),
1022 1022 ('Lukasz Balcerzak', 'ff7ca51e58c505fec0dd2491de52c622bb7a806b'),
1023 1023 ('marcink', '8430a588b43b5d6da365400117c89400326e7992'),
1024 1024 ])
1025 1025 def test_author_username(self, author, commit_id):
1026 1026 commit = self.repo.get_commit(commit_id)
1027 1027 assert author == commit.author_name
1028 1028
1029 1029
1030 1030 class TestLargeFileRepo(object):
1031 1031
1032 1032 def test_large_file(self, backend_git):
1033 1033 conf = make_db_config()
1034 1034 repo = backend_git.create_test_repo('largefiles', conf)
1035 1035
1036 1036 tip = repo.scm_instance().get_commit()
1037 1037
1038 1038 # extract stored LF node into the origin cache
1039 1039 lfs_store = os.path.join(repo.repo_path, repo.repo_name, 'lfs_store')
1040 1040
1041 1041 oid = '7b331c02e313c7599d5a90212e17e6d3cb729bd2e1c9b873c302a63c95a2f9bf'
1042 1042 oid_path = os.path.join(lfs_store, oid)
1043 1043 # Todo: oid path depends on LFSOidStorage.store_suffix. Once it will be changed update below line accordingly
1044 1044 oid_destination = os.path.join(
1045 1045 conf.get('vcs_git_lfs', 'store_location'), f'objects/{oid[:2]}/{oid[2:4]}/{oid}')
1046 1046
1047 1047 os.makedirs(os.path.dirname(oid_destination))
1048 1048 shutil.copy(oid_path, oid_destination)
1049 1049
1050 1050 node = tip.get_node('1MB.zip')
1051 1051
1052 1052 lf_node = node.get_largefile_node()
1053 1053
1054 1054 assert lf_node.is_largefile() is True
1055 1055 assert lf_node.size == 1024000
1056 1056 assert lf_node.name == '1MB.zip'
1057 1057
1058 1058
1059 1059 @pytest.mark.usefixtures("vcs_repository_support")
1060 1060 class TestGitSpecificWithRepo(BackendTestMixin):
1061 1061
1062 1062 @classmethod
1063 1063 def _get_commits(cls):
1064 1064 return [
1065 1065 {
1066 1066 'message': 'Initial',
1067 1067 'author': 'Joe Doe <joe.doe@example.com>',
1068 1068 'date': datetime.datetime(2010, 1, 1, 20),
1069 1069 'added': [
1070 1070 FileNode(b'foobar/static/js/admin/base.js', content=b'base'),
1071 1071 FileNode(b'foobar/static/admin', content=b'admin', mode=0o120000), # this is a link
1072 1072 FileNode(b'foo', content=b'foo'),
1073 1073 ],
1074 1074 },
1075 1075 {
1076 1076 'message': 'Second',
1077 1077 'author': 'Joe Doe <joe.doe@example.com>',
1078 1078 'date': datetime.datetime(2010, 1, 1, 22),
1079 1079 'added': [
1080 1080 FileNode(b'foo2', content=b'foo2'),
1081 1081 ],
1082 1082 },
1083 1083 ]
1084 1084
1085 1085 def test_paths_slow_traversing(self):
1086 1086 commit = self.repo.get_commit()
1087 1087 assert commit.get_node('foobar').get_node('static').get_node('js')\
1088 1088 .get_node('admin').get_node('base.js').content == b'base'
1089 1089
1090 1090 def test_paths_fast_traversing(self):
1091 1091 commit = self.repo.get_commit()
1092 1092 assert commit.get_node('foobar/static/js/admin/base.js').content == b'base'
1093 1093
1094 1094 def test_get_diff_runs_git_command_with_hashes(self):
1095 1095 comm1 = self.repo[0]
1096 1096 comm2 = self.repo[1]
1097 1097
1098 1098 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1099 1099 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1100 1100 self.repo.get_diff(comm1, comm2)
1101 1101
1102 1102 remote_mock.diff.assert_called_once_with(
1103 1103 comm1.raw_id, comm2.raw_id,
1104 1104 file_filter=None, opt_ignorews=False, context=3)
1105 1105
1106 1106 def test_get_diff_runs_git_command_with_str_hashes(self):
1107 1107 comm2 = self.repo[1]
1108 1108
1109 1109 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1110 1110 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1111 1111 self.repo.get_diff(self.repo.EMPTY_COMMIT, comm2)
1112 1112
1113 1113 remote_mock.diff.assert_called_once_with(
1114 1114 self.repo.EMPTY_COMMIT.raw_id, comm2.raw_id,
1115 1115 file_filter=None, opt_ignorews=False, context=3)
1116 1116
1117 1117 def test_get_diff_runs_git_command_with_path_if_its_given(self):
1118 1118 comm1 = self.repo[0]
1119 1119 comm2 = self.repo[1]
1120 1120
1121 1121 with mock.patch.object(self.repo, '_remote', return_value=mock.Mock()) as remote_mock:
1122 1122 remote_mock.diff = mock.MagicMock(side_effect=callable_get_diff)
1123 1123 self.repo.get_diff(comm1, comm2, 'foo')
1124 1124
1125 1125 remote_mock.diff.assert_called_once_with(
1126 1126 self.repo._lookup_commit(0), comm2.raw_id,
1127 1127 file_filter='foo', opt_ignorews=False, context=3)
1128 1128
1129 1129
1130 1130 @pytest.mark.usefixtures("vcs_repository_support")
1131 1131 class TestGitRegression(BackendTestMixin):
1132 1132
1133 1133 @classmethod
1134 1134 def _get_commits(cls):
1135 1135 return [
1136 1136 {
1137 1137 'message': 'Initial',
1138 1138 'author': 'Joe Doe <joe.doe@example.com>',
1139 1139 'date': datetime.datetime(2010, 1, 1, 20),
1140 1140 'added': [
1141 1141 FileNode(b'bot/__init__.py', content=b'base'),
1142 1142 FileNode(b'bot/templates/404.html', content=b'base'),
1143 1143 FileNode(b'bot/templates/500.html', content=b'base'),
1144 1144 ],
1145 1145 },
1146 1146 {
1147 1147 'message': 'Second',
1148 1148 'author': 'Joe Doe <joe.doe@example.com>',
1149 1149 'date': datetime.datetime(2010, 1, 1, 22),
1150 1150 'added': [
1151 1151 FileNode(b'bot/build/migrations/1.py', content=b'foo2'),
1152 1152 FileNode(b'bot/build/migrations/2.py', content=b'foo2'),
1153 1153 FileNode(b'bot/build/static/templates/f.html', content=b'foo2'),
1154 1154 FileNode(b'bot/build/static/templates/f1.html', content=b'foo2'),
1155 1155 FileNode(b'bot/build/templates/err.html', content=b'foo2'),
1156 1156 FileNode(b'bot/build/templates/err2.html', content=b'foo2'),
1157 1157 ],
1158 1158 },
1159 1159 ]
1160 1160
1161 1161 @pytest.mark.parametrize("path, expected_paths", [
1162 1162 ('bot', [
1163 1163 'bot/build',
1164 1164 'bot/templates',
1165 1165 'bot/__init__.py']),
1166 1166 ('bot/build', [
1167 1167 'bot/build/migrations',
1168 1168 'bot/build/static',
1169 1169 'bot/build/templates']),
1170 1170 ('bot/build/static', [
1171 1171 'bot/build/static/templates']),
1172 1172 ('bot/build/static/templates', [
1173 1173 'bot/build/static/templates/f.html',
1174 1174 'bot/build/static/templates/f1.html']),
1175 1175 ('bot/build/templates', [
1176 1176 'bot/build/templates/err.html',
1177 1177 'bot/build/templates/err2.html']),
1178 1178 ('bot/templates/', [
1179 1179 'bot/templates/404.html',
1180 1180 'bot/templates/500.html']),
1181 1181 ])
1182 1182 def test_similar_paths(self, path, expected_paths):
1183 1183 commit = self.repo.get_commit()
1184 1184 paths = [n.path for n in commit.get_nodes(path)]
1185 1185 assert paths == expected_paths
1186 1186
1187 1187
1188 1188 class TestDiscoverGitVersion(object):
1189 1189
1190 1190 def test_returns_git_version(self, baseapp):
1191 1191 version = discover_git_version()
1192 1192 assert version
1193 1193
1194 1194 def test_returns_empty_string_without_vcsserver(self):
1195 1195 mock_connection = mock.Mock()
1196 1196 mock_connection.discover_git_version = mock.Mock(
1197 1197 side_effect=Exception)
1198 1198 with mock.patch('rhodecode.lib.vcs.connection.Git', mock_connection):
1199 1199 version = discover_git_version()
1200 1200 assert version == ''
1201 1201
1202 1202
1203 1203 class TestGetSubmoduleUrl(object):
1204 1204 def test_submodules_file_found(self):
1205 1205 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1206 1206 node = mock.Mock()
1207 1207
1208 1208 with mock.patch.object(
1209 1209 commit, 'get_node', return_value=node) as get_node_mock:
1210 1210 node.str_content = (
1211 1211 '[submodule "subrepo1"]\n'
1212 1212 '\tpath = subrepo1\n'
1213 1213 '\turl = https://code.rhodecode.com/dulwich\n'
1214 1214 )
1215 1215 result = commit._get_submodule_url('subrepo1')
1216 1216 get_node_mock.assert_called_once_with('.gitmodules')
1217 1217 assert result == 'https://code.rhodecode.com/dulwich'
1218 1218
1219 1219 def test_complex_submodule_path(self):
1220 1220 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1221 1221 node = mock.Mock()
1222 1222
1223 1223 with mock.patch.object(
1224 1224 commit, 'get_node', return_value=node) as get_node_mock:
1225 1225 node.str_content = (
1226 1226 '[submodule "complex/subrepo/path"]\n'
1227 1227 '\tpath = complex/subrepo/path\n'
1228 1228 '\turl = https://code.rhodecode.com/dulwich\n'
1229 1229 )
1230 1230 result = commit._get_submodule_url('complex/subrepo/path')
1231 1231 get_node_mock.assert_called_once_with('.gitmodules')
1232 1232 assert result == 'https://code.rhodecode.com/dulwich'
1233 1233
1234 1234 def test_submodules_file_not_found(self):
1235 1235 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1236 1236 with mock.patch.object(
1237 1237 commit, 'get_node', side_effect=NodeDoesNotExistError):
1238 1238 result = commit._get_submodule_url('complex/subrepo/path')
1239 1239 assert result is None
1240 1240
1241 1241 def test_path_not_found(self):
1242 1242 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1243 1243 node = mock.Mock()
1244 1244
1245 1245 with mock.patch.object(
1246 1246 commit, 'get_node', return_value=node) as get_node_mock:
1247 1247 node.str_content = (
1248 1248 '[submodule "subrepo1"]\n'
1249 1249 '\tpath = subrepo1\n'
1250 1250 '\turl = https://code.rhodecode.com/dulwich\n'
1251 1251 )
1252 1252 result = commit._get_submodule_url('subrepo2')
1253 1253 get_node_mock.assert_called_once_with('.gitmodules')
1254 1254 assert result is None
1255 1255
1256 1256 def test_returns_cached_values(self):
1257 1257 commit = GitCommit(repository=mock.Mock(), raw_id='abcdef12', idx=1)
1258 1258 node = mock.Mock()
1259 1259
1260 1260 with mock.patch.object(
1261 1261 commit, 'get_node', return_value=node) as get_node_mock:
1262 1262 node.str_content = (
1263 1263 '[submodule "subrepo1"]\n'
1264 1264 '\tpath = subrepo1\n'
1265 1265 '\turl = https://code.rhodecode.com/dulwich\n'
1266 1266 )
1267 1267 for _ in range(3):
1268 1268 commit._get_submodule_url('subrepo1')
1269 1269 get_node_mock.assert_called_once_with('.gitmodules')
1270 1270
1271 1271 def test_get_node_returns_a_link(self):
1272 1272 repository = mock.Mock()
1273 1273 repository.alias = 'git'
1274 1274 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1275 1275 submodule_url = 'https://code.rhodecode.com/dulwich'
1276 1276 get_id_patch = mock.patch.object(
1277 1277 commit, '_get_tree_id_for_path', return_value=(1, 'link'))
1278 1278 get_submodule_patch = mock.patch.object(
1279 1279 commit, '_get_submodule_url', return_value=submodule_url)
1280 1280
1281 1281 with get_id_patch, get_submodule_patch as submodule_mock:
1282 1282 node = commit.get_node('/abcde')
1283 1283
1284 1284 submodule_mock.assert_called_once_with('/abcde')
1285 1285 assert type(node) == SubModuleNode
1286 1286 assert node.url == submodule_url
1287 1287
1288 1288 def test_get_nodes_returns_links(self):
1289 1289 repository = mock.MagicMock()
1290 1290 repository.alias = 'git'
1291 1291 repository._remote.tree_items.return_value = [
1292 1292 ('subrepo', 'stat', 1, 'link')
1293 1293 ]
1294 1294 commit = GitCommit(repository=repository, raw_id='abcdef12', idx=1)
1295 1295 submodule_url = 'https://code.rhodecode.com/dulwich'
1296 1296 get_id_patch = mock.patch.object(
1297 1297 commit, '_get_tree_id_for_path', return_value=(1, 'tree'))
1298 1298 get_submodule_patch = mock.patch.object(
1299 1299 commit, '_get_submodule_url', return_value=submodule_url)
1300 1300
1301 1301 with get_id_patch, get_submodule_patch as submodule_mock:
1302 1302 nodes = commit.get_nodes('/abcde')
1303 1303
1304 1304 submodule_mock.assert_called_once_with('/abcde/subrepo')
1305 1305 assert len(nodes) == 1
1306 1306 assert type(nodes[0]) == SubModuleNode
1307 1307 assert nodes[0].url == submodule_url
@@ -1,195 +1,195 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import os
21 21
22 22 import mock
23 23 import pytest
24 24
25 25 from rhodecode.lib.str_utils import safe_bytes
26 26 from rhodecode.tests import SVN_REPO, TEST_DIR, TESTS_TMP_PATH
27 27 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
28 28 from rhodecode.lib.vcs.conf import settings
29 29 from rhodecode.lib.vcs.exceptions import VCSError
30 30
31 31
32 32 pytestmark = [
33 33 pytest.mark.backends("svn"),
34 34 pytest.mark.usefixtures("baseapp"),
35 35 ]
36 36
37 37
38 38 @pytest.fixture()
39 39 def repo(baseapp):
40 40 repo = SubversionRepository(os.path.join(TESTS_TMP_PATH, SVN_REPO))
41 41 return repo
42 42
43 43
44 44 @pytest.fixture()
45 45 def head(repo):
46 46 return repo.get_commit()
47 47
48 48
49 49 def test_init_fails_if_path_does_not_exist():
50 50 path = os.path.join(TEST_DIR, 'i-do-not-exist')
51 51 with pytest.raises(VCSError):
52 52 SubversionRepository(path)
53 53
54 54
55 55 def test_init_fails_if_path_is_not_a_valid_repository(tmpdir):
56 56 path = str(tmpdir.mkdir('unicode Γ€'))
57 57 with pytest.raises(VCSError):
58 58 SubversionRepository(path)
59 59
60 60
61 61 def test_repo_clone(vcsbackend, reposerver):
62 62 source = vcsbackend.create_repo(number_of_commits=3)
63 63 reposerver.serve(source)
64 64 repo = SubversionRepository(
65 65 vcsbackend.new_repo_path(),
66 66 create=True,
67 67 src_url=reposerver.url)
68 68
69 69 assert source.commit_ids == repo.commit_ids
70 70 assert source[0].message == repo[0].message
71 71
72 72
73 73 def test_latest_commit(head):
74 74 assert head.raw_id == '393'
75 75
76 76
77 77 def test_commit_description(head):
78 78 assert head.message == """Added a symlink"""
79 79
80 80
81 81 def test_commit_author(head):
82 82 assert head.author == 'marcin'
83 83
84 84
85 85 @pytest.mark.parametrize("filename, content, mime_type", [
86 86 (b'test.txt', b'Text content\n', None),
87 87 (b'test.bin', b'\0 binary \0', 'application/octet-stream'),
88 88 ], ids=['text', 'binary'])
89 89 def test_sets_mime_type_correctly(vcsbackend, filename, content, mime_type):
90 90 repo = vcsbackend.create_repo()
91 91 vcsbackend.ensure_file(filename, content)
92 92 file_properties = repo._remote.node_properties(filename, 1)
93 93 assert file_properties.get('svn:mime-type') == mime_type
94 94
95 95
96 96 def test_slice_access(repo):
97 97 page_size = 5
98 98 page = 0
99 99 start = page * page_size
100 100 end = start + page_size - 1
101 101
102 102 commits = list(repo[start:end])
103 103 assert [commit.raw_id for commit in commits] == ['1', '2', '3', '4']
104 104
105 105
106 106 def test_walk_changelog_page(repo):
107 107 page_size = 5
108 108 page = 0
109 109 start = page * page_size
110 110 end = start + page_size - 1
111 111
112 112 commits = list(repo[start:end])
113 113 changelog = [
114 114 'r%s, %s, %s' % (c.raw_id, c.author, c.message) for c in commits]
115 115
116 116 expexted_messages = [
117 117 'r1, marcin, initial import',
118 118 'r2, marcin, hg ignore',
119 119 'r3, marcin, Pip standards refactor',
120 120 'r4, marcin, Base repository few new functions added']
121 121 assert changelog == expexted_messages
122 122
123 123
124 124 def test_read_full_file_tree(head):
125 125 for topnode, dirs, files in head.walk():
126 126 for f in files:
127 127 len(f.content)
128 128
129 129
130 130 def test_topnode_files_attribute(head):
131 131 topnode = head.get_node('')
132 132 topnode.files
133 133
134 134
135 135
136 136
137 137 @pytest.mark.parametrize("filename, content, branch, mime_type", [
138 138 ('branches/plain/test.txt', b'Text content\n', 'plain', None),
139 139 ('branches/uniΓ§ΓΆβˆ‚e/test.bin', b'\0 binary \0', 'uniΓ§ΓΆβˆ‚e', 'application/octet-stream'),
140 140 ], ids=['text', 'binary'])
141 141 def test_unicode_refs(vcsbackend, filename, content, branch, mime_type):
142 142 filename = safe_bytes(filename)
143 143 repo = vcsbackend.create_repo()
144 144 vcsbackend.ensure_file(filename, content)
145 145 with mock.patch(("rhodecode.lib.vcs.backends.svn.repository"
146 146 ".SubversionRepository._patterns_from_section"),
147 147 return_value=['branches/*']):
148 assert u'branches/{0}'.format(branch) in repo.branches
148 assert f'branches/{branch}' in repo.branches
149 149
150 150
151 151 def test_compatible_version(monkeypatch, vcsbackend):
152 152 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'pre-1.8-compatible')
153 153 path = vcsbackend.new_repo_path()
154 154 SubversionRepository(path, create=True)
155 with open('{}/db/format'.format(path)) as f:
155 with open(f'{path}/db/format') as f:
156 156 first_line = f.readline().strip()
157 157 assert first_line == '4'
158 158
159 159
160 160 def test_invalid_compatible_version(monkeypatch, vcsbackend):
161 161 monkeypatch.setattr(settings, 'SVN_COMPATIBLE_VERSION', 'i-am-an-invalid-setting')
162 162 path = vcsbackend.new_repo_path()
163 163 with pytest.raises(Exception):
164 164 SubversionRepository(path, create=True)
165 165
166 166
167 167 class TestSVNCommit(object):
168 168
169 169 @pytest.fixture(autouse=True)
170 170 def prepare(self, repo):
171 171 self.repo = repo
172 172
173 173 def test_file_history_from_commits(self):
174 174 node = self.repo[10].get_node('setup.py')
175 175 commit_ids = [commit.raw_id for commit in node.history]
176 176 assert ['8'] == commit_ids
177 177
178 178 node = self.repo[20].get_node('setup.py')
179 179 node_ids = [commit.raw_id for commit in node.history]
180 180 assert ['18',
181 181 '8'] == node_ids
182 182
183 183 # special case we check history from commit that has this particular
184 184 # file changed this means we check if it's included as well
185 185 node = self.repo.get_commit('18').get_node('setup.py')
186 186 node_ids = [commit.raw_id for commit in node.history]
187 187 assert ['18',
188 188 '8'] == node_ids
189 189
190 190 def test_repo_files_content_type(self):
191 191 test_commit = self.repo.get_commit(commit_idx=100)
192 192 for node in test_commit.get_node('/'):
193 193 if node.is_file():
194 194 assert type(node.content) == bytes
195 195 assert type(node.str_content) == str
General Comments 0
You need to be logged in to leave comments. Login now