##// END OF EJS Templates
tests: fixed some tests after new audit-logger
marcink -
r1700:818de782 default
parent child Browse files
Show More
@@ -1,172 +1,174 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import csv
22 import csv
23 import datetime
23 import datetime
24
24
25 import pytest
25 import pytest
26
26
27 from rhodecode.tests import *
27 from rhodecode.tests import *
28 from rhodecode.model.db import UserLog
28 from rhodecode.model.db import UserLog
29 from rhodecode.model.meta import Session
29 from rhodecode.model.meta import Session
30 from rhodecode.lib.utils2 import safe_unicode
30 from rhodecode.lib.utils2 import safe_unicode
31
31
32 dn = os.path.dirname
32 dn = os.path.dirname
33 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
33 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
34
34
35
35
36 class TestAdminController(TestController):
36 class TestAdminController(TestController):
37
37
38 @pytest.fixture(scope='class', autouse=True)
38 @pytest.fixture(scope='class', autouse=True)
39 def prepare(self, request, pylonsapp):
39 def prepare(self, request, pylonsapp):
40 UserLog.query().delete()
40 UserLog.query().delete()
41 Session().commit()
41 Session().commit()
42
42
43 def strptime(val):
43 def strptime(val):
44 fmt = '%Y-%m-%d %H:%M:%S'
44 fmt = '%Y-%m-%d %H:%M:%S'
45 if '.' not in val:
45 if '.' not in val:
46 return datetime.datetime.strptime(val, fmt)
46 return datetime.datetime.strptime(val, fmt)
47
47
48 nofrag, frag = val.split(".")
48 nofrag, frag = val.split(".")
49 date = datetime.datetime.strptime(nofrag, fmt)
49 date = datetime.datetime.strptime(nofrag, fmt)
50
50
51 frag = frag[:6] # truncate to microseconds
51 frag = frag[:6] # truncate to microseconds
52 frag += (6 - len(frag)) * '0' # add 0s
52 frag += (6 - len(frag)) * '0' # add 0s
53 return date.replace(microsecond=int(frag))
53 return date.replace(microsecond=int(frag))
54
54
55 with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
55 with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
56 for row in csv.DictReader(f):
56 for row in csv.DictReader(f):
57 ul = UserLog()
57 ul = UserLog()
58 for k, v in row.iteritems():
58 for k, v in row.iteritems():
59 v = safe_unicode(v)
59 v = safe_unicode(v)
60 if k == 'action_date':
60 if k == 'action_date':
61 v = strptime(v)
61 v = strptime(v)
62 if k in ['user_id', 'repository_id']:
62 if k in ['user_id', 'repository_id']:
63 # nullable due to FK problems
63 # nullable due to FK problems
64 v = None
64 v = None
65 setattr(ul, k, v)
65 setattr(ul, k, v)
66 Session().add(ul)
66 Session().add(ul)
67 Session().commit()
67 Session().commit()
68
68
69 @request.addfinalizer
69 @request.addfinalizer
70 def cleanup():
70 def cleanup():
71 UserLog.query().delete()
71 UserLog.query().delete()
72 Session().commit()
72 Session().commit()
73
73
74 def test_index(self):
74 def test_index(self):
75 self.log_user()
75 self.log_user()
76 response = self.app.get(url(controller='admin/admin', action='index'))
76 response = self.app.get(url(controller='admin/admin', action='index'))
77 response.mustcontain('Admin journal')
77 response.mustcontain('Admin journal')
78
78
79 def test_filter_all_entries(self):
79 def test_filter_all_entries(self):
80 self.log_user()
80 self.log_user()
81 response = self.app.get(url(controller='admin/admin', action='index',))
81 response = self.app.get(url(controller='admin/admin', action='index',))
82 response.mustcontain('2034 entries')
82 all_count = UserLog.query().count()
83 response.mustcontain('%s entries' % all_count)
83
84
84 def test_filter_journal_filter_exact_match_on_repository(self):
85 def test_filter_journal_filter_exact_match_on_repository(self):
85 self.log_user()
86 self.log_user()
86 response = self.app.get(url(controller='admin/admin', action='index',
87 response = self.app.get(url(controller='admin/admin', action='index',
87 filter='repository:rhodecode'))
88 filter='repository:rhodecode'))
88 response.mustcontain('3 entries')
89 response.mustcontain('3 entries')
89
90
90 def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
91 def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
91 self.log_user()
92 self.log_user()
92 response = self.app.get(url(controller='admin/admin', action='index',
93 response = self.app.get(url(controller='admin/admin', action='index',
93 filter='repository:RhodeCode'))
94 filter='repository:RhodeCode'))
94 response.mustcontain('3 entries')
95 response.mustcontain('3 entries')
95
96
96 def test_filter_journal_filter_wildcard_on_repository(self):
97 def test_filter_journal_filter_wildcard_on_repository(self):
97 self.log_user()
98 self.log_user()
98 response = self.app.get(url(controller='admin/admin', action='index',
99 response = self.app.get(url(controller='admin/admin', action='index',
99 filter='repository:*test*'))
100 filter='repository:*test*'))
100 response.mustcontain('862 entries')
101 response.mustcontain('862 entries')
101
102
102 def test_filter_journal_filter_prefix_on_repository(self):
103 def test_filter_journal_filter_prefix_on_repository(self):
103 self.log_user()
104 self.log_user()
104 response = self.app.get(url(controller='admin/admin', action='index',
105 response = self.app.get(url(controller='admin/admin', action='index',
105 filter='repository:test*'))
106 filter='repository:test*'))
106 response.mustcontain('257 entries')
107 response.mustcontain('257 entries')
107
108
108 def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
109 def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
109 self.log_user()
110 self.log_user()
110 response = self.app.get(url(controller='admin/admin', action='index',
111 response = self.app.get(url(controller='admin/admin', action='index',
111 filter='repository:Test*'))
112 filter='repository:Test*'))
112 response.mustcontain('257 entries')
113 response.mustcontain('257 entries')
113
114
114 def test_filter_journal_filter_prefix_on_repository_and_user(self):
115 def test_filter_journal_filter_prefix_on_repository_and_user(self):
115 self.log_user()
116 self.log_user()
116 response = self.app.get(url(controller='admin/admin', action='index',
117 response = self.app.get(url(controller='admin/admin', action='index',
117 filter='repository:test* AND username:demo'))
118 filter='repository:test* AND username:demo'))
118 response.mustcontain('130 entries')
119 response.mustcontain('130 entries')
119
120
120 def test_filter_journal_filter_prefix_on_repository_or_target_repo(self):
121 def test_filter_journal_filter_prefix_on_repository_or_target_repo(self):
121 self.log_user()
122 self.log_user()
122 response = self.app.get(url(controller='admin/admin', action='index',
123 response = self.app.get(url(controller='admin/admin', action='index',
123 filter='repository:test* OR repository:rhodecode'))
124 filter='repository:test* OR repository:rhodecode'))
124 response.mustcontain('260 entries') # 257 + 3
125 response.mustcontain('260 entries') # 257 + 3
125
126
126 def test_filter_journal_filter_exact_match_on_username(self):
127 def test_filter_journal_filter_exact_match_on_username(self):
127 self.log_user()
128 self.log_user()
128 response = self.app.get(url(controller='admin/admin', action='index',
129 response = self.app.get(url(controller='admin/admin', action='index',
129 filter='username:demo'))
130 filter='username:demo'))
130 response.mustcontain('1087 entries')
131 response.mustcontain('1087 entries')
131
132
132 def test_filter_journal_filter_exact_match_on_username_camelCase(self):
133 def test_filter_journal_filter_exact_match_on_username_camelCase(self):
133 self.log_user()
134 self.log_user()
134 response = self.app.get(url(controller='admin/admin', action='index',
135 response = self.app.get(url(controller='admin/admin', action='index',
135 filter='username:DemO'))
136 filter='username:DemO'))
136 response.mustcontain('1087 entries')
137 response.mustcontain('1087 entries')
137
138
138 def test_filter_journal_filter_wildcard_on_username(self):
139 def test_filter_journal_filter_wildcard_on_username(self):
139 self.log_user()
140 self.log_user()
140 response = self.app.get(url(controller='admin/admin', action='index',
141 response = self.app.get(url(controller='admin/admin', action='index',
141 filter='username:*test*'))
142 filter='username:*test*'))
142 response.mustcontain('100 entries')
143 entries_count = UserLog.query().filter(UserLog.username.ilike('%test%')).count()
144 response.mustcontain('{} entries'.format(entries_count))
143
145
144 def test_filter_journal_filter_prefix_on_username(self):
146 def test_filter_journal_filter_prefix_on_username(self):
145 self.log_user()
147 self.log_user()
146 response = self.app.get(url(controller='admin/admin', action='index',
148 response = self.app.get(url(controller='admin/admin', action='index',
147 filter='username:demo*'))
149 filter='username:demo*'))
148 response.mustcontain('1101 entries')
150 response.mustcontain('1101 entries')
149
151
150 def test_filter_journal_filter_prefix_on_user_or_other_user(self):
152 def test_filter_journal_filter_prefix_on_user_or_other_user(self):
151 self.log_user()
153 self.log_user()
152 response = self.app.get(url(controller='admin/admin', action='index',
154 response = self.app.get(url(controller='admin/admin', action='index',
153 filter='username:demo OR username:volcan'))
155 filter='username:demo OR username:volcan'))
154 response.mustcontain('1095 entries') # 1087 + 8
156 response.mustcontain('1095 entries') # 1087 + 8
155
157
156 def test_filter_journal_filter_wildcard_on_action(self):
158 def test_filter_journal_filter_wildcard_on_action(self):
157 self.log_user()
159 self.log_user()
158 response = self.app.get(url(controller='admin/admin', action='index',
160 response = self.app.get(url(controller='admin/admin', action='index',
159 filter='action:*pull_request*'))
161 filter='action:*pull_request*'))
160 response.mustcontain('187 entries')
162 response.mustcontain('187 entries')
161
163
162 def test_filter_journal_filter_on_date(self):
164 def test_filter_journal_filter_on_date(self):
163 self.log_user()
165 self.log_user()
164 response = self.app.get(url(controller='admin/admin', action='index',
166 response = self.app.get(url(controller='admin/admin', action='index',
165 filter='date:20121010'))
167 filter='date:20121010'))
166 response.mustcontain('47 entries')
168 response.mustcontain('47 entries')
167
169
168 def test_filter_journal_filter_on_date_2(self):
170 def test_filter_journal_filter_on_date_2(self):
169 self.log_user()
171 self.log_user()
170 response = self.app.get(url(controller='admin/admin', action='index',
172 response = self.app.get(url(controller='admin/admin', action='index',
171 filter='date:20121020'))
173 filter='date:20121020'))
172 response.mustcontain('17 entries')
174 response.mustcontain('17 entries')
@@ -1,1264 +1,1268 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import urllib
21 import urllib
22
22
23 import mock
23 import mock
24 import pytest
24 import pytest
25
25
26 from rhodecode.lib import auth
26 from rhodecode.lib import auth
27 from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
27 from rhodecode.lib.utils2 import safe_str, str2bool, safe_unicode
28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
28 from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
29 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
30 Permission
30 Permission
31 from rhodecode.model.meta import Session
31 from rhodecode.model.meta import Session
32 from rhodecode.model.repo import RepoModel
32 from rhodecode.model.repo import RepoModel
33 from rhodecode.model.repo_group import RepoGroupModel
33 from rhodecode.model.repo_group import RepoGroupModel
34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
34 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
35 from rhodecode.model.user import UserModel
35 from rhodecode.model.user import UserModel
36 from rhodecode.tests import (
36 from rhodecode.tests import (
37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
37 login_user_session, url, assert_session_flash, TEST_USER_ADMIN_LOGIN,
38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
38 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, HG_REPO, GIT_REPO,
39 logout_user_session)
39 logout_user_session)
40 from rhodecode.tests.fixture import Fixture, error_function
40 from rhodecode.tests.fixture import Fixture, error_function
41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
41 from rhodecode.tests.utils import AssertResponse, repo_on_filesystem
42
42
43 fixture = Fixture()
43 fixture = Fixture()
44
44
45
45
46 @pytest.mark.usefixtures("app")
46 @pytest.mark.usefixtures("app")
47 class TestAdminRepos(object):
47 class TestAdminRepos(object):
48
48
49 def test_index(self):
49 def test_index(self):
50 self.app.get(url('repos'))
50 self.app.get(url('repos'))
51
51
52 def test_create_page_restricted(self, autologin_user, backend):
52 def test_create_page_restricted(self, autologin_user, backend):
53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
53 with mock.patch('rhodecode.BACKENDS', {'git': 'git'}):
54 response = self.app.get(url('new_repo'), status=200)
54 response = self.app.get(url('new_repo'), status=200)
55 assert_response = AssertResponse(response)
55 assert_response = AssertResponse(response)
56 element = assert_response.get_element('#repo_type')
56 element = assert_response.get_element('#repo_type')
57 assert element.text_content() == '\ngit\n'
57 assert element.text_content() == '\ngit\n'
58
58
59 def test_create_page_non_restricted(self, autologin_user, backend):
59 def test_create_page_non_restricted(self, autologin_user, backend):
60 response = self.app.get(url('new_repo'), status=200)
60 response = self.app.get(url('new_repo'), status=200)
61 assert_response = AssertResponse(response)
61 assert_response = AssertResponse(response)
62 assert_response.element_contains('#repo_type', 'git')
62 assert_response.element_contains('#repo_type', 'git')
63 assert_response.element_contains('#repo_type', 'svn')
63 assert_response.element_contains('#repo_type', 'svn')
64 assert_response.element_contains('#repo_type', 'hg')
64 assert_response.element_contains('#repo_type', 'hg')
65
65
66 @pytest.mark.parametrize("suffix",
66 @pytest.mark.parametrize("suffix",
67 [u'', u'xxa'], ids=['', 'non-ascii'])
67 [u'', u'xxa'], ids=['', 'non-ascii'])
68 def test_create(self, autologin_user, backend, suffix, csrf_token):
68 def test_create(self, autologin_user, backend, suffix, csrf_token):
69 repo_name_unicode = backend.new_repo_name(suffix=suffix)
69 repo_name_unicode = backend.new_repo_name(suffix=suffix)
70 repo_name = repo_name_unicode.encode('utf8')
70 repo_name = repo_name_unicode.encode('utf8')
71 description_unicode = u'description for newly created repo' + suffix
71 description_unicode = u'description for newly created repo' + suffix
72 description = description_unicode.encode('utf8')
72 description = description_unicode.encode('utf8')
73 response = self.app.post(
73 response = self.app.post(
74 url('repos'),
74 url('repos'),
75 fixture._get_repo_create_params(
75 fixture._get_repo_create_params(
76 repo_private=False,
76 repo_private=False,
77 repo_name=repo_name,
77 repo_name=repo_name,
78 repo_type=backend.alias,
78 repo_type=backend.alias,
79 repo_description=description,
79 repo_description=description,
80 csrf_token=csrf_token),
80 csrf_token=csrf_token),
81 status=302)
81 status=302)
82
82
83 self.assert_repository_is_created_correctly(
83 self.assert_repository_is_created_correctly(
84 repo_name, description, backend)
84 repo_name, description, backend)
85
85
86 def test_create_numeric(self, autologin_user, backend, csrf_token):
86 def test_create_numeric(self, autologin_user, backend, csrf_token):
87 numeric_repo = '1234'
87 numeric_repo = '1234'
88 repo_name = numeric_repo
88 repo_name = numeric_repo
89 description = 'description for newly created repo' + numeric_repo
89 description = 'description for newly created repo' + numeric_repo
90 self.app.post(
90 self.app.post(
91 url('repos'),
91 url('repos'),
92 fixture._get_repo_create_params(
92 fixture._get_repo_create_params(
93 repo_private=False,
93 repo_private=False,
94 repo_name=repo_name,
94 repo_name=repo_name,
95 repo_type=backend.alias,
95 repo_type=backend.alias,
96 repo_description=description,
96 repo_description=description,
97 csrf_token=csrf_token))
97 csrf_token=csrf_token))
98
98
99 self.assert_repository_is_created_correctly(
99 self.assert_repository_is_created_correctly(
100 repo_name, description, backend)
100 repo_name, description, backend)
101
101
102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
102 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ‡Δ™'], ids=['', 'non-ascii'])
103 def test_create_in_group(
103 def test_create_in_group(
104 self, autologin_user, backend, suffix, csrf_token):
104 self, autologin_user, backend, suffix, csrf_token):
105 # create GROUP
105 # create GROUP
106 group_name = 'sometest_%s' % backend.alias
106 group_name = 'sometest_%s' % backend.alias
107 gr = RepoGroupModel().create(group_name=group_name,
107 gr = RepoGroupModel().create(group_name=group_name,
108 group_description='test',
108 group_description='test',
109 owner=TEST_USER_ADMIN_LOGIN)
109 owner=TEST_USER_ADMIN_LOGIN)
110 Session().commit()
110 Session().commit()
111
111
112 repo_name = u'ingroup' + suffix
112 repo_name = u'ingroup' + suffix
113 repo_name_full = RepoGroup.url_sep().join(
113 repo_name_full = RepoGroup.url_sep().join(
114 [group_name, repo_name])
114 [group_name, repo_name])
115 description = u'description for newly created repo'
115 description = u'description for newly created repo'
116 self.app.post(
116 self.app.post(
117 url('repos'),
117 url('repos'),
118 fixture._get_repo_create_params(
118 fixture._get_repo_create_params(
119 repo_private=False,
119 repo_private=False,
120 repo_name=safe_str(repo_name),
120 repo_name=safe_str(repo_name),
121 repo_type=backend.alias,
121 repo_type=backend.alias,
122 repo_description=description,
122 repo_description=description,
123 repo_group=gr.group_id,
123 repo_group=gr.group_id,
124 csrf_token=csrf_token))
124 csrf_token=csrf_token))
125
125
126 # TODO: johbo: Cleanup work to fixture
126 # TODO: johbo: Cleanup work to fixture
127 try:
127 try:
128 self.assert_repository_is_created_correctly(
128 self.assert_repository_is_created_correctly(
129 repo_name_full, description, backend)
129 repo_name_full, description, backend)
130
130
131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
131 new_repo = RepoModel().get_by_repo_name(repo_name_full)
132 inherited_perms = UserRepoToPerm.query().filter(
132 inherited_perms = UserRepoToPerm.query().filter(
133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
133 UserRepoToPerm.repository_id == new_repo.repo_id).all()
134 assert len(inherited_perms) == 1
134 assert len(inherited_perms) == 1
135 finally:
135 finally:
136 RepoModel().delete(repo_name_full)
136 RepoModel().delete(repo_name_full)
137 RepoGroupModel().delete(group_name)
137 RepoGroupModel().delete(group_name)
138 Session().commit()
138 Session().commit()
139
139
140 def test_create_in_group_numeric(
140 def test_create_in_group_numeric(
141 self, autologin_user, backend, csrf_token):
141 self, autologin_user, backend, csrf_token):
142 # create GROUP
142 # create GROUP
143 group_name = 'sometest_%s' % backend.alias
143 group_name = 'sometest_%s' % backend.alias
144 gr = RepoGroupModel().create(group_name=group_name,
144 gr = RepoGroupModel().create(group_name=group_name,
145 group_description='test',
145 group_description='test',
146 owner=TEST_USER_ADMIN_LOGIN)
146 owner=TEST_USER_ADMIN_LOGIN)
147 Session().commit()
147 Session().commit()
148
148
149 repo_name = '12345'
149 repo_name = '12345'
150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
150 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
151 description = 'description for newly created repo'
151 description = 'description for newly created repo'
152 self.app.post(
152 self.app.post(
153 url('repos'),
153 url('repos'),
154 fixture._get_repo_create_params(
154 fixture._get_repo_create_params(
155 repo_private=False,
155 repo_private=False,
156 repo_name=repo_name,
156 repo_name=repo_name,
157 repo_type=backend.alias,
157 repo_type=backend.alias,
158 repo_description=description,
158 repo_description=description,
159 repo_group=gr.group_id,
159 repo_group=gr.group_id,
160 csrf_token=csrf_token))
160 csrf_token=csrf_token))
161
161
162 # TODO: johbo: Cleanup work to fixture
162 # TODO: johbo: Cleanup work to fixture
163 try:
163 try:
164 self.assert_repository_is_created_correctly(
164 self.assert_repository_is_created_correctly(
165 repo_name_full, description, backend)
165 repo_name_full, description, backend)
166
166
167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
167 new_repo = RepoModel().get_by_repo_name(repo_name_full)
168 inherited_perms = UserRepoToPerm.query()\
168 inherited_perms = UserRepoToPerm.query()\
169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
169 .filter(UserRepoToPerm.repository_id == new_repo.repo_id).all()
170 assert len(inherited_perms) == 1
170 assert len(inherited_perms) == 1
171 finally:
171 finally:
172 RepoModel().delete(repo_name_full)
172 RepoModel().delete(repo_name_full)
173 RepoGroupModel().delete(group_name)
173 RepoGroupModel().delete(group_name)
174 Session().commit()
174 Session().commit()
175
175
176 def test_create_in_group_without_needed_permissions(self, backend):
176 def test_create_in_group_without_needed_permissions(self, backend):
177 session = login_user_session(
177 session = login_user_session(
178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
178 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
179 csrf_token = auth.get_csrf_token(session)
179 csrf_token = auth.get_csrf_token(session)
180 # revoke
180 # revoke
181 user_model = UserModel()
181 user_model = UserModel()
182 # disable fork and create on default user
182 # disable fork and create on default user
183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
183 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
184 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
185 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
186 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
187
187
188 # disable on regular user
188 # disable on regular user
189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
189 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
190 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
191 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
192 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
193 Session().commit()
193 Session().commit()
194
194
195 # create GROUP
195 # create GROUP
196 group_name = 'reg_sometest_%s' % backend.alias
196 group_name = 'reg_sometest_%s' % backend.alias
197 gr = RepoGroupModel().create(group_name=group_name,
197 gr = RepoGroupModel().create(group_name=group_name,
198 group_description='test',
198 group_description='test',
199 owner=TEST_USER_ADMIN_LOGIN)
199 owner=TEST_USER_ADMIN_LOGIN)
200 Session().commit()
200 Session().commit()
201
201
202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
202 group_name_allowed = 'reg_sometest_allowed_%s' % backend.alias
203 gr_allowed = RepoGroupModel().create(
203 gr_allowed = RepoGroupModel().create(
204 group_name=group_name_allowed,
204 group_name=group_name_allowed,
205 group_description='test',
205 group_description='test',
206 owner=TEST_USER_REGULAR_LOGIN)
206 owner=TEST_USER_REGULAR_LOGIN)
207 Session().commit()
207 Session().commit()
208
208
209 repo_name = 'ingroup'
209 repo_name = 'ingroup'
210 description = 'description for newly created repo'
210 description = 'description for newly created repo'
211 response = self.app.post(
211 response = self.app.post(
212 url('repos'),
212 url('repos'),
213 fixture._get_repo_create_params(
213 fixture._get_repo_create_params(
214 repo_private=False,
214 repo_private=False,
215 repo_name=repo_name,
215 repo_name=repo_name,
216 repo_type=backend.alias,
216 repo_type=backend.alias,
217 repo_description=description,
217 repo_description=description,
218 repo_group=gr.group_id,
218 repo_group=gr.group_id,
219 csrf_token=csrf_token))
219 csrf_token=csrf_token))
220
220
221 response.mustcontain('Invalid value')
221 response.mustcontain('Invalid value')
222
222
223 # user is allowed to create in this group
223 # user is allowed to create in this group
224 repo_name = 'ingroup'
224 repo_name = 'ingroup'
225 repo_name_full = RepoGroup.url_sep().join(
225 repo_name_full = RepoGroup.url_sep().join(
226 [group_name_allowed, repo_name])
226 [group_name_allowed, repo_name])
227 description = 'description for newly created repo'
227 description = 'description for newly created repo'
228 response = self.app.post(
228 response = self.app.post(
229 url('repos'),
229 url('repos'),
230 fixture._get_repo_create_params(
230 fixture._get_repo_create_params(
231 repo_private=False,
231 repo_private=False,
232 repo_name=repo_name,
232 repo_name=repo_name,
233 repo_type=backend.alias,
233 repo_type=backend.alias,
234 repo_description=description,
234 repo_description=description,
235 repo_group=gr_allowed.group_id,
235 repo_group=gr_allowed.group_id,
236 csrf_token=csrf_token))
236 csrf_token=csrf_token))
237
237
238 # TODO: johbo: Cleanup in pytest fixture
238 # TODO: johbo: Cleanup in pytest fixture
239 try:
239 try:
240 self.assert_repository_is_created_correctly(
240 self.assert_repository_is_created_correctly(
241 repo_name_full, description, backend)
241 repo_name_full, description, backend)
242
242
243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
243 new_repo = RepoModel().get_by_repo_name(repo_name_full)
244 inherited_perms = UserRepoToPerm.query().filter(
244 inherited_perms = UserRepoToPerm.query().filter(
245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
245 UserRepoToPerm.repository_id == new_repo.repo_id).all()
246 assert len(inherited_perms) == 1
246 assert len(inherited_perms) == 1
247
247
248 assert repo_on_filesystem(repo_name_full)
248 assert repo_on_filesystem(repo_name_full)
249 finally:
249 finally:
250 RepoModel().delete(repo_name_full)
250 RepoModel().delete(repo_name_full)
251 RepoGroupModel().delete(group_name)
251 RepoGroupModel().delete(group_name)
252 RepoGroupModel().delete(group_name_allowed)
252 RepoGroupModel().delete(group_name_allowed)
253 Session().commit()
253 Session().commit()
254
254
255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
255 def test_create_in_group_inherit_permissions(self, autologin_user, backend,
256 csrf_token):
256 csrf_token):
257 # create GROUP
257 # create GROUP
258 group_name = 'sometest_%s' % backend.alias
258 group_name = 'sometest_%s' % backend.alias
259 gr = RepoGroupModel().create(group_name=group_name,
259 gr = RepoGroupModel().create(group_name=group_name,
260 group_description='test',
260 group_description='test',
261 owner=TEST_USER_ADMIN_LOGIN)
261 owner=TEST_USER_ADMIN_LOGIN)
262 perm = Permission.get_by_key('repository.write')
262 perm = Permission.get_by_key('repository.write')
263 RepoGroupModel().grant_user_permission(
263 RepoGroupModel().grant_user_permission(
264 gr, TEST_USER_REGULAR_LOGIN, perm)
264 gr, TEST_USER_REGULAR_LOGIN, perm)
265
265
266 # add repo permissions
266 # add repo permissions
267 Session().commit()
267 Session().commit()
268
268
269 repo_name = 'ingroup_inherited_%s' % backend.alias
269 repo_name = 'ingroup_inherited_%s' % backend.alias
270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
270 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
271 description = 'description for newly created repo'
271 description = 'description for newly created repo'
272 self.app.post(
272 self.app.post(
273 url('repos'),
273 url('repos'),
274 fixture._get_repo_create_params(
274 fixture._get_repo_create_params(
275 repo_private=False,
275 repo_private=False,
276 repo_name=repo_name,
276 repo_name=repo_name,
277 repo_type=backend.alias,
277 repo_type=backend.alias,
278 repo_description=description,
278 repo_description=description,
279 repo_group=gr.group_id,
279 repo_group=gr.group_id,
280 repo_copy_permissions=True,
280 repo_copy_permissions=True,
281 csrf_token=csrf_token))
281 csrf_token=csrf_token))
282
282
283 # TODO: johbo: Cleanup to pytest fixture
283 # TODO: johbo: Cleanup to pytest fixture
284 try:
284 try:
285 self.assert_repository_is_created_correctly(
285 self.assert_repository_is_created_correctly(
286 repo_name_full, description, backend)
286 repo_name_full, description, backend)
287 except Exception:
287 except Exception:
288 RepoGroupModel().delete(group_name)
288 RepoGroupModel().delete(group_name)
289 Session().commit()
289 Session().commit()
290 raise
290 raise
291
291
292 # check if inherited permissions are applied
292 # check if inherited permissions are applied
293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
293 new_repo = RepoModel().get_by_repo_name(repo_name_full)
294 inherited_perms = UserRepoToPerm.query().filter(
294 inherited_perms = UserRepoToPerm.query().filter(
295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
295 UserRepoToPerm.repository_id == new_repo.repo_id).all()
296 assert len(inherited_perms) == 2
296 assert len(inherited_perms) == 2
297
297
298 assert TEST_USER_REGULAR_LOGIN in [
298 assert TEST_USER_REGULAR_LOGIN in [
299 x.user.username for x in inherited_perms]
299 x.user.username for x in inherited_perms]
300 assert 'repository.write' in [
300 assert 'repository.write' in [
301 x.permission.permission_name for x in inherited_perms]
301 x.permission.permission_name for x in inherited_perms]
302
302
303 RepoModel().delete(repo_name_full)
303 RepoModel().delete(repo_name_full)
304 RepoGroupModel().delete(group_name)
304 RepoGroupModel().delete(group_name)
305 Session().commit()
305 Session().commit()
306
306
307 @pytest.mark.xfail_backends(
307 @pytest.mark.xfail_backends(
308 "git", "hg", reason="Missing reposerver support")
308 "git", "hg", reason="Missing reposerver support")
309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
309 def test_create_with_clone_uri(self, autologin_user, backend, reposerver,
310 csrf_token):
310 csrf_token):
311 source_repo = backend.create_repo(number_of_commits=2)
311 source_repo = backend.create_repo(number_of_commits=2)
312 source_repo_name = source_repo.repo_name
312 source_repo_name = source_repo.repo_name
313 reposerver.serve(source_repo.scm_instance())
313 reposerver.serve(source_repo.scm_instance())
314
314
315 repo_name = backend.new_repo_name()
315 repo_name = backend.new_repo_name()
316 response = self.app.post(
316 response = self.app.post(
317 url('repos'),
317 url('repos'),
318 fixture._get_repo_create_params(
318 fixture._get_repo_create_params(
319 repo_private=False,
319 repo_private=False,
320 repo_name=repo_name,
320 repo_name=repo_name,
321 repo_type=backend.alias,
321 repo_type=backend.alias,
322 repo_description='',
322 repo_description='',
323 clone_uri=reposerver.url,
323 clone_uri=reposerver.url,
324 csrf_token=csrf_token),
324 csrf_token=csrf_token),
325 status=302)
325 status=302)
326
326
327 # Should be redirected to the creating page
327 # Should be redirected to the creating page
328 response.mustcontain('repo_creating')
328 response.mustcontain('repo_creating')
329
329
330 # Expecting that both repositories have same history
330 # Expecting that both repositories have same history
331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
331 source_repo = RepoModel().get_by_repo_name(source_repo_name)
332 source_vcs = source_repo.scm_instance()
332 source_vcs = source_repo.scm_instance()
333 repo = RepoModel().get_by_repo_name(repo_name)
333 repo = RepoModel().get_by_repo_name(repo_name)
334 repo_vcs = repo.scm_instance()
334 repo_vcs = repo.scm_instance()
335 assert source_vcs[0].message == repo_vcs[0].message
335 assert source_vcs[0].message == repo_vcs[0].message
336 assert source_vcs.count() == repo_vcs.count()
336 assert source_vcs.count() == repo_vcs.count()
337 assert source_vcs.commit_ids == repo_vcs.commit_ids
337 assert source_vcs.commit_ids == repo_vcs.commit_ids
338
338
339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
339 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
340 def test_create_remote_repo_wrong_clone_uri(self, autologin_user, backend,
341 csrf_token):
341 csrf_token):
342 repo_name = backend.new_repo_name()
342 repo_name = backend.new_repo_name()
343 description = 'description for newly created repo'
343 description = 'description for newly created repo'
344 response = self.app.post(
344 response = self.app.post(
345 url('repos'),
345 url('repos'),
346 fixture._get_repo_create_params(
346 fixture._get_repo_create_params(
347 repo_private=False,
347 repo_private=False,
348 repo_name=repo_name,
348 repo_name=repo_name,
349 repo_type=backend.alias,
349 repo_type=backend.alias,
350 repo_description=description,
350 repo_description=description,
351 clone_uri='http://repo.invalid/repo',
351 clone_uri='http://repo.invalid/repo',
352 csrf_token=csrf_token))
352 csrf_token=csrf_token))
353 response.mustcontain('invalid clone url')
353 response.mustcontain('invalid clone url')
354
354
355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
355 @pytest.mark.xfail_backends("svn", reason="Depends on import support")
356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
356 def test_create_remote_repo_wrong_clone_uri_hg_svn(
357 self, autologin_user, backend, csrf_token):
357 self, autologin_user, backend, csrf_token):
358 repo_name = backend.new_repo_name()
358 repo_name = backend.new_repo_name()
359 description = 'description for newly created repo'
359 description = 'description for newly created repo'
360 response = self.app.post(
360 response = self.app.post(
361 url('repos'),
361 url('repos'),
362 fixture._get_repo_create_params(
362 fixture._get_repo_create_params(
363 repo_private=False,
363 repo_private=False,
364 repo_name=repo_name,
364 repo_name=repo_name,
365 repo_type=backend.alias,
365 repo_type=backend.alias,
366 repo_description=description,
366 repo_description=description,
367 clone_uri='svn+http://svn.invalid/repo',
367 clone_uri='svn+http://svn.invalid/repo',
368 csrf_token=csrf_token))
368 csrf_token=csrf_token))
369 response.mustcontain('invalid clone url')
369 response.mustcontain('invalid clone url')
370
370
371 def test_create_with_git_suffix(
371 def test_create_with_git_suffix(
372 self, autologin_user, backend, csrf_token):
372 self, autologin_user, backend, csrf_token):
373 repo_name = backend.new_repo_name() + ".git"
373 repo_name = backend.new_repo_name() + ".git"
374 description = 'description for newly created repo'
374 description = 'description for newly created repo'
375 response = self.app.post(
375 response = self.app.post(
376 url('repos'),
376 url('repos'),
377 fixture._get_repo_create_params(
377 fixture._get_repo_create_params(
378 repo_private=False,
378 repo_private=False,
379 repo_name=repo_name,
379 repo_name=repo_name,
380 repo_type=backend.alias,
380 repo_type=backend.alias,
381 repo_description=description,
381 repo_description=description,
382 csrf_token=csrf_token))
382 csrf_token=csrf_token))
383 response.mustcontain('Repository name cannot end with .git')
383 response.mustcontain('Repository name cannot end with .git')
384
384
385 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
385 @pytest.mark.parametrize("suffix", [u'', u'Δ…Δ™Ε‚'], ids=['', 'non-ascii'])
386 def test_delete(self, autologin_user, backend, suffix, csrf_token):
386 def test_delete(self, autologin_user, backend, suffix, csrf_token):
387 repo = backend.create_repo(name_suffix=suffix)
387 repo = backend.create_repo(name_suffix=suffix)
388 repo_name = repo.repo_name
388 repo_name = repo.repo_name
389
389
390 response = self.app.post(url('repo', repo_name=repo_name),
390 response = self.app.post(url('repo', repo_name=repo_name),
391 params={'_method': 'delete',
391 params={'_method': 'delete',
392 'csrf_token': csrf_token})
392 'csrf_token': csrf_token})
393 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
393 assert_session_flash(response, 'Deleted repository %s' % (repo_name))
394 response.follow()
394 response.follow()
395
395
396 # check if repo was deleted from db
396 # check if repo was deleted from db
397 assert RepoModel().get_by_repo_name(repo_name) is None
397 assert RepoModel().get_by_repo_name(repo_name) is None
398 assert not repo_on_filesystem(repo_name)
398 assert not repo_on_filesystem(repo_name)
399
399
400 def test_show(self, autologin_user, backend):
400 def test_show(self, autologin_user, backend):
401 self.app.get(url('repo', repo_name=backend.repo_name))
401 self.app.get(url('repo', repo_name=backend.repo_name))
402
402
403 def test_edit(self, backend, autologin_user):
403 def test_edit(self, backend, autologin_user):
404 self.app.get(url('edit_repo', repo_name=backend.repo_name))
404 self.app.get(url('edit_repo', repo_name=backend.repo_name))
405
405
406 def test_edit_accessible_when_missing_requirements(
406 def test_edit_accessible_when_missing_requirements(
407 self, backend_hg, autologin_user):
407 self, backend_hg, autologin_user):
408 scm_patcher = mock.patch.object(
408 scm_patcher = mock.patch.object(
409 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
409 Repository, 'scm_instance', side_effect=RepositoryRequirementError)
410 with scm_patcher:
410 with scm_patcher:
411 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
411 self.app.get(url('edit_repo', repo_name=backend_hg.repo_name))
412
412
413 def test_set_private_flag_sets_default_to_none(
413 def test_set_private_flag_sets_default_to_none(
414 self, autologin_user, backend, csrf_token):
414 self, autologin_user, backend, csrf_token):
415 # initially repository perm should be read
415 # initially repository perm should be read
416 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
416 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
417 assert len(perm) == 1
417 assert len(perm) == 1
418 assert perm[0].permission.permission_name == 'repository.read'
418 assert perm[0].permission.permission_name == 'repository.read'
419 assert not backend.repo.private
419 assert not backend.repo.private
420
420
421 response = self.app.post(
421 response = self.app.post(
422 url('repo', repo_name=backend.repo_name),
422 url('repo', repo_name=backend.repo_name),
423 fixture._get_repo_create_params(
423 fixture._get_repo_create_params(
424 repo_private=1,
424 repo_private=1,
425 repo_name=backend.repo_name,
425 repo_name=backend.repo_name,
426 repo_type=backend.alias,
426 repo_type=backend.alias,
427 user=TEST_USER_ADMIN_LOGIN,
427 user=TEST_USER_ADMIN_LOGIN,
428 _method='put',
428 _method='put',
429 csrf_token=csrf_token))
429 csrf_token=csrf_token))
430 assert_session_flash(
430 assert_session_flash(
431 response,
431 response,
432 msg='Repository %s updated successfully' % (backend.repo_name))
432 msg='Repository %s updated successfully' % (backend.repo_name))
433 assert backend.repo.private
433 assert backend.repo.private
434
434
435 # now the repo default permission should be None
435 # now the repo default permission should be None
436 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
436 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
437 assert len(perm) == 1
437 assert len(perm) == 1
438 assert perm[0].permission.permission_name == 'repository.none'
438 assert perm[0].permission.permission_name == 'repository.none'
439
439
440 response = self.app.post(
440 response = self.app.post(
441 url('repo', repo_name=backend.repo_name),
441 url('repo', repo_name=backend.repo_name),
442 fixture._get_repo_create_params(
442 fixture._get_repo_create_params(
443 repo_private=False,
443 repo_private=False,
444 repo_name=backend.repo_name,
444 repo_name=backend.repo_name,
445 repo_type=backend.alias,
445 repo_type=backend.alias,
446 user=TEST_USER_ADMIN_LOGIN,
446 user=TEST_USER_ADMIN_LOGIN,
447 _method='put',
447 _method='put',
448 csrf_token=csrf_token))
448 csrf_token=csrf_token))
449 assert_session_flash(
449 assert_session_flash(
450 response,
450 response,
451 msg='Repository %s updated successfully' % (backend.repo_name))
451 msg='Repository %s updated successfully' % (backend.repo_name))
452 assert not backend.repo.private
452 assert not backend.repo.private
453
453
454 # we turn off private now the repo default permission should stay None
454 # we turn off private now the repo default permission should stay None
455 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
455 perm = _get_permission_for_user(user='default', repo=backend.repo_name)
456 assert len(perm) == 1
456 assert len(perm) == 1
457 assert perm[0].permission.permission_name == 'repository.none'
457 assert perm[0].permission.permission_name == 'repository.none'
458
458
459 # update this permission back
459 # update this permission back
460 perm[0].permission = Permission.get_by_key('repository.read')
460 perm[0].permission = Permission.get_by_key('repository.read')
461 Session().add(perm[0])
461 Session().add(perm[0])
462 Session().commit()
462 Session().commit()
463
463
464 def test_default_user_cannot_access_private_repo_in_a_group(
464 def test_default_user_cannot_access_private_repo_in_a_group(
465 self, autologin_user, user_util, backend, csrf_token):
465 self, autologin_user, user_util, backend, csrf_token):
466
466
467 group = user_util.create_repo_group()
467 group = user_util.create_repo_group()
468
468
469 repo = backend.create_repo(
469 repo = backend.create_repo(
470 repo_private=True, repo_group=group, repo_copy_permissions=True)
470 repo_private=True, repo_group=group, repo_copy_permissions=True)
471
471
472 permissions = _get_permission_for_user(
472 permissions = _get_permission_for_user(
473 user='default', repo=repo.repo_name)
473 user='default', repo=repo.repo_name)
474 assert len(permissions) == 1
474 assert len(permissions) == 1
475 assert permissions[0].permission.permission_name == 'repository.none'
475 assert permissions[0].permission.permission_name == 'repository.none'
476 assert permissions[0].repository.private is True
476 assert permissions[0].repository.private is True
477
477
478 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
478 def test_set_repo_fork_has_no_self_id(self, autologin_user, backend):
479 repo = backend.repo
479 repo = backend.repo
480 response = self.app.get(
480 response = self.app.get(
481 url('edit_repo_advanced', repo_name=backend.repo_name))
481 url('edit_repo_advanced', repo_name=backend.repo_name))
482 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
482 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
483 response.mustcontain(no=[opt])
483 response.mustcontain(no=[opt])
484
484
485 def test_set_fork_of_target_repo(
485 def test_set_fork_of_target_repo(
486 self, autologin_user, backend, csrf_token):
486 self, autologin_user, backend, csrf_token):
487 target_repo = 'target_%s' % backend.alias
487 target_repo = 'target_%s' % backend.alias
488 fixture.create_repo(target_repo, repo_type=backend.alias)
488 fixture.create_repo(target_repo, repo_type=backend.alias)
489 repo2 = Repository.get_by_repo_name(target_repo)
489 repo2 = Repository.get_by_repo_name(target_repo)
490 response = self.app.post(
490 response = self.app.post(
491 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
491 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
492 params={'id_fork_of': repo2.repo_id, '_method': 'put',
492 params={'id_fork_of': repo2.repo_id, '_method': 'put',
493 'csrf_token': csrf_token})
493 'csrf_token': csrf_token})
494 repo = Repository.get_by_repo_name(backend.repo_name)
494 repo = Repository.get_by_repo_name(backend.repo_name)
495 repo2 = Repository.get_by_repo_name(target_repo)
495 repo2 = Repository.get_by_repo_name(target_repo)
496 assert_session_flash(
496 assert_session_flash(
497 response,
497 response,
498 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
498 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
499
499
500 assert repo.fork == repo2
500 assert repo.fork == repo2
501 response = response.follow()
501 response = response.follow()
502 # check if given repo is selected
502 # check if given repo is selected
503
503
504 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
504 opt = 'This repository is a fork of <a href="%s">%s</a>' % (
505 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
505 url('summary_home', repo_name=repo2.repo_name), repo2.repo_name)
506
506
507 response.mustcontain(opt)
507 response.mustcontain(opt)
508
508
509 fixture.destroy_repo(target_repo, forks='detach')
509 fixture.destroy_repo(target_repo, forks='detach')
510
510
511 @pytest.mark.backends("hg", "git")
511 @pytest.mark.backends("hg", "git")
512 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
512 def test_set_fork_of_other_type_repo(self, autologin_user, backend,
513 csrf_token):
513 csrf_token):
514 TARGET_REPO_MAP = {
514 TARGET_REPO_MAP = {
515 'git': {
515 'git': {
516 'type': 'hg',
516 'type': 'hg',
517 'repo_name': HG_REPO},
517 'repo_name': HG_REPO},
518 'hg': {
518 'hg': {
519 'type': 'git',
519 'type': 'git',
520 'repo_name': GIT_REPO},
520 'repo_name': GIT_REPO},
521 }
521 }
522 target_repo = TARGET_REPO_MAP[backend.alias]
522 target_repo = TARGET_REPO_MAP[backend.alias]
523
523
524 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
524 repo2 = Repository.get_by_repo_name(target_repo['repo_name'])
525 response = self.app.post(
525 response = self.app.post(
526 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
526 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
527 params={'id_fork_of': repo2.repo_id, '_method': 'put',
527 params={'id_fork_of': repo2.repo_id, '_method': 'put',
528 'csrf_token': csrf_token})
528 'csrf_token': csrf_token})
529 assert_session_flash(
529 assert_session_flash(
530 response,
530 response,
531 'Cannot set repository as fork of repository with other type')
531 'Cannot set repository as fork of repository with other type')
532
532
533 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
533 def test_set_fork_of_none(self, autologin_user, backend, csrf_token):
534 # mark it as None
534 # mark it as None
535 response = self.app.post(
535 response = self.app.post(
536 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
536 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
537 params={'id_fork_of': None, '_method': 'put',
537 params={'id_fork_of': None, '_method': 'put',
538 'csrf_token': csrf_token})
538 'csrf_token': csrf_token})
539 assert_session_flash(
539 assert_session_flash(
540 response,
540 response,
541 'Marked repo %s as fork of %s'
541 'Marked repo %s as fork of %s'
542 % (backend.repo_name, "Nothing"))
542 % (backend.repo_name, "Nothing"))
543 assert backend.repo.fork is None
543 assert backend.repo.fork is None
544
544
545 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
545 def test_set_fork_of_same_repo(self, autologin_user, backend, csrf_token):
546 repo = Repository.get_by_repo_name(backend.repo_name)
546 repo = Repository.get_by_repo_name(backend.repo_name)
547 response = self.app.post(
547 response = self.app.post(
548 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
548 url('edit_repo_advanced_fork', repo_name=backend.repo_name),
549 params={'id_fork_of': repo.repo_id, '_method': 'put',
549 params={'id_fork_of': repo.repo_id, '_method': 'put',
550 'csrf_token': csrf_token})
550 'csrf_token': csrf_token})
551 assert_session_flash(
551 assert_session_flash(
552 response, 'An error occurred during this operation')
552 response, 'An error occurred during this operation')
553
553
554 def test_create_on_top_level_without_permissions(self, backend):
554 def test_create_on_top_level_without_permissions(self, backend):
555 session = login_user_session(
555 session = login_user_session(
556 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
556 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
557 csrf_token = auth.get_csrf_token(session)
557 csrf_token = auth.get_csrf_token(session)
558
558
559 # revoke
559 # revoke
560 user_model = UserModel()
560 user_model = UserModel()
561 # disable fork and create on default user
561 # disable fork and create on default user
562 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
562 user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
563 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
563 user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
564 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
564 user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
565 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
565 user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
566
566
567 # disable on regular user
567 # disable on regular user
568 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
568 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
569 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
569 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
570 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
570 user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
571 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
571 user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
572 Session().commit()
572 Session().commit()
573
573
574 repo_name = backend.new_repo_name()
574 repo_name = backend.new_repo_name()
575 description = 'description for newly created repo'
575 description = 'description for newly created repo'
576 response = self.app.post(
576 response = self.app.post(
577 url('repos'),
577 url('repos'),
578 fixture._get_repo_create_params(
578 fixture._get_repo_create_params(
579 repo_private=False,
579 repo_private=False,
580 repo_name=repo_name,
580 repo_name=repo_name,
581 repo_type=backend.alias,
581 repo_type=backend.alias,
582 repo_description=description,
582 repo_description=description,
583 csrf_token=csrf_token))
583 csrf_token=csrf_token))
584
584
585 response.mustcontain(
585 response.mustcontain(
586 u"You do not have the permission to store repositories in "
586 u"You do not have the permission to store repositories in "
587 u"the root location.")
587 u"the root location.")
588
588
589 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
589 @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
590 def test_create_repo_when_filesystem_op_fails(
590 def test_create_repo_when_filesystem_op_fails(
591 self, autologin_user, backend, csrf_token):
591 self, autologin_user, backend, csrf_token):
592 repo_name = backend.new_repo_name()
592 repo_name = backend.new_repo_name()
593 description = 'description for newly created repo'
593 description = 'description for newly created repo'
594
594
595 response = self.app.post(
595 response = self.app.post(
596 url('repos'),
596 url('repos'),
597 fixture._get_repo_create_params(
597 fixture._get_repo_create_params(
598 repo_private=False,
598 repo_private=False,
599 repo_name=repo_name,
599 repo_name=repo_name,
600 repo_type=backend.alias,
600 repo_type=backend.alias,
601 repo_description=description,
601 repo_description=description,
602 csrf_token=csrf_token))
602 csrf_token=csrf_token))
603
603
604 assert_session_flash(
604 assert_session_flash(
605 response, 'Error creating repository %s' % repo_name)
605 response, 'Error creating repository %s' % repo_name)
606 # repo must not be in db
606 # repo must not be in db
607 assert backend.repo is None
607 assert backend.repo is None
608 # repo must not be in filesystem !
608 # repo must not be in filesystem !
609 assert not repo_on_filesystem(repo_name)
609 assert not repo_on_filesystem(repo_name)
610
610
611 def assert_repository_is_created_correctly(
611 def assert_repository_is_created_correctly(
612 self, repo_name, description, backend):
612 self, repo_name, description, backend):
613 repo_name_utf8 = safe_str(repo_name)
613 repo_name_utf8 = safe_str(repo_name)
614
614
615 # run the check page that triggers the flash message
615 # run the check page that triggers the flash message
616 response = self.app.get(url('repo_check_home', repo_name=repo_name))
616 response = self.app.get(url('repo_check_home', repo_name=repo_name))
617 assert response.json == {u'result': True}
617 assert response.json == {u'result': True}
618
618
619 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
619 flash_msg = u'Created repository <a href="/{}">{}</a>'.format(
620 urllib.quote(repo_name_utf8), repo_name)
620 urllib.quote(repo_name_utf8), repo_name)
621 assert_session_flash(response, flash_msg)
621 assert_session_flash(response, flash_msg)
622
622
623 # test if the repo was created in the database
623 # test if the repo was created in the database
624 new_repo = RepoModel().get_by_repo_name(repo_name)
624 new_repo = RepoModel().get_by_repo_name(repo_name)
625
625
626 assert new_repo.repo_name == repo_name
626 assert new_repo.repo_name == repo_name
627 assert new_repo.description == description
627 assert new_repo.description == description
628
628
629 # test if the repository is visible in the list ?
629 # test if the repository is visible in the list ?
630 response = self.app.get(url('summary_home', repo_name=repo_name))
630 response = self.app.get(url('summary_home', repo_name=repo_name))
631 response.mustcontain(repo_name)
631 response.mustcontain(repo_name)
632 response.mustcontain(backend.alias)
632 response.mustcontain(backend.alias)
633
633
634 assert repo_on_filesystem(repo_name)
634 assert repo_on_filesystem(repo_name)
635
635
636
636
637 @pytest.mark.usefixtures("app")
637 @pytest.mark.usefixtures("app")
638 class TestVcsSettings(object):
638 class TestVcsSettings(object):
639 FORM_DATA = {
639 FORM_DATA = {
640 'inherit_global_settings': False,
640 'inherit_global_settings': False,
641 'hooks_changegroup_repo_size': False,
641 'hooks_changegroup_repo_size': False,
642 'hooks_changegroup_push_logger': False,
642 'hooks_changegroup_push_logger': False,
643 'hooks_outgoing_pull_logger': False,
643 'hooks_outgoing_pull_logger': False,
644 'extensions_largefiles': False,
644 'extensions_largefiles': False,
645 'phases_publish': 'False',
645 'phases_publish': 'False',
646 'rhodecode_pr_merge_enabled': False,
646 'rhodecode_pr_merge_enabled': False,
647 'rhodecode_use_outdated_comments': False,
647 'rhodecode_use_outdated_comments': False,
648 'new_svn_branch': '',
648 'new_svn_branch': '',
649 'new_svn_tag': ''
649 'new_svn_tag': ''
650 }
650 }
651
651
652 @pytest.mark.skip_backends('svn')
652 @pytest.mark.skip_backends('svn')
653 def test_global_settings_initial_values(self, autologin_user, backend):
653 def test_global_settings_initial_values(self, autologin_user, backend):
654 repo_name = backend.repo_name
654 repo_name = backend.repo_name
655 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
655 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
656
656
657 expected_settings = (
657 expected_settings = (
658 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
658 'rhodecode_use_outdated_comments', 'rhodecode_pr_merge_enabled',
659 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
659 'hooks_changegroup_repo_size', 'hooks_changegroup_push_logger',
660 'hooks_outgoing_pull_logger'
660 'hooks_outgoing_pull_logger'
661 )
661 )
662 for setting in expected_settings:
662 for setting in expected_settings:
663 self.assert_repo_value_equals_global_value(response, setting)
663 self.assert_repo_value_equals_global_value(response, setting)
664
664
665 def test_show_settings_requires_repo_admin_permission(
665 def test_show_settings_requires_repo_admin_permission(
666 self, backend, user_util, settings_util):
666 self, backend, user_util, settings_util):
667 repo = backend.create_repo()
667 repo = backend.create_repo()
668 repo_name = repo.repo_name
668 repo_name = repo.repo_name
669 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
669 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
670 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
670 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
671 login_user_session(
671 login_user_session(
672 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
672 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
673 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
673 self.app.get(url('repo_vcs_settings', repo_name=repo_name), status=200)
674
674
675 def test_inherit_global_settings_flag_is_true_by_default(
675 def test_inherit_global_settings_flag_is_true_by_default(
676 self, autologin_user, backend):
676 self, autologin_user, backend):
677 repo_name = backend.repo_name
677 repo_name = backend.repo_name
678 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
678 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
679
679
680 assert_response = AssertResponse(response)
680 assert_response = AssertResponse(response)
681 element = assert_response.get_element('#inherit_global_settings')
681 element = assert_response.get_element('#inherit_global_settings')
682 assert element.checked
682 assert element.checked
683
683
684 @pytest.mark.parametrize('checked_value', [True, False])
684 @pytest.mark.parametrize('checked_value', [True, False])
685 def test_inherit_global_settings_value(
685 def test_inherit_global_settings_value(
686 self, autologin_user, backend, checked_value, settings_util):
686 self, autologin_user, backend, checked_value, settings_util):
687 repo = backend.create_repo()
687 repo = backend.create_repo()
688 repo_name = repo.repo_name
688 repo_name = repo.repo_name
689 settings_util.create_repo_rhodecode_setting(
689 settings_util.create_repo_rhodecode_setting(
690 repo, 'inherit_vcs_settings', checked_value, 'bool')
690 repo, 'inherit_vcs_settings', checked_value, 'bool')
691 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
691 response = self.app.get(url('repo_vcs_settings', repo_name=repo_name))
692
692
693 assert_response = AssertResponse(response)
693 assert_response = AssertResponse(response)
694 element = assert_response.get_element('#inherit_global_settings')
694 element = assert_response.get_element('#inherit_global_settings')
695 assert element.checked == checked_value
695 assert element.checked == checked_value
696
696
697 @pytest.mark.skip_backends('svn')
697 @pytest.mark.skip_backends('svn')
698 def test_hooks_settings_are_created(
698 def test_hooks_settings_are_created(
699 self, autologin_user, backend, csrf_token):
699 self, autologin_user, backend, csrf_token):
700 repo_name = backend.repo_name
700 repo_name = backend.repo_name
701 data = self.FORM_DATA.copy()
701 data = self.FORM_DATA.copy()
702 data['csrf_token'] = csrf_token
702 data['csrf_token'] = csrf_token
703 self.app.post(
703 self.app.post(
704 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
704 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
705 settings = SettingsModel(repo=repo_name)
705 settings = SettingsModel(repo=repo_name)
706 try:
706 try:
707 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
707 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
708 ui = settings.get_ui_by_section_and_key(section, key)
708 ui = settings.get_ui_by_section_and_key(section, key)
709 assert ui.ui_active is False
709 assert ui.ui_active is False
710 finally:
710 finally:
711 self._cleanup_repo_settings(settings)
711 self._cleanup_repo_settings(settings)
712
712
713 def test_hooks_settings_are_not_created_for_svn(
713 def test_hooks_settings_are_not_created_for_svn(
714 self, autologin_user, backend_svn, csrf_token):
714 self, autologin_user, backend_svn, csrf_token):
715 repo_name = backend_svn.repo_name
715 repo_name = backend_svn.repo_name
716 data = self.FORM_DATA.copy()
716 data = self.FORM_DATA.copy()
717 data['csrf_token'] = csrf_token
717 data['csrf_token'] = csrf_token
718 self.app.post(
718 self.app.post(
719 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
719 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
720 settings = SettingsModel(repo=repo_name)
720 settings = SettingsModel(repo=repo_name)
721 try:
721 try:
722 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
722 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
723 ui = settings.get_ui_by_section_and_key(section, key)
723 ui = settings.get_ui_by_section_and_key(section, key)
724 assert ui is None
724 assert ui is None
725 finally:
725 finally:
726 self._cleanup_repo_settings(settings)
726 self._cleanup_repo_settings(settings)
727
727
728 @pytest.mark.skip_backends('svn')
728 @pytest.mark.skip_backends('svn')
729 def test_hooks_settings_are_updated(
729 def test_hooks_settings_are_updated(
730 self, autologin_user, backend, csrf_token):
730 self, autologin_user, backend, csrf_token):
731 repo_name = backend.repo_name
731 repo_name = backend.repo_name
732 settings = SettingsModel(repo=repo_name)
732 settings = SettingsModel(repo=repo_name)
733 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
733 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
734 settings.create_ui_section_value(section, '', key=key, active=True)
734 settings.create_ui_section_value(section, '', key=key, active=True)
735
735
736 data = self.FORM_DATA.copy()
736 data = self.FORM_DATA.copy()
737 data['csrf_token'] = csrf_token
737 data['csrf_token'] = csrf_token
738 self.app.post(
738 self.app.post(
739 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
739 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
740 try:
740 try:
741 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
741 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
742 ui = settings.get_ui_by_section_and_key(section, key)
742 ui = settings.get_ui_by_section_and_key(section, key)
743 assert ui.ui_active is False
743 assert ui.ui_active is False
744 finally:
744 finally:
745 self._cleanup_repo_settings(settings)
745 self._cleanup_repo_settings(settings)
746
746
747 def test_hooks_settings_are_not_updated_for_svn(
747 def test_hooks_settings_are_not_updated_for_svn(
748 self, autologin_user, backend_svn, csrf_token):
748 self, autologin_user, backend_svn, csrf_token):
749 repo_name = backend_svn.repo_name
749 repo_name = backend_svn.repo_name
750 settings = SettingsModel(repo=repo_name)
750 settings = SettingsModel(repo=repo_name)
751 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
751 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
752 settings.create_ui_section_value(section, '', key=key, active=True)
752 settings.create_ui_section_value(section, '', key=key, active=True)
753
753
754 data = self.FORM_DATA.copy()
754 data = self.FORM_DATA.copy()
755 data['csrf_token'] = csrf_token
755 data['csrf_token'] = csrf_token
756 self.app.post(
756 self.app.post(
757 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
757 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
758 try:
758 try:
759 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
759 for section, key in VcsSettingsModel.HOOKS_SETTINGS:
760 ui = settings.get_ui_by_section_and_key(section, key)
760 ui = settings.get_ui_by_section_and_key(section, key)
761 assert ui.ui_active is True
761 assert ui.ui_active is True
762 finally:
762 finally:
763 self._cleanup_repo_settings(settings)
763 self._cleanup_repo_settings(settings)
764
764
765 @pytest.mark.skip_backends('svn')
765 @pytest.mark.skip_backends('svn')
766 def test_pr_settings_are_created(
766 def test_pr_settings_are_created(
767 self, autologin_user, backend, csrf_token):
767 self, autologin_user, backend, csrf_token):
768 repo_name = backend.repo_name
768 repo_name = backend.repo_name
769 data = self.FORM_DATA.copy()
769 data = self.FORM_DATA.copy()
770 data['csrf_token'] = csrf_token
770 data['csrf_token'] = csrf_token
771 self.app.post(
771 self.app.post(
772 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
772 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
773 settings = SettingsModel(repo=repo_name)
773 settings = SettingsModel(repo=repo_name)
774 try:
774 try:
775 for name in VcsSettingsModel.GENERAL_SETTINGS:
775 for name in VcsSettingsModel.GENERAL_SETTINGS:
776 setting = settings.get_setting_by_name(name)
776 setting = settings.get_setting_by_name(name)
777 assert setting.app_settings_value is False
777 assert setting.app_settings_value is False
778 finally:
778 finally:
779 self._cleanup_repo_settings(settings)
779 self._cleanup_repo_settings(settings)
780
780
781 def test_pr_settings_are_not_created_for_svn(
781 def test_pr_settings_are_not_created_for_svn(
782 self, autologin_user, backend_svn, csrf_token):
782 self, autologin_user, backend_svn, csrf_token):
783 repo_name = backend_svn.repo_name
783 repo_name = backend_svn.repo_name
784 data = self.FORM_DATA.copy()
784 data = self.FORM_DATA.copy()
785 data['csrf_token'] = csrf_token
785 data['csrf_token'] = csrf_token
786 self.app.post(
786 self.app.post(
787 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
787 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
788 settings = SettingsModel(repo=repo_name)
788 settings = SettingsModel(repo=repo_name)
789 try:
789 try:
790 for name in VcsSettingsModel.GENERAL_SETTINGS:
790 for name in VcsSettingsModel.GENERAL_SETTINGS:
791 setting = settings.get_setting_by_name(name)
791 setting = settings.get_setting_by_name(name)
792 assert setting is None
792 assert setting is None
793 finally:
793 finally:
794 self._cleanup_repo_settings(settings)
794 self._cleanup_repo_settings(settings)
795
795
796 def test_pr_settings_creation_requires_repo_admin_permission(
796 def test_pr_settings_creation_requires_repo_admin_permission(
797 self, backend, user_util, settings_util, csrf_token):
797 self, backend, user_util, settings_util, csrf_token):
798 repo = backend.create_repo()
798 repo = backend.create_repo()
799 repo_name = repo.repo_name
799 repo_name = repo.repo_name
800 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
801
800
802 logout_user_session(self.app, csrf_token)
801 logout_user_session(self.app, csrf_token)
803 session = login_user_session(
802 session = login_user_session(
804 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
803 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
805 new_csrf_token = auth.get_csrf_token(session)
804 new_csrf_token = auth.get_csrf_token(session)
806
805
806 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
807 repo = Repository.get_by_repo_name(repo_name)
807 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
808 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
808 data = self.FORM_DATA.copy()
809 data = self.FORM_DATA.copy()
809 data['csrf_token'] = new_csrf_token
810 data['csrf_token'] = new_csrf_token
810 settings = SettingsModel(repo=repo_name)
811 settings = SettingsModel(repo=repo_name)
811
812
812 try:
813 try:
813 self.app.post(
814 self.app.post(
814 url('repo_vcs_settings', repo_name=repo_name), data,
815 url('repo_vcs_settings', repo_name=repo_name), data,
815 status=302)
816 status=302)
816 finally:
817 finally:
817 self._cleanup_repo_settings(settings)
818 self._cleanup_repo_settings(settings)
818
819
819 @pytest.mark.skip_backends('svn')
820 @pytest.mark.skip_backends('svn')
820 def test_pr_settings_are_updated(
821 def test_pr_settings_are_updated(
821 self, autologin_user, backend, csrf_token):
822 self, autologin_user, backend, csrf_token):
822 repo_name = backend.repo_name
823 repo_name = backend.repo_name
823 settings = SettingsModel(repo=repo_name)
824 settings = SettingsModel(repo=repo_name)
824 for name in VcsSettingsModel.GENERAL_SETTINGS:
825 for name in VcsSettingsModel.GENERAL_SETTINGS:
825 settings.create_or_update_setting(name, True, 'bool')
826 settings.create_or_update_setting(name, True, 'bool')
826
827
827 data = self.FORM_DATA.copy()
828 data = self.FORM_DATA.copy()
828 data['csrf_token'] = csrf_token
829 data['csrf_token'] = csrf_token
829 self.app.post(
830 self.app.post(
830 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
831 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
831 try:
832 try:
832 for name in VcsSettingsModel.GENERAL_SETTINGS:
833 for name in VcsSettingsModel.GENERAL_SETTINGS:
833 setting = settings.get_setting_by_name(name)
834 setting = settings.get_setting_by_name(name)
834 assert setting.app_settings_value is False
835 assert setting.app_settings_value is False
835 finally:
836 finally:
836 self._cleanup_repo_settings(settings)
837 self._cleanup_repo_settings(settings)
837
838
838 def test_pr_settings_are_not_updated_for_svn(
839 def test_pr_settings_are_not_updated_for_svn(
839 self, autologin_user, backend_svn, csrf_token):
840 self, autologin_user, backend_svn, csrf_token):
840 repo_name = backend_svn.repo_name
841 repo_name = backend_svn.repo_name
841 settings = SettingsModel(repo=repo_name)
842 settings = SettingsModel(repo=repo_name)
842 for name in VcsSettingsModel.GENERAL_SETTINGS:
843 for name in VcsSettingsModel.GENERAL_SETTINGS:
843 settings.create_or_update_setting(name, True, 'bool')
844 settings.create_or_update_setting(name, True, 'bool')
844
845
845 data = self.FORM_DATA.copy()
846 data = self.FORM_DATA.copy()
846 data['csrf_token'] = csrf_token
847 data['csrf_token'] = csrf_token
847 self.app.post(
848 self.app.post(
848 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
849 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
849 try:
850 try:
850 for name in VcsSettingsModel.GENERAL_SETTINGS:
851 for name in VcsSettingsModel.GENERAL_SETTINGS:
851 setting = settings.get_setting_by_name(name)
852 setting = settings.get_setting_by_name(name)
852 assert setting.app_settings_value is True
853 assert setting.app_settings_value is True
853 finally:
854 finally:
854 self._cleanup_repo_settings(settings)
855 self._cleanup_repo_settings(settings)
855
856
856 def test_svn_settings_are_created(
857 def test_svn_settings_are_created(
857 self, autologin_user, backend_svn, csrf_token, settings_util):
858 self, autologin_user, backend_svn, csrf_token, settings_util):
858 repo_name = backend_svn.repo_name
859 repo_name = backend_svn.repo_name
859 data = self.FORM_DATA.copy()
860 data = self.FORM_DATA.copy()
860 data['new_svn_tag'] = 'svn-tag'
861 data['new_svn_tag'] = 'svn-tag'
861 data['new_svn_branch'] = 'svn-branch'
862 data['new_svn_branch'] = 'svn-branch'
862 data['csrf_token'] = csrf_token
863 data['csrf_token'] = csrf_token
863
864
864 # Create few global settings to make sure that uniqueness validators
865 # Create few global settings to make sure that uniqueness validators
865 # are not triggered
866 # are not triggered
866 settings_util.create_rhodecode_ui(
867 settings_util.create_rhodecode_ui(
867 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
868 VcsSettingsModel.SVN_BRANCH_SECTION, 'svn-branch')
868 settings_util.create_rhodecode_ui(
869 settings_util.create_rhodecode_ui(
869 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
870 VcsSettingsModel.SVN_TAG_SECTION, 'svn-tag')
870
871
871 self.app.post(
872 self.app.post(
872 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
873 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
873 settings = SettingsModel(repo=repo_name)
874 settings = SettingsModel(repo=repo_name)
874 try:
875 try:
875 svn_branches = settings.get_ui_by_section(
876 svn_branches = settings.get_ui_by_section(
876 VcsSettingsModel.SVN_BRANCH_SECTION)
877 VcsSettingsModel.SVN_BRANCH_SECTION)
877 svn_branch_names = [b.ui_value for b in svn_branches]
878 svn_branch_names = [b.ui_value for b in svn_branches]
878 svn_tags = settings.get_ui_by_section(
879 svn_tags = settings.get_ui_by_section(
879 VcsSettingsModel.SVN_TAG_SECTION)
880 VcsSettingsModel.SVN_TAG_SECTION)
880 svn_tag_names = [b.ui_value for b in svn_tags]
881 svn_tag_names = [b.ui_value for b in svn_tags]
881 assert 'svn-branch' in svn_branch_names
882 assert 'svn-branch' in svn_branch_names
882 assert 'svn-tag' in svn_tag_names
883 assert 'svn-tag' in svn_tag_names
883 finally:
884 finally:
884 self._cleanup_repo_settings(settings)
885 self._cleanup_repo_settings(settings)
885
886
886 def test_svn_settings_are_unique(
887 def test_svn_settings_are_unique(
887 self, autologin_user, backend_svn, csrf_token, settings_util):
888 self, autologin_user, backend_svn, csrf_token, settings_util):
888 repo = backend_svn.repo
889 repo = backend_svn.repo
889 repo_name = repo.repo_name
890 repo_name = repo.repo_name
890 data = self.FORM_DATA.copy()
891 data = self.FORM_DATA.copy()
891 data['new_svn_tag'] = 'test_tag'
892 data['new_svn_tag'] = 'test_tag'
892 data['new_svn_branch'] = 'test_branch'
893 data['new_svn_branch'] = 'test_branch'
893 data['csrf_token'] = csrf_token
894 data['csrf_token'] = csrf_token
894 settings_util.create_repo_rhodecode_ui(
895 settings_util.create_repo_rhodecode_ui(
895 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
896 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch')
896 settings_util.create_repo_rhodecode_ui(
897 settings_util.create_repo_rhodecode_ui(
897 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
898 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag')
898
899
899 response = self.app.post(
900 response = self.app.post(
900 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
901 url('repo_vcs_settings', repo_name=repo_name), data, status=200)
901 response.mustcontain('Pattern already exists')
902 response.mustcontain('Pattern already exists')
902
903
903 def test_svn_settings_with_empty_values_are_not_created(
904 def test_svn_settings_with_empty_values_are_not_created(
904 self, autologin_user, backend_svn, csrf_token):
905 self, autologin_user, backend_svn, csrf_token):
905 repo_name = backend_svn.repo_name
906 repo_name = backend_svn.repo_name
906 data = self.FORM_DATA.copy()
907 data = self.FORM_DATA.copy()
907 data['csrf_token'] = csrf_token
908 data['csrf_token'] = csrf_token
908 self.app.post(
909 self.app.post(
909 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
910 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
910 settings = SettingsModel(repo=repo_name)
911 settings = SettingsModel(repo=repo_name)
911 try:
912 try:
912 svn_branches = settings.get_ui_by_section(
913 svn_branches = settings.get_ui_by_section(
913 VcsSettingsModel.SVN_BRANCH_SECTION)
914 VcsSettingsModel.SVN_BRANCH_SECTION)
914 svn_tags = settings.get_ui_by_section(
915 svn_tags = settings.get_ui_by_section(
915 VcsSettingsModel.SVN_TAG_SECTION)
916 VcsSettingsModel.SVN_TAG_SECTION)
916 assert len(svn_branches) == 0
917 assert len(svn_branches) == 0
917 assert len(svn_tags) == 0
918 assert len(svn_tags) == 0
918 finally:
919 finally:
919 self._cleanup_repo_settings(settings)
920 self._cleanup_repo_settings(settings)
920
921
921 def test_svn_settings_are_shown_for_svn_repository(
922 def test_svn_settings_are_shown_for_svn_repository(
922 self, autologin_user, backend_svn, csrf_token):
923 self, autologin_user, backend_svn, csrf_token):
923 repo_name = backend_svn.repo_name
924 repo_name = backend_svn.repo_name
924 response = self.app.get(
925 response = self.app.get(
925 url('repo_vcs_settings', repo_name=repo_name), status=200)
926 url('repo_vcs_settings', repo_name=repo_name), status=200)
926 response.mustcontain('Subversion Settings')
927 response.mustcontain('Subversion Settings')
927
928
928 @pytest.mark.skip_backends('svn')
929 @pytest.mark.skip_backends('svn')
929 def test_svn_settings_are_not_created_for_not_svn_repository(
930 def test_svn_settings_are_not_created_for_not_svn_repository(
930 self, autologin_user, backend, csrf_token):
931 self, autologin_user, backend, csrf_token):
931 repo_name = backend.repo_name
932 repo_name = backend.repo_name
932 data = self.FORM_DATA.copy()
933 data = self.FORM_DATA.copy()
933 data['csrf_token'] = csrf_token
934 data['csrf_token'] = csrf_token
934 self.app.post(
935 self.app.post(
935 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
936 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
936 settings = SettingsModel(repo=repo_name)
937 settings = SettingsModel(repo=repo_name)
937 try:
938 try:
938 svn_branches = settings.get_ui_by_section(
939 svn_branches = settings.get_ui_by_section(
939 VcsSettingsModel.SVN_BRANCH_SECTION)
940 VcsSettingsModel.SVN_BRANCH_SECTION)
940 svn_tags = settings.get_ui_by_section(
941 svn_tags = settings.get_ui_by_section(
941 VcsSettingsModel.SVN_TAG_SECTION)
942 VcsSettingsModel.SVN_TAG_SECTION)
942 assert len(svn_branches) == 0
943 assert len(svn_branches) == 0
943 assert len(svn_tags) == 0
944 assert len(svn_tags) == 0
944 finally:
945 finally:
945 self._cleanup_repo_settings(settings)
946 self._cleanup_repo_settings(settings)
946
947
947 @pytest.mark.skip_backends('svn')
948 @pytest.mark.skip_backends('svn')
948 def test_svn_settings_are_shown_only_for_svn_repository(
949 def test_svn_settings_are_shown_only_for_svn_repository(
949 self, autologin_user, backend, csrf_token):
950 self, autologin_user, backend, csrf_token):
950 repo_name = backend.repo_name
951 repo_name = backend.repo_name
951 response = self.app.get(
952 response = self.app.get(
952 url('repo_vcs_settings', repo_name=repo_name), status=200)
953 url('repo_vcs_settings', repo_name=repo_name), status=200)
953 response.mustcontain(no='Subversion Settings')
954 response.mustcontain(no='Subversion Settings')
954
955
955 def test_hg_settings_are_created(
956 def test_hg_settings_are_created(
956 self, autologin_user, backend_hg, csrf_token):
957 self, autologin_user, backend_hg, csrf_token):
957 repo_name = backend_hg.repo_name
958 repo_name = backend_hg.repo_name
958 data = self.FORM_DATA.copy()
959 data = self.FORM_DATA.copy()
959 data['new_svn_tag'] = 'svn-tag'
960 data['new_svn_tag'] = 'svn-tag'
960 data['new_svn_branch'] = 'svn-branch'
961 data['new_svn_branch'] = 'svn-branch'
961 data['csrf_token'] = csrf_token
962 data['csrf_token'] = csrf_token
962 self.app.post(
963 self.app.post(
963 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
964 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
964 settings = SettingsModel(repo=repo_name)
965 settings = SettingsModel(repo=repo_name)
965 try:
966 try:
966 largefiles_ui = settings.get_ui_by_section_and_key(
967 largefiles_ui = settings.get_ui_by_section_and_key(
967 'extensions', 'largefiles')
968 'extensions', 'largefiles')
968 assert largefiles_ui.ui_active is False
969 assert largefiles_ui.ui_active is False
969 phases_ui = settings.get_ui_by_section_and_key(
970 phases_ui = settings.get_ui_by_section_and_key(
970 'phases', 'publish')
971 'phases', 'publish')
971 assert str2bool(phases_ui.ui_value) is False
972 assert str2bool(phases_ui.ui_value) is False
972 finally:
973 finally:
973 self._cleanup_repo_settings(settings)
974 self._cleanup_repo_settings(settings)
974
975
975 def test_hg_settings_are_updated(
976 def test_hg_settings_are_updated(
976 self, autologin_user, backend_hg, csrf_token):
977 self, autologin_user, backend_hg, csrf_token):
977 repo_name = backend_hg.repo_name
978 repo_name = backend_hg.repo_name
978 settings = SettingsModel(repo=repo_name)
979 settings = SettingsModel(repo=repo_name)
979 settings.create_ui_section_value(
980 settings.create_ui_section_value(
980 'extensions', '', key='largefiles', active=True)
981 'extensions', '', key='largefiles', active=True)
981 settings.create_ui_section_value(
982 settings.create_ui_section_value(
982 'phases', '1', key='publish', active=True)
983 'phases', '1', key='publish', active=True)
983
984
984 data = self.FORM_DATA.copy()
985 data = self.FORM_DATA.copy()
985 data['csrf_token'] = csrf_token
986 data['csrf_token'] = csrf_token
986 self.app.post(
987 self.app.post(
987 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
988 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
988 try:
989 try:
989 largefiles_ui = settings.get_ui_by_section_and_key(
990 largefiles_ui = settings.get_ui_by_section_and_key(
990 'extensions', 'largefiles')
991 'extensions', 'largefiles')
991 assert largefiles_ui.ui_active is False
992 assert largefiles_ui.ui_active is False
992 phases_ui = settings.get_ui_by_section_and_key(
993 phases_ui = settings.get_ui_by_section_and_key(
993 'phases', 'publish')
994 'phases', 'publish')
994 assert str2bool(phases_ui.ui_value) is False
995 assert str2bool(phases_ui.ui_value) is False
995 finally:
996 finally:
996 self._cleanup_repo_settings(settings)
997 self._cleanup_repo_settings(settings)
997
998
998 def test_hg_settings_are_shown_for_hg_repository(
999 def test_hg_settings_are_shown_for_hg_repository(
999 self, autologin_user, backend_hg, csrf_token):
1000 self, autologin_user, backend_hg, csrf_token):
1000 repo_name = backend_hg.repo_name
1001 repo_name = backend_hg.repo_name
1001 response = self.app.get(
1002 response = self.app.get(
1002 url('repo_vcs_settings', repo_name=repo_name), status=200)
1003 url('repo_vcs_settings', repo_name=repo_name), status=200)
1003 response.mustcontain('Mercurial Settings')
1004 response.mustcontain('Mercurial Settings')
1004
1005
1005 @pytest.mark.skip_backends('hg')
1006 @pytest.mark.skip_backends('hg')
1006 def test_hg_settings_are_created_only_for_hg_repository(
1007 def test_hg_settings_are_created_only_for_hg_repository(
1007 self, autologin_user, backend, csrf_token):
1008 self, autologin_user, backend, csrf_token):
1008 repo_name = backend.repo_name
1009 repo_name = backend.repo_name
1009 data = self.FORM_DATA.copy()
1010 data = self.FORM_DATA.copy()
1010 data['csrf_token'] = csrf_token
1011 data['csrf_token'] = csrf_token
1011 self.app.post(
1012 self.app.post(
1012 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1013 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1013 settings = SettingsModel(repo=repo_name)
1014 settings = SettingsModel(repo=repo_name)
1014 try:
1015 try:
1015 largefiles_ui = settings.get_ui_by_section_and_key(
1016 largefiles_ui = settings.get_ui_by_section_and_key(
1016 'extensions', 'largefiles')
1017 'extensions', 'largefiles')
1017 assert largefiles_ui is None
1018 assert largefiles_ui is None
1018 phases_ui = settings.get_ui_by_section_and_key(
1019 phases_ui = settings.get_ui_by_section_and_key(
1019 'phases', 'publish')
1020 'phases', 'publish')
1020 assert phases_ui is None
1021 assert phases_ui is None
1021 finally:
1022 finally:
1022 self._cleanup_repo_settings(settings)
1023 self._cleanup_repo_settings(settings)
1023
1024
1024 @pytest.mark.skip_backends('hg')
1025 @pytest.mark.skip_backends('hg')
1025 def test_hg_settings_are_shown_only_for_hg_repository(
1026 def test_hg_settings_are_shown_only_for_hg_repository(
1026 self, autologin_user, backend, csrf_token):
1027 self, autologin_user, backend, csrf_token):
1027 repo_name = backend.repo_name
1028 repo_name = backend.repo_name
1028 response = self.app.get(
1029 response = self.app.get(
1029 url('repo_vcs_settings', repo_name=repo_name), status=200)
1030 url('repo_vcs_settings', repo_name=repo_name), status=200)
1030 response.mustcontain(no='Mercurial Settings')
1031 response.mustcontain(no='Mercurial Settings')
1031
1032
1032 @pytest.mark.skip_backends('hg')
1033 @pytest.mark.skip_backends('hg')
1033 def test_hg_settings_are_updated_only_for_hg_repository(
1034 def test_hg_settings_are_updated_only_for_hg_repository(
1034 self, autologin_user, backend, csrf_token):
1035 self, autologin_user, backend, csrf_token):
1035 repo_name = backend.repo_name
1036 repo_name = backend.repo_name
1036 settings = SettingsModel(repo=repo_name)
1037 settings = SettingsModel(repo=repo_name)
1037 settings.create_ui_section_value(
1038 settings.create_ui_section_value(
1038 'extensions', '', key='largefiles', active=True)
1039 'extensions', '', key='largefiles', active=True)
1039 settings.create_ui_section_value(
1040 settings.create_ui_section_value(
1040 'phases', '1', key='publish', active=True)
1041 'phases', '1', key='publish', active=True)
1041
1042
1042 data = self.FORM_DATA.copy()
1043 data = self.FORM_DATA.copy()
1043 data['csrf_token'] = csrf_token
1044 data['csrf_token'] = csrf_token
1044 self.app.post(
1045 self.app.post(
1045 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1046 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1046 try:
1047 try:
1047 largefiles_ui = settings.get_ui_by_section_and_key(
1048 largefiles_ui = settings.get_ui_by_section_and_key(
1048 'extensions', 'largefiles')
1049 'extensions', 'largefiles')
1049 assert largefiles_ui.ui_active is True
1050 assert largefiles_ui.ui_active is True
1050 phases_ui = settings.get_ui_by_section_and_key(
1051 phases_ui = settings.get_ui_by_section_and_key(
1051 'phases', 'publish')
1052 'phases', 'publish')
1052 assert phases_ui.ui_value == '1'
1053 assert phases_ui.ui_value == '1'
1053 finally:
1054 finally:
1054 self._cleanup_repo_settings(settings)
1055 self._cleanup_repo_settings(settings)
1055
1056
1056 def test_per_repo_svn_settings_are_displayed(
1057 def test_per_repo_svn_settings_are_displayed(
1057 self, autologin_user, backend_svn, settings_util):
1058 self, autologin_user, backend_svn, settings_util):
1058 repo = backend_svn.create_repo()
1059 repo = backend_svn.create_repo()
1059 repo_name = repo.repo_name
1060 repo_name = repo.repo_name
1060 branches = [
1061 branches = [
1061 settings_util.create_repo_rhodecode_ui(
1062 settings_util.create_repo_rhodecode_ui(
1062 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1063 repo, VcsSettingsModel.SVN_BRANCH_SECTION,
1063 'branch_{}'.format(i))
1064 'branch_{}'.format(i))
1064 for i in range(10)]
1065 for i in range(10)]
1065 tags = [
1066 tags = [
1066 settings_util.create_repo_rhodecode_ui(
1067 settings_util.create_repo_rhodecode_ui(
1067 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1068 repo, VcsSettingsModel.SVN_TAG_SECTION, 'tag_{}'.format(i))
1068 for i in range(10)]
1069 for i in range(10)]
1069
1070
1070 response = self.app.get(
1071 response = self.app.get(
1071 url('repo_vcs_settings', repo_name=repo_name), status=200)
1072 url('repo_vcs_settings', repo_name=repo_name), status=200)
1072 assert_response = AssertResponse(response)
1073 assert_response = AssertResponse(response)
1073 for branch in branches:
1074 for branch in branches:
1074 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1075 css_selector = '[name=branch_value_{}]'.format(branch.ui_id)
1075 element = assert_response.get_element(css_selector)
1076 element = assert_response.get_element(css_selector)
1076 assert element.value == branch.ui_value
1077 assert element.value == branch.ui_value
1077 for tag in tags:
1078 for tag in tags:
1078 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1079 css_selector = '[name=tag_ui_value_new_{}]'.format(tag.ui_id)
1079 element = assert_response.get_element(css_selector)
1080 element = assert_response.get_element(css_selector)
1080 assert element.value == tag.ui_value
1081 assert element.value == tag.ui_value
1081
1082
1082 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1083 def test_per_repo_hg_and_pr_settings_are_not_displayed_for_svn(
1083 self, autologin_user, backend_svn, settings_util):
1084 self, autologin_user, backend_svn, settings_util):
1084 repo = backend_svn.create_repo()
1085 repo = backend_svn.create_repo()
1085 repo_name = repo.repo_name
1086 repo_name = repo.repo_name
1086 response = self.app.get(
1087 response = self.app.get(
1087 url('repo_vcs_settings', repo_name=repo_name), status=200)
1088 url('repo_vcs_settings', repo_name=repo_name), status=200)
1088 response.mustcontain(no='<label>Hooks:</label>')
1089 response.mustcontain(no='<label>Hooks:</label>')
1089 response.mustcontain(no='<label>Pull Request Settings:</label>')
1090 response.mustcontain(no='<label>Pull Request Settings:</label>')
1090
1091
1091 def test_inherit_global_settings_value_is_saved(
1092 def test_inherit_global_settings_value_is_saved(
1092 self, autologin_user, backend, csrf_token):
1093 self, autologin_user, backend, csrf_token):
1093 repo_name = backend.repo_name
1094 repo_name = backend.repo_name
1094 data = self.FORM_DATA.copy()
1095 data = self.FORM_DATA.copy()
1095 data['csrf_token'] = csrf_token
1096 data['csrf_token'] = csrf_token
1096 data['inherit_global_settings'] = True
1097 data['inherit_global_settings'] = True
1097 self.app.post(
1098 self.app.post(
1098 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1099 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1099
1100
1100 settings = SettingsModel(repo=repo_name)
1101 settings = SettingsModel(repo=repo_name)
1101 vcs_settings = VcsSettingsModel(repo=repo_name)
1102 vcs_settings = VcsSettingsModel(repo=repo_name)
1102 try:
1103 try:
1103 assert vcs_settings.inherit_global_settings is True
1104 assert vcs_settings.inherit_global_settings is True
1104 finally:
1105 finally:
1105 self._cleanup_repo_settings(settings)
1106 self._cleanup_repo_settings(settings)
1106
1107
1107 def test_repo_cache_is_invalidated_when_settings_are_updated(
1108 def test_repo_cache_is_invalidated_when_settings_are_updated(
1108 self, autologin_user, backend, csrf_token):
1109 self, autologin_user, backend, csrf_token):
1109 repo_name = backend.repo_name
1110 repo_name = backend.repo_name
1110 data = self.FORM_DATA.copy()
1111 data = self.FORM_DATA.copy()
1111 data['csrf_token'] = csrf_token
1112 data['csrf_token'] = csrf_token
1112 data['inherit_global_settings'] = True
1113 data['inherit_global_settings'] = True
1113 settings = SettingsModel(repo=repo_name)
1114 settings = SettingsModel(repo=repo_name)
1114
1115
1115 invalidation_patcher = mock.patch(
1116 invalidation_patcher = mock.patch(
1116 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1117 'rhodecode.controllers.admin.repos.ScmModel.mark_for_invalidation')
1117 with invalidation_patcher as invalidation_mock:
1118 with invalidation_patcher as invalidation_mock:
1118 self.app.post(
1119 self.app.post(
1119 url('repo_vcs_settings', repo_name=repo_name), data,
1120 url('repo_vcs_settings', repo_name=repo_name), data,
1120 status=302)
1121 status=302)
1121 try:
1122 try:
1122 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1123 invalidation_mock.assert_called_once_with(repo_name, delete=True)
1123 finally:
1124 finally:
1124 self._cleanup_repo_settings(settings)
1125 self._cleanup_repo_settings(settings)
1125
1126
1126 def test_other_settings_not_saved_inherit_global_settings_is_true(
1127 def test_other_settings_not_saved_inherit_global_settings_is_true(
1127 self, autologin_user, backend, csrf_token):
1128 self, autologin_user, backend, csrf_token):
1128 repo_name = backend.repo_name
1129 repo_name = backend.repo_name
1129 data = self.FORM_DATA.copy()
1130 data = self.FORM_DATA.copy()
1130 data['csrf_token'] = csrf_token
1131 data['csrf_token'] = csrf_token
1131 data['inherit_global_settings'] = True
1132 data['inherit_global_settings'] = True
1132 self.app.post(
1133 self.app.post(
1133 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1134 url('repo_vcs_settings', repo_name=repo_name), data, status=302)
1134
1135
1135 settings = SettingsModel(repo=repo_name)
1136 settings = SettingsModel(repo=repo_name)
1136 ui_settings = (
1137 ui_settings = (
1137 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1138 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1138
1139
1139 vcs_settings = []
1140 vcs_settings = []
1140 try:
1141 try:
1141 for section, key in ui_settings:
1142 for section, key in ui_settings:
1142 ui = settings.get_ui_by_section_and_key(section, key)
1143 ui = settings.get_ui_by_section_and_key(section, key)
1143 if ui:
1144 if ui:
1144 vcs_settings.append(ui)
1145 vcs_settings.append(ui)
1145 vcs_settings.extend(settings.get_ui_by_section(
1146 vcs_settings.extend(settings.get_ui_by_section(
1146 VcsSettingsModel.SVN_BRANCH_SECTION))
1147 VcsSettingsModel.SVN_BRANCH_SECTION))
1147 vcs_settings.extend(settings.get_ui_by_section(
1148 vcs_settings.extend(settings.get_ui_by_section(
1148 VcsSettingsModel.SVN_TAG_SECTION))
1149 VcsSettingsModel.SVN_TAG_SECTION))
1149 for name in VcsSettingsModel.GENERAL_SETTINGS:
1150 for name in VcsSettingsModel.GENERAL_SETTINGS:
1150 setting = settings.get_setting_by_name(name)
1151 setting = settings.get_setting_by_name(name)
1151 if setting:
1152 if setting:
1152 vcs_settings.append(setting)
1153 vcs_settings.append(setting)
1153 assert vcs_settings == []
1154 assert vcs_settings == []
1154 finally:
1155 finally:
1155 self._cleanup_repo_settings(settings)
1156 self._cleanup_repo_settings(settings)
1156
1157
1157 def test_delete_svn_branch_and_tag_patterns(
1158 def test_delete_svn_branch_and_tag_patterns(
1158 self, autologin_user, backend_svn, settings_util, csrf_token):
1159 self, autologin_user, backend_svn, settings_util, csrf_token):
1159 repo = backend_svn.create_repo()
1160 repo = backend_svn.create_repo()
1160 repo_name = repo.repo_name
1161 repo_name = repo.repo_name
1161 branch = settings_util.create_repo_rhodecode_ui(
1162 branch = settings_util.create_repo_rhodecode_ui(
1162 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1163 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1163 cleanup=False)
1164 cleanup=False)
1164 tag = settings_util.create_repo_rhodecode_ui(
1165 tag = settings_util.create_repo_rhodecode_ui(
1165 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1166 repo, VcsSettingsModel.SVN_TAG_SECTION, 'test_tag', cleanup=False)
1166 data = {
1167 data = {
1167 '_method': 'delete',
1168 '_method': 'delete',
1168 'csrf_token': csrf_token
1169 'csrf_token': csrf_token
1169 }
1170 }
1170 for id_ in (branch.ui_id, tag.ui_id):
1171 for id_ in (branch.ui_id, tag.ui_id):
1171 data['delete_svn_pattern'] = id_,
1172 data['delete_svn_pattern'] = id_,
1172 self.app.post(
1173 self.app.post(
1173 url('repo_vcs_settings', repo_name=repo_name), data,
1174 url('repo_vcs_settings', repo_name=repo_name), data,
1174 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1175 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1175 settings = VcsSettingsModel(repo=repo_name)
1176 settings = VcsSettingsModel(repo=repo_name)
1176 assert settings.get_repo_svn_branch_patterns() == []
1177 assert settings.get_repo_svn_branch_patterns() == []
1177
1178
1178 def test_delete_svn_branch_requires_repo_admin_permission(
1179 def test_delete_svn_branch_requires_repo_admin_permission(
1179 self, backend_svn, user_util, settings_util, csrf_token):
1180 self, backend_svn, user_util, settings_util, csrf_token):
1180 repo = backend_svn.create_repo()
1181 repo = backend_svn.create_repo()
1181 repo_name = repo.repo_name
1182 repo_name = repo.repo_name
1182 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1183
1183 logout_user_session(self.app, csrf_token)
1184 logout_user_session(self.app, csrf_token)
1184 session = login_user_session(
1185 session = login_user_session(
1185 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1186 self.app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
1186 csrf_token = auth.get_csrf_token(session)
1187 csrf_token = auth.get_csrf_token(session)
1188
1189 repo = Repository.get_by_repo_name(repo_name)
1190 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1187 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1191 user_util.grant_user_permission_to_repo(repo, user, 'repository.admin')
1188 branch = settings_util.create_repo_rhodecode_ui(
1192 branch = settings_util.create_repo_rhodecode_ui(
1189 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1193 repo, VcsSettingsModel.SVN_BRANCH_SECTION, 'test_branch',
1190 cleanup=False)
1194 cleanup=False)
1191 data = {
1195 data = {
1192 '_method': 'delete',
1196 '_method': 'delete',
1193 'csrf_token': csrf_token,
1197 'csrf_token': csrf_token,
1194 'delete_svn_pattern': branch.ui_id
1198 'delete_svn_pattern': branch.ui_id
1195 }
1199 }
1196 self.app.post(
1200 self.app.post(
1197 url('repo_vcs_settings', repo_name=repo_name), data,
1201 url('repo_vcs_settings', repo_name=repo_name), data,
1198 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1202 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=200)
1199
1203
1200 def test_delete_svn_branch_raises_400_when_not_found(
1204 def test_delete_svn_branch_raises_400_when_not_found(
1201 self, autologin_user, backend_svn, settings_util, csrf_token):
1205 self, autologin_user, backend_svn, settings_util, csrf_token):
1202 repo_name = backend_svn.repo_name
1206 repo_name = backend_svn.repo_name
1203 data = {
1207 data = {
1204 '_method': 'delete',
1208 '_method': 'delete',
1205 'delete_svn_pattern': 123,
1209 'delete_svn_pattern': 123,
1206 'csrf_token': csrf_token
1210 'csrf_token': csrf_token
1207 }
1211 }
1208 self.app.post(
1212 self.app.post(
1209 url('repo_vcs_settings', repo_name=repo_name), data,
1213 url('repo_vcs_settings', repo_name=repo_name), data,
1210 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1214 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1211
1215
1212 def test_delete_svn_branch_raises_400_when_no_id_specified(
1216 def test_delete_svn_branch_raises_400_when_no_id_specified(
1213 self, autologin_user, backend_svn, settings_util, csrf_token):
1217 self, autologin_user, backend_svn, settings_util, csrf_token):
1214 repo_name = backend_svn.repo_name
1218 repo_name = backend_svn.repo_name
1215 data = {
1219 data = {
1216 '_method': 'delete',
1220 '_method': 'delete',
1217 'csrf_token': csrf_token
1221 'csrf_token': csrf_token
1218 }
1222 }
1219 self.app.post(
1223 self.app.post(
1220 url('repo_vcs_settings', repo_name=repo_name), data,
1224 url('repo_vcs_settings', repo_name=repo_name), data,
1221 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1225 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }, status=400)
1222
1226
1223 def _cleanup_repo_settings(self, settings_model):
1227 def _cleanup_repo_settings(self, settings_model):
1224 cleanup = []
1228 cleanup = []
1225 ui_settings = (
1229 ui_settings = (
1226 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1230 VcsSettingsModel.HOOKS_SETTINGS + VcsSettingsModel.HG_SETTINGS)
1227
1231
1228 for section, key in ui_settings:
1232 for section, key in ui_settings:
1229 ui = settings_model.get_ui_by_section_and_key(section, key)
1233 ui = settings_model.get_ui_by_section_and_key(section, key)
1230 if ui:
1234 if ui:
1231 cleanup.append(ui)
1235 cleanup.append(ui)
1232
1236
1233 cleanup.extend(settings_model.get_ui_by_section(
1237 cleanup.extend(settings_model.get_ui_by_section(
1234 VcsSettingsModel.INHERIT_SETTINGS))
1238 VcsSettingsModel.INHERIT_SETTINGS))
1235 cleanup.extend(settings_model.get_ui_by_section(
1239 cleanup.extend(settings_model.get_ui_by_section(
1236 VcsSettingsModel.SVN_BRANCH_SECTION))
1240 VcsSettingsModel.SVN_BRANCH_SECTION))
1237 cleanup.extend(settings_model.get_ui_by_section(
1241 cleanup.extend(settings_model.get_ui_by_section(
1238 VcsSettingsModel.SVN_TAG_SECTION))
1242 VcsSettingsModel.SVN_TAG_SECTION))
1239
1243
1240 for name in VcsSettingsModel.GENERAL_SETTINGS:
1244 for name in VcsSettingsModel.GENERAL_SETTINGS:
1241 setting = settings_model.get_setting_by_name(name)
1245 setting = settings_model.get_setting_by_name(name)
1242 if setting:
1246 if setting:
1243 cleanup.append(setting)
1247 cleanup.append(setting)
1244
1248
1245 for object_ in cleanup:
1249 for object_ in cleanup:
1246 Session().delete(object_)
1250 Session().delete(object_)
1247 Session().commit()
1251 Session().commit()
1248
1252
1249 def assert_repo_value_equals_global_value(self, response, setting):
1253 def assert_repo_value_equals_global_value(self, response, setting):
1250 assert_response = AssertResponse(response)
1254 assert_response = AssertResponse(response)
1251 global_css_selector = '[name={}_inherited]'.format(setting)
1255 global_css_selector = '[name={}_inherited]'.format(setting)
1252 repo_css_selector = '[name={}]'.format(setting)
1256 repo_css_selector = '[name={}]'.format(setting)
1253 repo_element = assert_response.get_element(repo_css_selector)
1257 repo_element = assert_response.get_element(repo_css_selector)
1254 global_element = assert_response.get_element(global_css_selector)
1258 global_element = assert_response.get_element(global_css_selector)
1255 assert repo_element.value == global_element.value
1259 assert repo_element.value == global_element.value
1256
1260
1257
1261
1258 def _get_permission_for_user(user, repo):
1262 def _get_permission_for_user(user, repo):
1259 perm = UserRepoToPerm.query()\
1263 perm = UserRepoToPerm.query()\
1260 .filter(UserRepoToPerm.repository ==
1264 .filter(UserRepoToPerm.repository ==
1261 Repository.get_by_repo_name(repo))\
1265 Repository.get_by_repo_name(repo))\
1262 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1266 .filter(UserRepoToPerm.user == User.get_by_username(user))\
1263 .all()
1267 .all()
1264 return perm
1268 return perm
General Comments 0
You need to be logged in to leave comments. Login now