##// END OF EJS Templates
tests: Fix a helper method that checks if a repository is available on filesystem....
Martin Bornhold -
r486:e773cd0c default
parent child Browse files
Show More
@@ -1,1260 +1,1250 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import os
22 21 import urllib
23 22
24 23 import mock
25 24 import pytest
26 25
27 26 from rhodecode.lib import auth
28 from rhodecode.lib import vcs
29 27 from rhodecode.lib.utils2 import safe_str, str2bool
30 28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
31 29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
32 30 Permission
33 31 from rhodecode.model.meta import Session
34 32 from rhodecode.model.repo import RepoModel
35 33 from rhodecode.model.repo_group import RepoGroupModel
36 34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
37 35 from rhodecode.model.user import UserModel
38 36 from rhodecode.tests import (
39 37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
40 38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
41 TESTS_TMP_PATH, logout_user_session)
39 logout_user_session)
42 40 from rhodecode.tests.fixture import Fixture, error_function
43 from rhodecode.tests.utils import AssertResponse
41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
44 42
45 43 fixture = Fixture()
46 44
47 45
48 46 @pytest.mark.usefixtures("app")
49 47 class TestAdminRepos:
50 48
51 49 def test_index(self):
52 50 self.app.get(url('repos'))
53 51
54 52 def test_create_page_restricted(self, autologin_user, backend):
55 53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
56 54 response = self.app.get(url('new_repo'), status=200)
57 55 assert_response = AssertResponse(response)
58 56 element = assert_response.get_element('#repo_type')
59 57 assert element.text_content() == '\ngit\n'
60 58
61 59 def test_create_page_non_restricted(self, autologin_user, backend):
62 60 response = self.app.get(url('new_repo'), status=200)
63 61 assert_response = AssertResponse(response)
64 62 assert_response.element_contains('#repo_type', 'git')
65 63 assert_response.element_contains('#repo_type', 'svn')
66 64 assert_response.element_contains('#repo_type', 'hg')
67 65
68 66 @pytest.mark.parametrize("suffix", [u'', u''], ids=['', 'non-ascii'])
69 67 def test_create(self, autologin_user, backend, suffix, csrf_token):
70 68 repo_name_unicode = backend.new_repo_name(suffix=suffix)
71 69 repo_name = repo_name_unicode.encode('utf8')
72 70 description_unicode = u'description for newly created repo' + suffix
73 71 description = description_unicode.encode('utf8')
74 72 self.app.post(
75 73 url('repos'),
76 74 fixture._get_repo_create_params(
77 75 repo_private=False,
78 76 repo_name=repo_name,
79 77 repo_type=backend.alias,
80 78 repo_description=description,
81 79 csrf_token=csrf_token),
82 80 status=302
83 81 )
84 82
85 83 self.assert_repository_is_created_correctly(
86 84 repo_name, description, backend)
87 85
88 86 def test_create_numeric(self, autologin_user, backend, csrf_token):
89 87 numeric_repo = '1234'
90 88 repo_name = numeric_repo
91 89 description = 'description for newly created repo' + numeric_repo
92 90 self.app.post(
93 91 url('repos'),
94 92 fixture._get_repo_create_params(
95 93 repo_private=False,
96 94 repo_name=repo_name,
97 95 repo_type=backend.alias,
98 96 repo_description=description,
99 97 csrf_token=csrf_token))
100 98
101 99 self.assert_repository_is_created_correctly(
102 100 repo_name, description, backend)
103 101
104 102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
105 103 def test_create_in_group(
106 104 self, autologin_user, backend, suffix, csrf_token):
107 105 # create GROUP
108 106 group_name = 'sometest_%s' % backend.alias
109 107 gr = RepoGroupModel().create(group_name=group_name,
110 108 group_description='test',
111 109 owner=TEST_USER_ADMIN_LOGIN)
112 110 Session().commit()
113 111
114 112 repo_name = u'ingroup' + suffix
115 113 repo_name_full = RepoGroup.url_sep().join(
116 114 [group_name, repo_name])
117 115 description = u'description for newly created repo'
118 116 self.app.post(
119 117 url('repos'),
120 118 fixture._get_repo_create_params(
121 119 repo_private=False,
122 120 repo_name=safe_str(repo_name),
123 121 repo_type=backend.alias,
124 122 repo_description=description,
125 123 repo_group=gr.group_id,
126 124 csrf_token=csrf_token))
127 125
128 126 # TODO: johbo: Cleanup work to fixture
129 127 try:
130 128 self.assert_repository_is_created_correctly(
131 129 repo_name_full, description, backend)
132 130
133 131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
134 132 inherited_perms = UserRepoToPerm.query().filter(
135 133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
136 134 assert len(inherited_perms) == 1
137 135 finally:
138 136 RepoModel().delete(repo_name_full)
139 137 RepoGroupModel().delete(group_name)
140 138 Session().commit()
141 139
142 140 def test_create_in_group_numeric(
143 141 self, autologin_user, backend, csrf_token):
144 142 # create GROUP
145 143 group_name = 'sometest_%s' % backend.alias
146 144 gr = RepoGroupModel().create(group_name=group_name,
147 145 group_description='test',
148 146 owner=TEST_USER_ADMIN_LOGIN)
149 147 Session().commit()
150 148
151 149 repo_name = '12345'
152 150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
153 151 description = 'description for newly created repo'
154 152 self.app.post(
155 153 url('repos'),
156 154 fixture._get_repo_create_params(
157 155 repo_private=False,
158 156 repo_name=repo_name,
159 157 repo_type=backend.alias,
160 158 repo_description=description,
161 159 repo_group=gr.group_id,
162 160 csrf_token=csrf_token))
163 161
164 162 # TODO: johbo: Cleanup work to fixture
165 163 try:
166 164 self.assert_repository_is_created_correctly(
167 165 repo_name_full, description, backend)
168 166
169 167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
170 168 inherited_perms = UserRepoToPerm.query()\
171 169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
172 170 assert len(inherited_perms) == 1
173 171 finally:
174 172 RepoModel().delete(repo_name_full)
175 173 RepoGroupModel().delete(group_name)
176 174 Session().commit()
177 175
178 176 def test_create_in_group_without_needed_permissions(self, backend):
179 177 session = login_user_session(
180 178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
181 179 csrf_token = auth.get_csrf_token(session)
182 180 # revoke
183 181 user_model = UserModel()
184 182 # disable fork and create on default user
185 183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
186 184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
187 185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
188 186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
189 187
190 188 # disable on regular user
191 189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
192 190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
193 191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
194 192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
195 193 Session().commit()
196 194
197 195 # create GROUP
198 196 group_name = 'reg_sometest_%s' % backend.alias
199 197 gr = RepoGroupModel().create(group_name=group_name,
200 198 group_description='test',
201 199 owner=TEST_USER_ADMIN_LOGIN)
202 200 Session().commit()
203 201
204 202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
205 203 gr_allowed = RepoGroupModel().create(
206 204 group_name=group_name_allowed,
207 205 group_description='test',
208 206 owner=TEST_USER_REGULAR_LOGIN)
209 207 Session().commit()
210 208
211 209 repo_name = 'ingroup'
212 210 description = 'description for newly created repo'
213 211 response = self.app.post(
214 212 url('repos'),
215 213 fixture._get_repo_create_params(
216 214 repo_private=False,
217 215 repo_name=repo_name,
218 216 repo_type=backend.alias,
219 217 repo_description=description,
220 218 repo_group=gr.group_id,
221 219 csrf_token=csrf_token))
222 220
223 221 response.mustcontain('Invalid value')
224 222
225 223 # user is allowed to create in this group
226 224 repo_name = 'ingroup'
227 225 repo_name_full = RepoGroup.url_sep().join(
228 226 [group_name_allowed, repo_name])
229 227 description = 'description for newly created repo'
230 228 response = self.app.post(
231 229 url('repos'),
232 230 fixture._get_repo_create_params(
233 231 repo_private=False,
234 232 repo_name=repo_name,
235 233 repo_type=backend.alias,
236 234 repo_description=description,
237 235 repo_group=gr_allowed.group_id,
238 236 csrf_token=csrf_token))
239 237
240 238 # TODO: johbo: Cleanup in pytest fixture
241 239 try:
242 240 self.assert_repository_is_created_correctly(
243 241 repo_name_full, description, backend)
244 242
245 243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
246 244 inherited_perms = UserRepoToPerm.query().filter(
247 245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
248 246 assert len(inherited_perms) == 1
249 247
250 248 assert repo_on_filesystem(repo_name_full)
251 249 finally:
252 250 RepoModel().delete(repo_name_full)
253 251 RepoGroupModel().delete(group_name)
254 252 RepoGroupModel().delete(group_name_allowed)
255 253 Session().commit()
256 254
257 255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
258 256 csrf_token):
259 257 # create GROUP
260 258 group_name = 'sometest_%s' % backend.alias
261 259 gr = RepoGroupModel().create(group_name=group_name,
262 260 group_description='test',
263 261 owner=TEST_USER_ADMIN_LOGIN)
264 262 perm = Permission.get_by_key('repository.write')
265 263 RepoGroupModel().grant_user_permission(
266 264 gr, TEST_USER_REGULAR_LOGIN, perm)
267 265
268 266 # add repo permissions
269 267 Session().commit()
270 268
271 269 repo_name = 'ingroup_inherited_%s' % backend.alias
272 270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
273 271 description = 'description for newly created repo'
274 272 self.app.post(
275 273 url('repos'),
276 274 fixture._get_repo_create_params(
277 275 repo_private=False,
278 276 repo_name=repo_name,
279 277 repo_type=backend.alias,
280 278 repo_description=description,
281 279 repo_group=gr.group_id,
282 280 repo_copy_permissions=True,
283 281 csrf_token=csrf_token))
284 282
285 283 # TODO: johbo: Cleanup to pytest fixture
286 284 try:
287 285 self.assert_repository_is_created_correctly(
288 286 repo_name_full, description, backend)
289 287 except Exception:
290 288 RepoGroupModel().delete(group_name)
291 289 Session().commit()
292 290 raise
293 291
294 292 # check if inherited permissions are applied
295 293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
296 294 inherited_perms = UserRepoToPerm.query().filter(
297 295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
298 296 assert len(inherited_perms) == 2
299 297
300 298 assert TEST_USER_REGULAR_LOGIN in [
301 299 x.user.username for x in inherited_perms]
302 300 assert 'repository.write' in [
303 301 x.permission.permission_name for x in inherited_perms]
304 302
305 303 RepoModel().delete(repo_name_full)
306 304 RepoGroupModel().delete(group_name)
307 305 Session().commit()
308 306
309 307 @pytest.mark.xfail_backends(
310 308 "git", "hg", reason="Missing reposerver support")
311 309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
312 310 csrf_token):
313 311 source_repo = backend.create_repo(number_of_commits=2)
314 312 source_repo_name = source_repo.repo_name
315 313 reposerver.serve(source_repo.scm_instance())
316 314
317 315 repo_name = backend.new_repo_name()
318 316 response = self.app.post(
319 317 url('repos'),
320 318 fixture._get_repo_create_params(
321 319 repo_private=False,
322 320 repo_name=repo_name,
323 321 repo_type=backend.alias,
324 322 repo_description='',
325 323 clone_uri=reposerver.url,
326 324 csrf_token=csrf_token),
327 325 status=302)
328 326
329 327 # Should be redirected to the creating page
330 328 response.mustcontain('repo_creating')
331 329
332 330 # Expecting that both repositories have same history
333 331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
334 332 source_vcs = source_repo.scm_instance()
335 333 repo = RepoModel().get_by_repo_name(repo_name)
336 334 repo_vcs = repo.scm_instance()
337 335 assert source_vcs[0].message == repo_vcs[0].message
338 336 assert source_vcs.count() == repo_vcs.count()
339 337 assert source_vcs.commit_ids == repo_vcs.commit_ids
340 338
341 339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
342 340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
343 341 csrf_token):
344 342 repo_name = backend.new_repo_name()
345 343 description = 'description for newly created repo'
346 344 response = self.app.post(
347 345 url('repos'),
348 346 fixture._get_repo_create_params(
349 347 repo_private=False,
350 348 repo_name=repo_name,
351 349 repo_type=backend.alias,
352 350 repo_description=description,
353 351 clone_uri='http://repo.invalid/repo',
354 352 csrf_token=csrf_token))
355 353 response.mustcontain('invalid clone url')
356 354
357 355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
358 356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
359 357 self, autologin_user, backend, csrf_token):
360 358 repo_name = backend.new_repo_name()
361 359 description = 'description for newly created repo'
362 360 response = self.app.post(
363 361 url('repos'),
364 362 fixture._get_repo_create_params(
365 363 repo_private=False,
366 364 repo_name=repo_name,
367 365 repo_type=backend.alias,
368 366 repo_description=description,
369 367 clone_uri='svn+http://svn.invalid/repo',
370 368 csrf_token=csrf_token))
371 369 response.mustcontain('invalid clone url')
372 370
373 371 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
374 372 def test_delete(self, autologin_user, backend, suffix, csrf_token):
375 373 repo = backend.create_repo(name_suffix=suffix)
376 374 repo_name = repo.repo_name
377 375
378 376 response = self.app.post(url('repo', repo_name=repo_name),
379 377 params={'_method': 'delete',
380 378 'csrf_token': csrf_token})
381 379 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
382 380 response.follow()
383 381
384 382 # check if repo was deleted from db
385 383 assert RepoModel().get_by_repo_name(repo_name) is None
386 384 assert not repo_on_filesystem(repo_name)
387 385
388 386 def test_show(self, autologin_user, backend):
389 387 self.app.get(url('repo', repo_name=backend.repo_name))
390 388
391 389 def test_edit(self, backend, autologin_user):
392 390 self.app.get(url('edit_repo', repo_name=backend.repo_name))
393 391
394 392 def test_edit_accessible_when_missing_requirements(
395 393 self, backend_hg, autologin_user):
396 394 scm_patcher = mock.patch.object(
397 395 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
398 396 with scm_patcher:
399 397 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
400 398
401 399 def test_set_private_flag_sets_default_to_none(
402 400 self, autologin_user, backend, csrf_token):
403 401 # initially repository perm should be read
404 402 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
405 403 assert len(perm) == 1
406 404 assert perm[0].permission.permission_name == 'repository.read'
407 405 assert not backend.repo.private
408 406
409 407 response = self.app.post(
410 408 url('repo', repo_name=backend.repo_name),
411 409 fixture._get_repo_create_params(
412 410 repo_private=1,
413 411 repo_name=backend.repo_name,
414 412 repo_type=backend.alias,
415 413 user=TEST_USER_ADMIN_LOGIN,
416 414 _method='put',
417 415 csrf_token=csrf_token))
418 416 assert_session_flash(
419 417 response,
420 418 msg='Repository %s updated successfully' % (backend.repo_name))
421 419 assert backend.repo.private
422 420
423 421 # now the repo default permission should be None
424 422 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
425 423 assert len(perm) == 1
426 424 assert perm[0].permission.permission_name == 'repository.none'
427 425
428 426 response = self.app.post(
429 427 url('repo', repo_name=backend.repo_name),
430 428 fixture._get_repo_create_params(
431 429 repo_private=False,
432 430 repo_name=backend.repo_name,
433 431 repo_type=backend.alias,
434 432 user=TEST_USER_ADMIN_LOGIN,
435 433 _method='put',
436 434 csrf_token=csrf_token))
437 435 assert_session_flash(
438 436 response,
439 437 msg='Repository %s updated successfully' % (backend.repo_name))
440 438 assert not backend.repo.private
441 439
442 440 # we turn off private now the repo default permission should stay None
443 441 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
444 442 assert len(perm) == 1
445 443 assert perm[0].permission.permission_name == 'repository.none'
446 444
447 445 # update this permission back
448 446 perm[0].permission = Permission.get_by_key('repository.read')
449 447 Session().add(perm[0])
450 448 Session().commit()
451 449
452 450 def test_default_user_cannot_access_private_repo_in_a_group(
453 451 self, autologin_user, user_util, backend, csrf_token):
454 452
455 453 group = user_util.create_repo_group()
456 454
457 455 repo = backend.create_repo(
458 456 repo_private=True, repo_group=group, repo_copy_permissions=True)
459 457
460 458 permissions = _get_permission_for_user(
461 459 user='default', repo=repo.repo_name)
462 460 assert len(permissions) == 1
463 461 assert permissions[0].permission.permission_name == 'repository.none'
464 462 assert permissions[0].repository.private is True
465 463
466 464 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
467 465 repo = backend.repo
468 466 response = self.app.get(
469 467 url('edit_repo_advanced', repo_name=backend.repo_name))
470 468 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
471 469 response.mustcontain(no=[opt])
472 470
473 471 def test_set_fork_of_target_repo(
474 472 self, autologin_user, backend, csrf_token):
475 473 target_repo = 'target_%s' % backend.alias
476 474 fixture.create_repo(target_repo, repo_type=backend.alias)
477 475 repo2 = Repository.get_by_repo_name(target_repo)
478 476 response = self.app.post(
479 477 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
480 478 params={'id_fork_of': repo2.repo_id, '_method': 'put',
481 479 'csrf_token': csrf_token})
482 480 repo = Repository.get_by_repo_name(backend.repo_name)
483 481 repo2 = Repository.get_by_repo_name(target_repo)
484 482 assert_session_flash(
485 483 response,
486 484 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
487 485
488 486 assert repo.fork == repo2
489 487 response = response.follow()
490 488 # check if given repo is selected
491 489
492 490 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
493 491 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
494 492
495 493 response.mustcontain(opt)
496 494
497 495 fixture.destroy_repo(target_repo, forks='detach')
498 496
499 497 @pytest.mark.backends("hg", "git")
500 498 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
501 499 csrf_token):
502 500 TARGET_REPO_MAP = {
503 501 'git': {
504 502 'type': 'hg',
505 503 'repo_name': HG_REPO},
506 504 'hg': {
507 505 'type': 'git',
508 506 'repo_name': GIT_REPO},
509 507 }
510 508 target_repo = TARGET_REPO_MAP[backend.alias]
511 509
512 510 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
513 511 response = self.app.post(
514 512 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
515 513 params={'id_fork_of': repo2.repo_id, '_method': 'put',
516 514 'csrf_token': csrf_token})
517 515 assert_session_flash(
518 516 response,
519 517 'Cannot set repository as fork of repository with other type')
520 518
521 519 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
522 520 # mark it as None
523 521 response = self.app.post(
524 522 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
525 523 params={'id_fork_of': None, '_method': 'put',
526 524 'csrf_token': csrf_token})
527 525 assert_session_flash(
528 526 response,
529 527 'Marked repo %s as fork of %s'
530 528 % (backend.repo_name, "Nothing"))
531 529 assert backend.repo.fork is None
532 530
533 531 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
534 532 repo = Repository.get_by_repo_name(backend.repo_name)
535 533 response = self.app.post(
536 534 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
537 535 params={'id_fork_of': repo.repo_id, '_method': 'put',
538 536 'csrf_token': csrf_token})
539 537 assert_session_flash(
540 538 response, 'An error occurred during this operation')
541 539
542 540 def test_create_on_top_level_without_permissions(self, backend):
543 541 session = login_user_session(
544 542 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
545 543 csrf_token = auth.get_csrf_token(session)
546 544
547 545 # revoke
548 546 user_model = UserModel()
549 547 # disable fork and create on default user
550 548 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
551 549 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
552 550 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
553 551 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
554 552
555 553 # disable on regular user
556 554 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
557 555 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
558 556 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
559 557 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
560 558 Session().commit()
561 559
562 560 repo_name = backend.new_repo_name()
563 561 description = 'description for newly created repo'
564 562 response = self.app.post(
565 563 url('repos'),
566 564 fixture._get_repo_create_params(
567 565 repo_private=False,
568 566 repo_name=repo_name,
569 567 repo_type=backend.alias,
570 568 repo_description=description,
571 569 csrf_token=csrf_token))
572 570
573 571 response.mustcontain(
574 572 u"You do not have the permission to store repositories in "
575 573 u"the root location.")
576 574
577 575 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
578 576 def test_create_repo_when_filesystem_op_fails(
579 577 self, autologin_user, backend, csrf_token):
580 578 repo_name = backend.new_repo_name()
581 579 description = 'description for newly created repo'
582 580
583 581 response = self.app.post(
584 582 url('repos'),
585 583 fixture._get_repo_create_params(
586 584 repo_private=False,
587 585 repo_name=repo_name,
588 586 repo_type=backend.alias,
589 587 repo_description=description,
590 588 csrf_token=csrf_token))
591 589
592 590 assert_session_flash(
593 591 response, 'Error creating repository %s' % repo_name)
594 592 # repo must not be in db
595 593 assert backend.repo is None
596 594 # repo must not be in filesystem !
597 595 assert not repo_on_filesystem(repo_name)
598 596
599 597 def assert_repository_is_created_correctly(
600 598 self, repo_name, description, backend):
601 599 repo_name_utf8 = repo_name.encode('utf-8')
602 600
603 601 # run the check page that triggers the flash message
604 602 response = self.app.get(url('repo_check_home', repo_name=repo_name))
605 603 assert response.json == {u'result': True}
606 604 assert_session_flash(
607 605 response,
608 606 u'Created repository <a href="/%s">%s</a>'
609 607 % (urllib.quote(repo_name_utf8), repo_name))
610 608
611 609 # test if the repo was created in the database
612 610 new_repo = RepoModel().get_by_repo_name(repo_name)
613 611
614 612 assert new_repo.repo_name == repo_name
615 613 assert new_repo.description == description
616 614
617 615 # test if the repository is visible in the list ?
618 616 response = self.app.get(url('summary_home', repo_name=repo_name))
619 617 response.mustcontain(repo_name)
620 618 response.mustcontain(backend.alias)
621 619
622 620 assert repo_on_filesystem(repo_name)
623 621
624 622
625 623 @pytest.mark.usefixtures("app")
626 624 class TestVcsSettings(object):
627 625 FORM_DATA = {
628 626 'inherit_global_settings': False,
629 627 'hooks_changegroup_repo_size': False,
630 628 'hooks_changegroup_push_logger': False,
631 629 'hooks_outgoing_pull_logger': False,
632 630 'extensions_largefiles': False,
633 631 'phases_publish': 'false',
634 632 'rhodecode_pr_merge_enabled': False,
635 633 'rhodecode_use_outdated_comments': False,
636 634 'new_svn_branch': '',
637 635 'new_svn_tag': ''
638 636 }
639 637
640 638 @pytest.mark.skip_backends('svn')
641 639 def test_global_settings_initial_values(self, autologin_user, backend):
642 640 repo_name = backend.repo_name
643 641 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
644 642
645 643 expected_settings = (
646 644 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
647 645 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
648 646 'hooks_outgoing_pull_logger'
649 647 )
650 648 for setting in expected_settings:
651 649 self.assert_repo_value_equals_global_value(response, setting)
652 650
653 651 def test_show_settings_requires_repo_admin_permission(
654 652 self, backend, user_util, settings_util):
655 653 repo = backend.create_repo()
656 654 repo_name = repo.repo_name
657 655 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
658 656 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
659 657 login_user_session(
660 658 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
661 659 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
662 660
663 661 def test_inherit_global_settings_flag_is_true_by_default(
664 662 self, autologin_user, backend):
665 663 repo_name = backend.repo_name
666 664 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
667 665
668 666 assert_response = AssertResponse(response)
669 667 element = assert_response.get_element('#inherit_global_settings')
670 668 assert element.checked
671 669
672 670 @pytest.mark.parametrize('checked_value', [True, False])
673 671 def test_inherit_global_settings_value(
674 672 self, autologin_user, backend, checked_value, settings_util):
675 673 repo = backend.create_repo()
676 674 repo_name = repo.repo_name
677 675 settings_util.create_repo_rhodecode_setting(
678 676 repo, 'inherit_vcs_settings', checked_value, 'bool')
679 677 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
680 678
681 679 assert_response = AssertResponse(response)
682 680 element = assert_response.get_element('#inherit_global_settings')
683 681 assert element.checked == checked_value
684 682
685 683 @pytest.mark.skip_backends('svn')
686 684 def test_hooks_settings_are_created(
687 685 self, autologin_user, backend, csrf_token):
688 686 repo_name = backend.repo_name
689 687 data = self.FORM_DATA.copy()
690 688 data['csrf_token'] = csrf_token
691 689 self.app.post(
692 690 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
693 691 settings = SettingsModel(repo=repo_name)
694 692 try:
695 693 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
696 694 ui = settings.get_ui_by_section_and_key(section, key)
697 695 assert ui.ui_active is False
698 696 finally:
699 697 self._cleanup_repo_settings(settings)
700 698
701 699 def test_hooks_settings_are_not_created_for_svn(
702 700 self, autologin_user, backend_svn, csrf_token):
703 701 repo_name = backend_svn.repo_name
704 702 data = self.FORM_DATA.copy()
705 703 data['csrf_token'] = csrf_token
706 704 self.app.post(
707 705 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
708 706 settings = SettingsModel(repo=repo_name)
709 707 try:
710 708 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
711 709 ui = settings.get_ui_by_section_and_key(section, key)
712 710 assert ui is None
713 711 finally:
714 712 self._cleanup_repo_settings(settings)
715 713
716 714 @pytest.mark.skip_backends('svn')
717 715 def test_hooks_settings_are_updated(
718 716 self, autologin_user, backend, csrf_token):
719 717 repo_name = backend.repo_name
720 718 settings = SettingsModel(repo=repo_name)
721 719 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
722 720 settings.create_ui_section_value(section, '', key=key, active=True)
723 721
724 722 data = self.FORM_DATA.copy()
725 723 data['csrf_token'] = csrf_token
726 724 self.app.post(
727 725 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
728 726 try:
729 727 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
730 728 ui = settings.get_ui_by_section_and_key(section, key)
731 729 assert ui.ui_active is False
732 730 finally:
733 731 self._cleanup_repo_settings(settings)
734 732
735 733 def test_hooks_settings_are_not_updated_for_svn(
736 734 self, autologin_user, backend_svn, csrf_token):
737 735 repo_name = backend_svn.repo_name
738 736 settings = SettingsModel(repo=repo_name)
739 737 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
740 738 settings.create_ui_section_value(section, '', key=key, active=True)
741 739
742 740 data = self.FORM_DATA.copy()
743 741 data['csrf_token'] = csrf_token
744 742 self.app.post(
745 743 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
746 744 try:
747 745 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
748 746 ui = settings.get_ui_by_section_and_key(section, key)
749 747 assert ui.ui_active is True
750 748 finally:
751 749 self._cleanup_repo_settings(settings)
752 750
753 751 @pytest.mark.skip_backends('svn')
754 752 def test_pr_settings_are_created(
755 753 self, autologin_user, backend, csrf_token):
756 754 repo_name = backend.repo_name
757 755 data = self.FORM_DATA.copy()
758 756 data['csrf_token'] = csrf_token
759 757 self.app.post(
760 758 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
761 759 settings = SettingsModel(repo=repo_name)
762 760 try:
763 761 for name in VcsSettingsModel.GENERAL_SETTINGS:
764 762 setting = settings.get_setting_by_name(name)
765 763 assert setting.app_settings_value is False
766 764 finally:
767 765 self._cleanup_repo_settings(settings)
768 766
769 767 def test_pr_settings_are_not_created_for_svn(
770 768 self, autologin_user, backend_svn, csrf_token):
771 769 repo_name = backend_svn.repo_name
772 770 data = self.FORM_DATA.copy()
773 771 data['csrf_token'] = csrf_token
774 772 self.app.post(
775 773 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
776 774 settings = SettingsModel(repo=repo_name)
777 775 try:
778 776 for name in VcsSettingsModel.GENERAL_SETTINGS:
779 777 setting = settings.get_setting_by_name(name)
780 778 assert setting is None
781 779 finally:
782 780 self._cleanup_repo_settings(settings)
783 781
784 782 def test_pr_settings_creation_requires_repo_admin_permission(
785 783 self, backend, user_util, settings_util, csrf_token):
786 784 repo = backend.create_repo()
787 785 repo_name = repo.repo_name
788 786 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
789 787
790 788 logout_user_session(self.app, csrf_token)
791 789 session = login_user_session(
792 790 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
793 791 new_csrf_token = auth.get_csrf_token(session)
794 792
795 793 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
796 794 data = self.FORM_DATA.copy()
797 795 data['csrf_token'] = new_csrf_token
798 796 settings = SettingsModel(repo=repo_name)
799 797
800 798 try:
801 799 self.app.post(
802 800 url('repo_vcs_settings', repo_name=repo_name), data,
803 801 status=302)
804 802 finally:
805 803 self._cleanup_repo_settings(settings)
806 804
807 805 @pytest.mark.skip_backends('svn')
808 806 def test_pr_settings_are_updated(
809 807 self, autologin_user, backend, csrf_token):
810 808 repo_name = backend.repo_name
811 809 settings = SettingsModel(repo=repo_name)
812 810 for name in VcsSettingsModel.GENERAL_SETTINGS:
813 811 settings.create_or_update_setting(name, True, 'bool')
814 812
815 813 data = self.FORM_DATA.copy()
816 814 data['csrf_token'] = csrf_token
817 815 self.app.post(
818 816 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
819 817 try:
820 818 for name in VcsSettingsModel.GENERAL_SETTINGS:
821 819 setting = settings.get_setting_by_name(name)
822 820 assert setting.app_settings_value is False
823 821 finally:
824 822 self._cleanup_repo_settings(settings)
825 823
826 824 def test_pr_settings_are_not_updated_for_svn(
827 825 self, autologin_user, backend_svn, csrf_token):
828 826 repo_name = backend_svn.repo_name
829 827 settings = SettingsModel(repo=repo_name)
830 828 for name in VcsSettingsModel.GENERAL_SETTINGS:
831 829 settings.create_or_update_setting(name, True, 'bool')
832 830
833 831 data = self.FORM_DATA.copy()
834 832 data['csrf_token'] = csrf_token
835 833 self.app.post(
836 834 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
837 835 try:
838 836 for name in VcsSettingsModel.GENERAL_SETTINGS:
839 837 setting = settings.get_setting_by_name(name)
840 838 assert setting.app_settings_value is True
841 839 finally:
842 840 self._cleanup_repo_settings(settings)
843 841
844 842 def test_svn_settings_are_created(
845 843 self, autologin_user, backend_svn, csrf_token, settings_util):
846 844 repo_name = backend_svn.repo_name
847 845 data = self.FORM_DATA.copy()
848 846 data['new_svn_tag'] = 'svn-tag'
849 847 data['new_svn_branch'] = 'svn-branch'
850 848 data['csrf_token'] = csrf_token
851 849
852 850 # Create few global settings to make sure that uniqueness validators
853 851 # are not triggered
854 852 settings_util.create_rhodecode_ui(
855 853 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
856 854 settings_util.create_rhodecode_ui(
857 855 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
858 856
859 857 self.app.post(
860 858 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
861 859 settings = SettingsModel(repo=repo_name)
862 860 try:
863 861 svn_branches = settings.get_ui_by_section(
864 862 VcsSettingsModel.SVN_BRANCH_SECTION)
865 863 svn_branch_names = [b.ui_value for b in svn_branches]
866 864 svn_tags = settings.get_ui_by_section(
867 865 VcsSettingsModel.SVN_TAG_SECTION)
868 866 svn_tag_names = [b.ui_value for b in svn_tags]
869 867 assert 'svn-branch' in svn_branch_names
870 868 assert 'svn-tag' in svn_tag_names
871 869 finally:
872 870 self._cleanup_repo_settings(settings)
873 871
874 872 def test_svn_settings_are_unique(
875 873 self, autologin_user, backend_svn, csrf_token, settings_util):
876 874 repo = backend_svn.repo
877 875 repo_name = repo.repo_name
878 876 data = self.FORM_DATA.copy()
879 877 data['new_svn_tag'] = 'test_tag'
880 878 data['new_svn_branch'] = 'test_branch'
881 879 data['csrf_token'] = csrf_token
882 880 settings_util.create_repo_rhodecode_ui(
883 881 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
884 882 settings_util.create_repo_rhodecode_ui(
885 883 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
886 884
887 885 response = self.app.post(
888 886 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
889 887 response.mustcontain('Pattern already exists')
890 888
891 889 def test_svn_settings_with_empty_values_are_not_created(
892 890 self, autologin_user, backend_svn, csrf_token):
893 891 repo_name = backend_svn.repo_name
894 892 data = self.FORM_DATA.copy()
895 893 data['csrf_token'] = csrf_token
896 894 self.app.post(
897 895 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
898 896 settings = SettingsModel(repo=repo_name)
899 897 try:
900 898 svn_branches = settings.get_ui_by_section(
901 899 VcsSettingsModel.SVN_BRANCH_SECTION)
902 900 svn_tags = settings.get_ui_by_section(
903 901 VcsSettingsModel.SVN_TAG_SECTION)
904 902 assert len(svn_branches) == 0
905 903 assert len(svn_tags) == 0
906 904 finally:
907 905 self._cleanup_repo_settings(settings)
908 906
909 907 def test_svn_settings_are_shown_for_svn_repository(
910 908 self, autologin_user, backend_svn, csrf_token):
911 909 repo_name = backend_svn.repo_name
912 910 response = self.app.get(
913 911 url('repo_vcs_settings', repo_name=repo_name), status=200)
914 912 response.mustcontain('Subversion Settings')
915 913
916 914 @pytest.mark.skip_backends('svn')
917 915 def test_svn_settings_are_not_created_for_not_svn_repository(
918 916 self, autologin_user, backend, csrf_token):
919 917 repo_name = backend.repo_name
920 918 data = self.FORM_DATA.copy()
921 919 data['csrf_token'] = csrf_token
922 920 self.app.post(
923 921 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
924 922 settings = SettingsModel(repo=repo_name)
925 923 try:
926 924 svn_branches = settings.get_ui_by_section(
927 925 VcsSettingsModel.SVN_BRANCH_SECTION)
928 926 svn_tags = settings.get_ui_by_section(
929 927 VcsSettingsModel.SVN_TAG_SECTION)
930 928 assert len(svn_branches) == 0
931 929 assert len(svn_tags) == 0
932 930 finally:
933 931 self._cleanup_repo_settings(settings)
934 932
935 933 @pytest.mark.skip_backends('svn')
936 934 def test_svn_settings_are_shown_only_for_svn_repository(
937 935 self, autologin_user, backend, csrf_token):
938 936 repo_name = backend.repo_name
939 937 response = self.app.get(
940 938 url('repo_vcs_settings', repo_name=repo_name), status=200)
941 939 response.mustcontain(no='Subversion Settings')
942 940
943 941 def test_hg_settings_are_created(
944 942 self, autologin_user, backend_hg, csrf_token):
945 943 repo_name = backend_hg.repo_name
946 944 data = self.FORM_DATA.copy()
947 945 data['new_svn_tag'] = 'svn-tag'
948 946 data['new_svn_branch'] = 'svn-branch'
949 947 data['csrf_token'] = csrf_token
950 948 self.app.post(
951 949 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
952 950 settings = SettingsModel(repo=repo_name)
953 951 try:
954 952 largefiles_ui = settings.get_ui_by_section_and_key(
955 953 'extensions', 'largefiles')
956 954 assert largefiles_ui.ui_active is False
957 955 phases_ui = settings.get_ui_by_section_and_key(
958 956 'phases', 'publish')
959 957 assert str2bool(phases_ui.ui_value) is False
960 958 finally:
961 959 self._cleanup_repo_settings(settings)
962 960
963 961 def test_hg_settings_are_updated(
964 962 self, autologin_user, backend_hg, csrf_token):
965 963 repo_name = backend_hg.repo_name
966 964 settings = SettingsModel(repo=repo_name)
967 965 settings.create_ui_section_value(
968 966 'extensions', '', key='largefiles', active=True)
969 967 settings.create_ui_section_value(
970 968 'phases', '1', key='publish', active=True)
971 969
972 970 data = self.FORM_DATA.copy()
973 971 data['csrf_token'] = csrf_token
974 972 self.app.post(
975 973 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
976 974 try:
977 975 largefiles_ui = settings.get_ui_by_section_and_key(
978 976 'extensions', 'largefiles')
979 977 assert largefiles_ui.ui_active is False
980 978 phases_ui = settings.get_ui_by_section_and_key(
981 979 'phases', 'publish')
982 980 assert str2bool(phases_ui.ui_value) is False
983 981 finally:
984 982 self._cleanup_repo_settings(settings)
985 983
986 984 def test_hg_settings_are_shown_for_hg_repository(
987 985 self, autologin_user, backend_hg, csrf_token):
988 986 repo_name = backend_hg.repo_name
989 987 response = self.app.get(
990 988 url('repo_vcs_settings', repo_name=repo_name), status=200)
991 989 response.mustcontain('Mercurial Settings')
992 990
993 991 @pytest.mark.skip_backends('hg')
994 992 def test_hg_settings_are_created_only_for_hg_repository(
995 993 self, autologin_user, backend, csrf_token):
996 994 repo_name = backend.repo_name
997 995 data = self.FORM_DATA.copy()
998 996 data['csrf_token'] = csrf_token
999 997 self.app.post(
1000 998 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1001 999 settings = SettingsModel(repo=repo_name)
1002 1000 try:
1003 1001 largefiles_ui = settings.get_ui_by_section_and_key(
1004 1002 'extensions', 'largefiles')
1005 1003 assert largefiles_ui is None
1006 1004 phases_ui = settings.get_ui_by_section_and_key(
1007 1005 'phases', 'publish')
1008 1006 assert phases_ui is None
1009 1007 finally:
1010 1008 self._cleanup_repo_settings(settings)
1011 1009
1012 1010 @pytest.mark.skip_backends('hg')
1013 1011 def test_hg_settings_are_shown_only_for_hg_repository(
1014 1012 self, autologin_user, backend, csrf_token):
1015 1013 repo_name = backend.repo_name
1016 1014 response = self.app.get(
1017 1015 url('repo_vcs_settings', repo_name=repo_name), status=200)
1018 1016 response.mustcontain(no='Mercurial Settings')
1019 1017
1020 1018 @pytest.mark.skip_backends('hg')
1021 1019 def test_hg_settings_are_updated_only_for_hg_repository(
1022 1020 self, autologin_user, backend, csrf_token):
1023 1021 repo_name = backend.repo_name
1024 1022 settings = SettingsModel(repo=repo_name)
1025 1023 settings.create_ui_section_value(
1026 1024 'extensions', '', key='largefiles', active=True)
1027 1025 settings.create_ui_section_value(
1028 1026 'phases', '1', key='publish', active=True)
1029 1027
1030 1028 data = self.FORM_DATA.copy()
1031 1029 data['csrf_token'] = csrf_token
1032 1030 self.app.post(
1033 1031 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1034 1032 try:
1035 1033 largefiles_ui = settings.get_ui_by_section_and_key(
1036 1034 'extensions', 'largefiles')
1037 1035 assert largefiles_ui.ui_active is True
1038 1036 phases_ui = settings.get_ui_by_section_and_key(
1039 1037 'phases', 'publish')
1040 1038 assert phases_ui.ui_value == '1'
1041 1039 finally:
1042 1040 self._cleanup_repo_settings(settings)
1043 1041
1044 1042 def test_per_repo_svn_settings_are_displayed(
1045 1043 self, autologin_user, backend_svn, settings_util):
1046 1044 repo = backend_svn.create_repo()
1047 1045 repo_name = repo.repo_name
1048 1046 branches = [
1049 1047 settings_util.create_repo_rhodecode_ui(
1050 1048 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1051 1049 'branch_{}'.format(i))
1052 1050 for i in range(10)]
1053 1051 tags = [
1054 1052 settings_util.create_repo_rhodecode_ui(
1055 1053 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1056 1054 for i in range(10)]
1057 1055
1058 1056 response = self.app.get(
1059 1057 url('repo_vcs_settings', repo_name=repo_name), status=200)
1060 1058 assert_response = AssertResponse(response)
1061 1059 for branch in branches:
1062 1060 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1063 1061 element = assert_response.get_element(css_selector)
1064 1062 assert element.value == branch.ui_value
1065 1063 for tag in tags:
1066 1064 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1067 1065 element = assert_response.get_element(css_selector)
1068 1066 assert element.value == tag.ui_value
1069 1067
1070 1068 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1071 1069 self, autologin_user, backend_svn, settings_util):
1072 1070 repo = backend_svn.create_repo()
1073 1071 repo_name = repo.repo_name
1074 1072 response = self.app.get(
1075 1073 url('repo_vcs_settings', repo_name=repo_name), status=200)
1076 1074 response.mustcontain(no='<label>Hooks:</label>')
1077 1075 response.mustcontain(no='<label>Pull Request Settings:</label>')
1078 1076
1079 1077 def test_inherit_global_settings_value_is_saved(
1080 1078 self, autologin_user, backend, csrf_token):
1081 1079 repo_name = backend.repo_name
1082 1080 data = self.FORM_DATA.copy()
1083 1081 data['csrf_token'] = csrf_token
1084 1082 data['inherit_global_settings'] = True
1085 1083 self.app.post(
1086 1084 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1087 1085
1088 1086 settings = SettingsModel(repo=repo_name)
1089 1087 vcs_settings = VcsSettingsModel(repo=repo_name)
1090 1088 try:
1091 1089 assert vcs_settings.inherit_global_settings is True
1092 1090 finally:
1093 1091 self._cleanup_repo_settings(settings)
1094 1092
1095 1093 def test_repo_cache_is_invalidated_when_settings_are_updated(
1096 1094 self, autologin_user, backend, csrf_token):
1097 1095 repo_name = backend.repo_name
1098 1096 data = self.FORM_DATA.copy()
1099 1097 data['csrf_token'] = csrf_token
1100 1098 data['inherit_global_settings'] = True
1101 1099 settings = SettingsModel(repo=repo_name)
1102 1100
1103 1101 invalidation_patcher = mock.patch(
1104 1102 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1105 1103 with invalidation_patcher as invalidation_mock:
1106 1104 self.app.post(
1107 1105 url('repo_vcs_settings', repo_name=repo_name), data,
1108 1106 status=302)
1109 1107 try:
1110 1108 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1111 1109 finally:
1112 1110 self._cleanup_repo_settings(settings)
1113 1111
1114 1112 def test_other_settings_not_saved_inherit_global_settings_is_true(
1115 1113 self, autologin_user, backend, csrf_token):
1116 1114 repo_name = backend.repo_name
1117 1115 data = self.FORM_DATA.copy()
1118 1116 data['csrf_token'] = csrf_token
1119 1117 data['inherit_global_settings'] = True
1120 1118 self.app.post(
1121 1119 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1122 1120
1123 1121 settings = SettingsModel(repo=repo_name)
1124 1122 ui_settings = (
1125 1123 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1126 1124
1127 1125 vcs_settings = []
1128 1126 try:
1129 1127 for section, key in ui_settings:
1130 1128 ui = settings.get_ui_by_section_and_key(section, key)
1131 1129 if ui:
1132 1130 vcs_settings.append(ui)
1133 1131 vcs_settings.extend(settings.get_ui_by_section(
1134 1132 VcsSettingsModel.SVN_BRANCH_SECTION))
1135 1133 vcs_settings.extend(settings.get_ui_by_section(
1136 1134 VcsSettingsModel.SVN_TAG_SECTION))
1137 1135 for name in VcsSettingsModel.GENERAL_SETTINGS:
1138 1136 setting = settings.get_setting_by_name(name)
1139 1137 if setting:
1140 1138 vcs_settings.append(setting)
1141 1139 assert vcs_settings == []
1142 1140 finally:
1143 1141 self._cleanup_repo_settings(settings)
1144 1142
1145 1143 def test_delete_svn_branch_and_tag_patterns(
1146 1144 self, autologin_user, backend_svn, settings_util, csrf_token):
1147 1145 repo = backend_svn.create_repo()
1148 1146 repo_name = repo.repo_name
1149 1147 branch = settings_util.create_repo_rhodecode_ui(
1150 1148 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1151 1149 cleanup=False)
1152 1150 tag = settings_util.create_repo_rhodecode_ui(
1153 1151 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1154 1152 data = {
1155 1153 '_method': 'delete',
1156 1154 'csrf_token': csrf_token
1157 1155 }
1158 1156 for id_ in (branch.ui_id, tag.ui_id):
1159 1157 data['delete_svn_pattern'] = id_,
1160 1158 self.app.post(
1161 1159 url('repo_vcs_settings', repo_name=repo_name), data,
1162 1160 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1163 1161 settings = VcsSettingsModel(repo=repo_name)
1164 1162 assert settings.get_repo_svn_branch_patterns() == []
1165 1163
1166 1164 def test_delete_svn_branch_requires_repo_admin_permission(
1167 1165 self, backend_svn, user_util, settings_util, csrf_token):
1168 1166 repo = backend_svn.create_repo()
1169 1167 repo_name = repo.repo_name
1170 1168 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1171 1169 logout_user_session(self.app, csrf_token)
1172 1170 session = login_user_session(
1173 1171 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1174 1172 csrf_token = auth.get_csrf_token(session)
1175 1173 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1176 1174 branch = settings_util.create_repo_rhodecode_ui(
1177 1175 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1178 1176 cleanup=False)
1179 1177 data = {
1180 1178 '_method': 'delete',
1181 1179 'csrf_token': csrf_token,
1182 1180 'delete_svn_pattern': branch.ui_id
1183 1181 }
1184 1182 self.app.post(
1185 1183 url('repo_vcs_settings', repo_name=repo_name), data,
1186 1184 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1187 1185
1188 1186 def test_delete_svn_branch_raises_400_when_not_found(
1189 1187 self, autologin_user, backend_svn, settings_util, csrf_token):
1190 1188 repo_name = backend_svn.repo_name
1191 1189 data = {
1192 1190 '_method': 'delete',
1193 1191 'delete_svn_pattern': 123,
1194 1192 'csrf_token': csrf_token
1195 1193 }
1196 1194 self.app.post(
1197 1195 url('repo_vcs_settings', repo_name=repo_name), data,
1198 1196 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1199 1197
1200 1198 def test_delete_svn_branch_raises_400_when_no_id_specified(
1201 1199 self, autologin_user, backend_svn, settings_util, csrf_token):
1202 1200 repo_name = backend_svn.repo_name
1203 1201 data = {
1204 1202 '_method': 'delete',
1205 1203 'csrf_token': csrf_token
1206 1204 }
1207 1205 self.app.post(
1208 1206 url('repo_vcs_settings', repo_name=repo_name), data,
1209 1207 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1210 1208
1211 1209 def _cleanup_repo_settings(self, settings_model):
1212 1210 cleanup = []
1213 1211 ui_settings = (
1214 1212 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1215 1213
1216 1214 for section, key in ui_settings:
1217 1215 ui = settings_model.get_ui_by_section_and_key(section, key)
1218 1216 if ui:
1219 1217 cleanup.append(ui)
1220 1218
1221 1219 cleanup.extend(settings_model.get_ui_by_section(
1222 1220 VcsSettingsModel.INHERIT_SETTINGS))
1223 1221 cleanup.extend(settings_model.get_ui_by_section(
1224 1222 VcsSettingsModel.SVN_BRANCH_SECTION))
1225 1223 cleanup.extend(settings_model.get_ui_by_section(
1226 1224 VcsSettingsModel.SVN_TAG_SECTION))
1227 1225
1228 1226 for name in VcsSettingsModel.GENERAL_SETTINGS:
1229 1227 setting = settings_model.get_setting_by_name(name)
1230 1228 if setting:
1231 1229 cleanup.append(setting)
1232 1230
1233 1231 for object_ in cleanup:
1234 1232 Session().delete(object_)
1235 1233 Session().commit()
1236 1234
1237 1235 def assert_repo_value_equals_global_value(self, response, setting):
1238 1236 assert_response = AssertResponse(response)
1239 1237 global_css_selector = '[name={}_inherited]'.format(setting)
1240 1238 repo_css_selector = '[name={}]'.format(setting)
1241 1239 repo_element = assert_response.get_element(repo_css_selector)
1242 1240 global_element = assert_response.get_element(global_css_selector)
1243 1241 assert repo_element.value == global_element.value
1244 1242
1245 1243
1246 def repo_on_filesystem(repo_name):
1247 try:
1248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
1249 return True
1250 except Exception:
1251 return False
1252
1253
1254 1244 def _get_permission_for_user(user, repo):
1255 1245 perm = UserRepoToPerm.query()\
1256 1246 .filter(UserRepoToPerm.repository ==
1257 1247 Repository.get_by_repo_name(repo))\
1258 1248 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1259 1249 .all()
1260 1250 return perm
@@ -1,525 +1,515 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import re
22 import os
23 22
24 23 import mock
25 24 import pytest
26 25
27 26 from rhodecode.controllers import summary
28 from rhodecode.lib import vcs
29 27 from rhodecode.lib import helpers as h
30 28 from rhodecode.lib.compat import OrderedDict
31 29 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
32 30 from rhodecode.model.db import Repository
33 31 from rhodecode.model.meta import Session
34 32 from rhodecode.model.repo import RepoModel
35 33 from rhodecode.model.scm import ScmModel
36 34 from rhodecode.tests import (
37 TestController, url, HG_REPO, assert_session_flash, TESTS_TMP_PATH)
35 TestController, url, HG_REPO, assert_session_flash)
38 36 from rhodecode.tests.fixture import Fixture
39 from rhodecode.tests.utils import AssertResponse
37 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
40 38
41 39
42 40 fixture = Fixture()
43 41
44 42
45 43 class TestSummaryController(TestController):
46 44 def test_index(self, backend):
47 45 self.log_user()
48 46 repo_id = backend.repo.repo_id
49 47 repo_name = backend.repo_name
50 48 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
51 49 return_value=False):
52 50 response = self.app.get(url('summary_home', repo_name=repo_name))
53 51
54 52 # repo type
55 53 response.mustcontain(
56 54 '<i class="icon-%s">' % (backend.alias, )
57 55 )
58 56 # public/private
59 57 response.mustcontain(
60 58 """<i class="icon-unlock-alt">"""
61 59 )
62 60
63 61 # clone url...
64 62 response.mustcontain(
65 63 'id="clone_url" readonly="readonly"'
66 64 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
67 65 response.mustcontain(
68 66 'id="clone_url_id" readonly="readonly"'
69 67 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
70 68
71 69 def test_index_svn_without_proxy(self, backend_svn):
72 70 self.log_user()
73 71 repo_id = backend_svn.repo.repo_id
74 72 repo_name = backend_svn.repo_name
75 73 response = self.app.get(url('summary_home', repo_name=repo_name))
76 74 # clone url...
77 75 response.mustcontain(
78 76 'id="clone_url" disabled'
79 77 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
80 78 response.mustcontain(
81 79 'id="clone_url_id" disabled'
82 80 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
83 81
84 82 def test_index_with_trailing_slash(self, autologin_user, backend):
85 83 repo_id = backend.repo.repo_id
86 84 repo_name = backend.repo_name
87 85 with mock.patch('rhodecode.lib.helpers.is_svn_without_proxy',
88 86 return_value=False):
89 87 response = self.app.get(
90 88 url('summary_home', repo_name=repo_name) + '/',
91 89 status=200)
92 90
93 91 # clone url...
94 92 response.mustcontain(
95 93 'id="clone_url" readonly="readonly"'
96 94 ' value="http://test_admin@test.example.com:80/%s"' % (repo_name, ))
97 95 response.mustcontain(
98 96 'id="clone_url_id" readonly="readonly"'
99 97 ' value="http://test_admin@test.example.com:80/_%s"' % (repo_id, ))
100 98
101 99 def test_index_by_id(self, backend):
102 100 self.log_user()
103 101 repo_id = backend.repo.repo_id
104 102 response = self.app.get(url(
105 103 'summary_home', repo_name='_%s' % (repo_id,)))
106 104
107 105 # repo type
108 106 response.mustcontain(
109 107 '<i class="icon-%s">' % (backend.alias, )
110 108 )
111 109 # public/private
112 110 response.mustcontain(
113 111 """<i class="icon-unlock-alt">"""
114 112 )
115 113
116 114 def test_index_by_repo_having_id_path_in_name_hg(self):
117 115 self.log_user()
118 116 fixture.create_repo(name='repo_1')
119 117 response = self.app.get(url('summary_home', repo_name='repo_1'))
120 118
121 119 try:
122 120 response.mustcontain("repo_1")
123 121 finally:
124 122 RepoModel().delete(Repository.get_by_repo_name('repo_1'))
125 123 Session().commit()
126 124
127 125 def test_index_with_anonymous_access_disabled(self):
128 126 with fixture.anon_access(False):
129 127 response = self.app.get(url('summary_home', repo_name=HG_REPO),
130 128 status=302)
131 129 assert 'login' in response.location
132 130
133 131 def _enable_stats(self, repo):
134 132 r = Repository.get_by_repo_name(repo)
135 133 r.enable_statistics = True
136 134 Session().add(r)
137 135 Session().commit()
138 136
139 137 expected_trending = {
140 138 'hg': {
141 139 "py": {"count": 68, "desc": ["Python"]},
142 140 "rst": {"count": 16, "desc": ["Rst"]},
143 141 "css": {"count": 2, "desc": ["Css"]},
144 142 "sh": {"count": 2, "desc": ["Bash"]},
145 143 "bat": {"count": 1, "desc": ["Batch"]},
146 144 "cfg": {"count": 1, "desc": ["Ini"]},
147 145 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
148 146 "ini": {"count": 1, "desc": ["Ini"]},
149 147 "js": {"count": 1, "desc": ["Javascript"]},
150 148 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
151 149 },
152 150 'git': {
153 151 "py": {"count": 68, "desc": ["Python"]},
154 152 "rst": {"count": 16, "desc": ["Rst"]},
155 153 "css": {"count": 2, "desc": ["Css"]},
156 154 "sh": {"count": 2, "desc": ["Bash"]},
157 155 "bat": {"count": 1, "desc": ["Batch"]},
158 156 "cfg": {"count": 1, "desc": ["Ini"]},
159 157 "html": {"count": 1, "desc": ["EvoqueHtml", "Html"]},
160 158 "ini": {"count": 1, "desc": ["Ini"]},
161 159 "js": {"count": 1, "desc": ["Javascript"]},
162 160 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]}
163 161 },
164 162 'svn': {
165 163 "py": {"count": 75, "desc": ["Python"]},
166 164 "rst": {"count": 16, "desc": ["Rst"]},
167 165 "html": {"count": 11, "desc": ["EvoqueHtml", "Html"]},
168 166 "css": {"count": 2, "desc": ["Css"]},
169 167 "bat": {"count": 1, "desc": ["Batch"]},
170 168 "cfg": {"count": 1, "desc": ["Ini"]},
171 169 "ini": {"count": 1, "desc": ["Ini"]},
172 170 "js": {"count": 1, "desc": ["Javascript"]},
173 171 "makefile": {"count": 1, "desc": ["Makefile", "Makefile"]},
174 172 "sh": {"count": 1, "desc": ["Bash"]}
175 173 },
176 174 }
177 175
178 176 def test_repo_stats(self, backend, xhr_header):
179 177 self.log_user()
180 178 response = self.app.get(
181 179 url('repo_stats',
182 180 repo_name=backend.repo_name, commit_id='tip'),
183 181 extra_environ=xhr_header,
184 182 status=200)
185 183 assert re.match(r'6[\d\.]+ KiB', response.json['size'])
186 184
187 185 def test_repo_stats_code_stats_enabled(self, backend, xhr_header):
188 186 self.log_user()
189 187 repo_name = backend.repo_name
190 188
191 189 # codes stats
192 190 self._enable_stats(repo_name)
193 191 ScmModel().mark_for_invalidation(repo_name)
194 192
195 193 response = self.app.get(
196 194 url('repo_stats',
197 195 repo_name=backend.repo_name, commit_id='tip'),
198 196 extra_environ=xhr_header,
199 197 status=200)
200 198
201 199 expected_data = self.expected_trending[backend.alias]
202 200 returned_stats = response.json['code_stats']
203 201 for k, v in expected_data.items():
204 202 assert v == returned_stats[k]
205 203
206 204 def test_repo_refs_data(self, backend):
207 205 response = self.app.get(
208 206 url('repo_refs_data', repo_name=backend.repo_name),
209 207 status=200)
210 208
211 209 # Ensure that there is the correct amount of items in the result
212 210 repo = backend.repo.scm_instance()
213 211 data = response.json['results']
214 212 items = sum(len(section['children']) for section in data)
215 213 repo_refs = len(repo.branches) + len(repo.tags) + len(repo.bookmarks)
216 214 assert items == repo_refs
217 215
218 216 def test_index_shows_missing_requirements_message(
219 217 self, backend, autologin_user):
220 218 repo_name = backend.repo_name
221 219 scm_patcher = mock.patch.object(
222 220 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
223 221
224 222 with scm_patcher:
225 223 response = self.app.get(url('summary_home', repo_name=repo_name))
226 224 assert_response = AssertResponse(response)
227 225 assert_response.element_contains(
228 226 '.main .alert-warning strong', 'Missing requirements')
229 227 assert_response.element_contains(
230 228 '.main .alert-warning',
231 229 'These commits cannot be displayed, because this repository'
232 230 ' uses the Mercurial largefiles extension, which was not enabled.')
233 231
234 232 def test_missing_requirements_page_does_not_contains_switch_to(
235 233 self, backend):
236 234 self.log_user()
237 235 repo_name = backend.repo_name
238 236 scm_patcher = mock.patch.object(
239 237 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
240 238
241 239 with scm_patcher:
242 240 response = self.app.get(url('summary_home', repo_name=repo_name))
243 241 response.mustcontain(no='Switch To')
244 242
245 243
246 244 @pytest.mark.usefixtures('pylonsapp')
247 245 class TestSwitcherReferenceData:
248 246
249 247 def test_creates_reference_urls_based_on_name(self):
250 248 references = {
251 249 'name': 'commit_id',
252 250 }
253 251 controller = summary.SummaryController()
254 252 is_svn = False
255 253 result = controller._switcher_reference_data(
256 254 'repo_name', references, is_svn)
257 255 expected_url = h.url(
258 256 'files_home', repo_name='repo_name', revision='name',
259 257 at='name')
260 258 assert result[0]['files_url'] == expected_url
261 259
262 260 def test_urls_contain_commit_id_if_slash_in_name(self):
263 261 references = {
264 262 'name/with/slash': 'commit_id',
265 263 }
266 264 controller = summary.SummaryController()
267 265 is_svn = False
268 266 result = controller._switcher_reference_data(
269 267 'repo_name', references, is_svn)
270 268 expected_url = h.url(
271 269 'files_home', repo_name='repo_name', revision='commit_id',
272 270 at='name/with/slash')
273 271 assert result[0]['files_url'] == expected_url
274 272
275 273 def test_adds_reference_to_path_for_svn(self):
276 274 references = {
277 275 'name/with/slash': 'commit_id',
278 276 }
279 277 controller = summary.SummaryController()
280 278 is_svn = True
281 279 result = controller._switcher_reference_data(
282 280 'repo_name', references, is_svn)
283 281 expected_url = h.url(
284 282 'files_home', repo_name='repo_name', f_path='name/with/slash',
285 283 revision='commit_id', at='name/with/slash')
286 284 assert result[0]['files_url'] == expected_url
287 285
288 286
289 287 @pytest.mark.usefixtures('pylonsapp')
290 288 class TestCreateReferenceData:
291 289
292 290 @pytest.fixture
293 291 def example_refs(self):
294 292 section_1_refs = OrderedDict((('a', 'a_id'), ('b', 'b_id')))
295 293 example_refs = [
296 294 ('section_1', section_1_refs, 't1'),
297 295 ('section_2', {'c': 'c_id'}, 't2'),
298 296 ]
299 297 return example_refs
300 298
301 299 def test_generates_refs_based_on_commit_ids(self, example_refs):
302 300 repo = mock.Mock()
303 301 repo.name = 'test-repo'
304 302 repo.alias = 'git'
305 303 full_repo_name = 'pytest-repo-group/' + repo.name
306 304 controller = summary.SummaryController()
307 305
308 306 result = controller._create_reference_data(
309 307 repo, full_repo_name, example_refs)
310 308
311 309 expected_files_url = '/{}/files/'.format(full_repo_name)
312 310 expected_result = [
313 311 {
314 312 'children': [
315 313 {
316 314 'id': 'a', 'raw_id': 'a_id', 'text': 'a', 'type': 't1',
317 315 'files_url': expected_files_url + 'a/?at=a',
318 316 },
319 317 {
320 318 'id': 'b', 'raw_id': 'b_id', 'text': 'b', 'type': 't1',
321 319 'files_url': expected_files_url + 'b/?at=b',
322 320 }
323 321 ],
324 322 'text': 'section_1'
325 323 },
326 324 {
327 325 'children': [
328 326 {
329 327 'id': 'c', 'raw_id': 'c_id', 'text': 'c', 'type': 't2',
330 328 'files_url': expected_files_url + 'c/?at=c',
331 329 }
332 330 ],
333 331 'text': 'section_2'
334 332 }]
335 333 assert result == expected_result
336 334
337 335 def test_generates_refs_with_path_for_svn(self, example_refs):
338 336 repo = mock.Mock()
339 337 repo.name = 'test-repo'
340 338 repo.alias = 'svn'
341 339 full_repo_name = 'pytest-repo-group/' + repo.name
342 340 controller = summary.SummaryController()
343 341 result = controller._create_reference_data(
344 342 repo, full_repo_name, example_refs)
345 343
346 344 expected_files_url = '/{}/files/'.format(full_repo_name)
347 345 expected_result = [
348 346 {
349 347 'children': [
350 348 {
351 349 'id': 'a@a_id', 'raw_id': 'a_id',
352 350 'text': 'a', 'type': 't1',
353 351 'files_url': expected_files_url + 'a_id/a?at=a',
354 352 },
355 353 {
356 354 'id': 'b@b_id', 'raw_id': 'b_id',
357 355 'text': 'b', 'type': 't1',
358 356 'files_url': expected_files_url + 'b_id/b?at=b',
359 357 }
360 358 ],
361 359 'text': 'section_1'
362 360 },
363 361 {
364 362 'children': [
365 363 {
366 364 'id': 'c@c_id', 'raw_id': 'c_id',
367 365 'text': 'c', 'type': 't2',
368 366 'files_url': expected_files_url + 'c_id/c?at=c',
369 367 }
370 368 ],
371 369 'text': 'section_2'
372 370 }
373 371 ]
374 372 assert result == expected_result
375 373
376 374
377 375 @pytest.mark.usefixtures("app")
378 376 class TestRepoLocation:
379 377
380 378 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
381 379 def test_manual_delete(self, autologin_user, backend, suffix, csrf_token):
382 380 repo = backend.create_repo(name_suffix=suffix)
383 381 repo_name = repo.repo_name
384 382
385 383 # delete from file system
386 384 RepoModel()._delete_filesystem_repo(repo)
387 385
388 386 # test if the repo is still in the database
389 387 new_repo = RepoModel().get_by_repo_name(repo_name)
390 388 assert new_repo.repo_name == repo_name
391 389
392 390 # check if repo is not in the filesystem
393 391 assert not repo_on_filesystem(repo_name)
394 392 self.assert_repo_not_found_redirect(repo_name)
395 393
396 394 def assert_repo_not_found_redirect(self, repo_name):
397 395 # run the check page that triggers the other flash message
398 396 response = self.app.get(url('repo_check_home', repo_name=repo_name))
399 397 assert_session_flash(
400 398 response, 'The repository at %s cannot be located.' % repo_name)
401 399
402 400
403 def repo_on_filesystem(repo_name):
404 try:
405 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
406 return True
407 except Exception:
408 return False
409
410
411 401 class TestCreateFilesUrl(object):
412 402 def test_creates_non_svn_url(self):
413 403 controller = summary.SummaryController()
414 404 repo = mock.Mock()
415 405 repo.name = 'abcde'
416 406 full_repo_name = 'test-repo-group/' + repo.name
417 407 ref_name = 'branch1'
418 408 raw_id = 'deadbeef0123456789'
419 409 is_svn = False
420 410
421 411 with mock.patch.object(summary.h, 'url') as url_mock:
422 412 result = controller._create_files_url(
423 413 repo, full_repo_name, ref_name, raw_id, is_svn)
424 414 url_mock.assert_called_once_with(
425 415 'files_home', repo_name=full_repo_name, f_path='',
426 416 revision=ref_name, at=ref_name)
427 417 assert result == url_mock.return_value
428 418
429 419 def test_creates_svn_url(self):
430 420 controller = summary.SummaryController()
431 421 repo = mock.Mock()
432 422 repo.name = 'abcde'
433 423 full_repo_name = 'test-repo-group/' + repo.name
434 424 ref_name = 'branch1'
435 425 raw_id = 'deadbeef0123456789'
436 426 is_svn = True
437 427
438 428 with mock.patch.object(summary.h, 'url') as url_mock:
439 429 result = controller._create_files_url(
440 430 repo, full_repo_name, ref_name, raw_id, is_svn)
441 431 url_mock.assert_called_once_with(
442 432 'files_home', repo_name=full_repo_name, f_path=ref_name,
443 433 revision=raw_id, at=ref_name)
444 434 assert result == url_mock.return_value
445 435
446 436 def test_name_has_slashes(self):
447 437 controller = summary.SummaryController()
448 438 repo = mock.Mock()
449 439 repo.name = 'abcde'
450 440 full_repo_name = 'test-repo-group/' + repo.name
451 441 ref_name = 'branch1/branch2'
452 442 raw_id = 'deadbeef0123456789'
453 443 is_svn = False
454 444
455 445 with mock.patch.object(summary.h, 'url') as url_mock:
456 446 result = controller._create_files_url(
457 447 repo, full_repo_name, ref_name, raw_id, is_svn)
458 448 url_mock.assert_called_once_with(
459 449 'files_home', repo_name=full_repo_name, f_path='', revision=raw_id,
460 450 at=ref_name)
461 451 assert result == url_mock.return_value
462 452
463 453
464 454 class TestReferenceItems(object):
465 455 repo = mock.Mock()
466 456 repo.name = 'pytest-repo'
467 457 repo_full_name = 'pytest-repo-group/' + repo.name
468 458 ref_type = 'branch'
469 459 fake_url = '/abcde/'
470 460
471 461 @staticmethod
472 462 def _format_function(name, id_):
473 463 return 'format_function_{}_{}'.format(name, id_)
474 464
475 465 def test_creates_required_amount_of_items(self):
476 466 amount = 100
477 467 refs = {
478 468 'ref{}'.format(i): '{0:040d}'.format(i)
479 469 for i in range(amount)
480 470 }
481 471
482 472 controller = summary.SummaryController()
483 473
484 474 url_patcher = mock.patch.object(
485 475 controller, '_create_files_url')
486 476 svn_patcher = mock.patch.object(
487 477 summary.h, 'is_svn', return_value=False)
488 478
489 479 with url_patcher as url_mock, svn_patcher:
490 480 result = controller._create_reference_items(
491 481 self.repo, self.repo_full_name, refs, self.ref_type,
492 482 self._format_function)
493 483 assert len(result) == amount
494 484 assert url_mock.call_count == amount
495 485
496 486 def test_single_item_details(self):
497 487 ref_name = 'ref1'
498 488 ref_id = 'deadbeef'
499 489 refs = {
500 490 ref_name: ref_id
501 491 }
502 492
503 493 controller = summary.SummaryController()
504 494 url_patcher = mock.patch.object(
505 495 controller, '_create_files_url', return_value=self.fake_url)
506 496 svn_patcher = mock.patch.object(
507 497 summary.h, 'is_svn', return_value=False)
508 498
509 499 with url_patcher as url_mock, svn_patcher:
510 500 result = controller._create_reference_items(
511 501 self.repo, self.repo_full_name, refs, self.ref_type,
512 502 self._format_function)
513 503
514 504 url_mock.assert_called_once_with(
515 505 self.repo, self.repo_full_name, ref_name, ref_id, False)
516 506 expected_result = [
517 507 {
518 508 'text': ref_name,
519 509 'id': self._format_function(ref_name, ref_id),
520 510 'raw_id': ref_id,
521 511 'type': self.ref_type,
522 512 'files_url': self.fake_url
523 513 }
524 514 ]
525 515 assert result == expected_result
@@ -1,285 +1,293 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess
26 26 import urllib2
27 27 from urlparse import urlparse, parse_qsl
28 28 from urllib import unquote_plus
29 29
30 30 import pytest
31 31 import rc_testdata
32 32 from lxml.html import fromstring, tostring
33 33 from lxml.cssselect import CSSSelector
34 34
35 35 from rhodecode.model.db import User
36 36 from rhodecode.model.meta import Session
37 37 from rhodecode.model.scm import ScmModel
38 38 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
39 39
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 def set_anonymous_access(enabled):
45 45 """(Dis)allows anonymous access depending on parameter `enabled`"""
46 46 user = User.get_default_user()
47 47 user.active = enabled
48 48 Session().add(user)
49 49 Session().commit()
50 50 log.info('anonymous access is now: %s', enabled)
51 51 assert enabled == User.get_default_user().active, (
52 52 'Cannot set anonymous access')
53 53
54 54
55 55 def check_xfail_backends(node, backend_alias):
56 56 # Using "xfail_backends" here intentionally, since this marks work
57 57 # which is "to be done" soon.
58 58 skip_marker = node.get_marker('xfail_backends')
59 59 if skip_marker and backend_alias in skip_marker.args:
60 60 msg = "Support for backend %s to be developed." % (backend_alias, )
61 61 msg = skip_marker.kwargs.get('reason', msg)
62 62 pytest.xfail(msg)
63 63
64 64
65 65 def check_skip_backends(node, backend_alias):
66 66 # Using "skip_backends" here intentionally, since this marks work which is
67 67 # not supported.
68 68 skip_marker = node.get_marker('skip_backends')
69 69 if skip_marker and backend_alias in skip_marker.args:
70 70 msg = "Feature not supported for backend %s." % (backend_alias, )
71 71 msg = skip_marker.kwargs.get('reason', msg)
72 72 pytest.skip(msg)
73 73
74 74
75 75 def extract_git_repo_from_dump(dump_name, repo_name):
76 76 """Create git repo `repo_name` from dump `dump_name`."""
77 77 repos_path = ScmModel().repos_path
78 78 target_path = os.path.join(repos_path, repo_name)
79 79 rc_testdata.extract_git_dump(dump_name, target_path)
80 80 return target_path
81 81
82 82
83 83 def extract_hg_repo_from_dump(dump_name, repo_name):
84 84 """Create hg repo `repo_name` from dump `dump_name`."""
85 85 repos_path = ScmModel().repos_path
86 86 target_path = os.path.join(repos_path, repo_name)
87 87 rc_testdata.extract_hg_dump(dump_name, target_path)
88 88 return target_path
89 89
90 90
91 91 def extract_svn_repo_from_dump(dump_name, repo_name):
92 92 """Create a svn repo `repo_name` from dump `dump_name`."""
93 93 repos_path = ScmModel().repos_path
94 94 target_path = os.path.join(repos_path, repo_name)
95 95 SubversionRepository(target_path, create=True)
96 96 _load_svn_dump_into_repo(dump_name, target_path)
97 97 return target_path
98 98
99 99
100 100 def assert_message_in_log(log_records, message, levelno, module):
101 101 messages = [
102 102 r.message for r in log_records
103 103 if r.module == module and r.levelno == levelno
104 104 ]
105 105 assert message in messages
106 106
107 107
108 108 def _load_svn_dump_into_repo(dump_name, repo_path):
109 109 """
110 110 Utility to populate a svn repository with a named dump
111 111
112 112 Currently the dumps are in rc_testdata. They might later on be
113 113 integrated with the main repository once they stabilize more.
114 114 """
115 115 dump = rc_testdata.load_svn_dump(dump_name)
116 116 load_dump = subprocess.Popen(
117 117 ['svnadmin', 'load', repo_path],
118 118 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
119 119 stderr=subprocess.PIPE)
120 120 out, err = load_dump.communicate(dump)
121 121 if load_dump.returncode != 0:
122 122 log.error("Output of load_dump command: %s", out)
123 123 log.error("Error output of load_dump command: %s", err)
124 124 raise Exception(
125 125 'Failed to load dump "%s" into repository at path "%s".'
126 126 % (dump_name, repo_path))
127 127
128 128
129 129 class AssertResponse(object):
130 130 """
131 131 Utility that helps to assert things about a given HTML response.
132 132 """
133 133
134 134 def __init__(self, response):
135 135 self.response = response
136 136
137 137 def one_element_exists(self, css_selector):
138 138 self.get_element(css_selector)
139 139
140 140 def no_element_exists(self, css_selector):
141 141 assert not self._get_elements(css_selector)
142 142
143 143 def element_equals_to(self, css_selector, expected_content):
144 144 element = self.get_element(css_selector)
145 145 element_text = self._element_to_string(element)
146 146 assert expected_content in element_text
147 147
148 148 def element_contains(self, css_selector, expected_content):
149 149 element = self.get_element(css_selector)
150 150 assert expected_content in element.text_content()
151 151
152 152 def contains_one_link(self, link_text, href):
153 153 doc = fromstring(self.response.body)
154 154 sel = CSSSelector('a[href]')
155 155 elements = [
156 156 e for e in sel(doc) if e.text_content().strip() == link_text]
157 157 assert len(elements) == 1, "Did not find link or found multiple links"
158 158 self._ensure_url_equal(elements[0].attrib.get('href'), href)
159 159
160 160 def contains_one_anchor(self, anchor_id):
161 161 doc = fromstring(self.response.body)
162 162 sel = CSSSelector('#' + anchor_id)
163 163 elements = sel(doc)
164 164 assert len(elements) == 1
165 165
166 166 def _ensure_url_equal(self, found, expected):
167 167 assert _Url(found) == _Url(expected)
168 168
169 169 def get_element(self, css_selector):
170 170 elements = self._get_elements(css_selector)
171 171 assert len(elements) == 1
172 172 return elements[0]
173 173
174 174 def get_elements(self, css_selector):
175 175 return self._get_elements(css_selector)
176 176
177 177 def _get_elements(self, css_selector):
178 178 doc = fromstring(self.response.body)
179 179 sel = CSSSelector(css_selector)
180 180 elements = sel(doc)
181 181 return elements
182 182
183 183 def _element_to_string(self, element):
184 184 return tostring(element)
185 185
186 186
187 187 class _Url(object):
188 188 """
189 189 A url object that can be compared with other url orbjects
190 190 without regard to the vagaries of encoding, escaping, and ordering
191 191 of parameters in query strings.
192 192
193 193 Inspired by
194 194 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
195 195 """
196 196
197 197 def __init__(self, url):
198 198 parts = urlparse(url)
199 199 _query = frozenset(parse_qsl(parts.query))
200 200 _path = unquote_plus(parts.path)
201 201 parts = parts._replace(query=_query, path=_path)
202 202 self.parts = parts
203 203
204 204 def __eq__(self, other):
205 205 return self.parts == other.parts
206 206
207 207 def __hash__(self):
208 208 return hash(self.parts)
209 209
210 210
211 211 def run_test_concurrently(times, raise_catched_exc=True):
212 212 """
213 213 Add this decorator to small pieces of code that you want to test
214 214 concurrently
215 215
216 216 ex:
217 217
218 218 @test_concurrently(25)
219 219 def my_test_function():
220 220 ...
221 221 """
222 222 def test_concurrently_decorator(test_func):
223 223 def wrapper(*args, **kwargs):
224 224 exceptions = []
225 225
226 226 def call_test_func():
227 227 try:
228 228 test_func(*args, **kwargs)
229 229 except Exception, e:
230 230 exceptions.append(e)
231 231 if raise_catched_exc:
232 232 raise
233 233 threads = []
234 234 for i in range(times):
235 235 threads.append(threading.Thread(target=call_test_func))
236 236 for t in threads:
237 237 t.start()
238 238 for t in threads:
239 239 t.join()
240 240 if exceptions:
241 241 raise Exception(
242 242 'test_concurrently intercepted %s exceptions: %s' % (
243 243 len(exceptions), exceptions))
244 244 return wrapper
245 245 return test_concurrently_decorator
246 246
247 247
248 248 def wait_for_url(url, timeout=10):
249 249 """
250 250 Wait until URL becomes reachable.
251 251
252 252 It polls the URL until the timeout is reached or it became reachable.
253 253 If will call to `py.test.fail` in case the URL is not reachable.
254 254 """
255 255 timeout = time.time() + timeout
256 256 last = 0
257 257 wait = 0.1
258 258
259 259 while (timeout > last):
260 260 last = time.time()
261 261 if is_url_reachable(url):
262 262 break
263 263 elif ((last + wait) > time.time()):
264 264 # Go to sleep because not enough time has passed since last check.
265 265 time.sleep(wait)
266 266 else:
267 267 pytest.fail("Timeout while waiting for URL {}".format(url))
268 268
269 269
270 270 def is_url_reachable(url):
271 271 try:
272 272 urllib2.urlopen(url)
273 273 except urllib2.URLError:
274 274 return False
275 275 return True
276 276
277 277
278 278 def get_session_from_response(response):
279 279 """
280 280 This returns the session from a response object. Pylons has some magic
281 281 to make the session available as `response.session`. But pyramid
282 282 doesn't expose it.
283 283 """
284 284 # TODO: Try to look up the session key also.
285 285 return response.request.environ['beaker.session']
286
287
288 def repo_on_filesystem(repo_name):
289 from rhodecode.lib import vcs
290 from rhodecode.tests import TESTS_TMP_PATH
291 repo = vcs.get_vcs_instance(
292 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
293 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now