##// END OF EJS Templates
tests: fixed mock test for utils2
marcink -
r274:fe04cba5 default
parent child Browse files
Show More
@@ -1,466 +1,467 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import json
21 import json
22 import multiprocessing
22 import multiprocessing
23 import os
23 import os
24
24
25 import mock
25 import mock
26 import py
26 import py
27 import pytest
27 import pytest
28
28
29 from rhodecode.lib import caching_query
29 from rhodecode.lib import caching_query
30 from rhodecode.lib import utils
30 from rhodecode.lib import utils
31 from rhodecode.lib.utils2 import md5
31 from rhodecode.lib.utils2 import md5
32 from rhodecode.model import settings
32 from rhodecode.model import db
33 from rhodecode.model import db
33 from rhodecode.model import meta
34 from rhodecode.model import meta
34 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.repo_group import RepoGroupModel
36 from rhodecode.model.repo_group import RepoGroupModel
36 from rhodecode.model.scm import ScmModel
37 from rhodecode.model.scm import ScmModel
37 from rhodecode.model.settings import UiSetting, SettingsModel
38 from rhodecode.model.settings import UiSetting, SettingsModel
38 from rhodecode.tests.fixture import Fixture
39 from rhodecode.tests.fixture import Fixture
39 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
40 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
40
41
41
42
42 fixture = Fixture()
43 fixture = Fixture()
43
44
44
45
45 def extract_hooks(config):
46 def extract_hooks(config):
46 """Return a dictionary with the hook entries of the given config."""
47 """Return a dictionary with the hook entries of the given config."""
47 hooks = {}
48 hooks = {}
48 config_items = config.serialize()
49 config_items = config.serialize()
49 for section, name, value in config_items:
50 for section, name, value in config_items:
50 if section != 'hooks':
51 if section != 'hooks':
51 continue
52 continue
52 hooks[name] = value
53 hooks[name] = value
53
54
54 return hooks
55 return hooks
55
56
56
57
57 def disable_hooks(request, hooks):
58 def disable_hooks(request, hooks):
58 """Disables the given hooks from the UI settings."""
59 """Disables the given hooks from the UI settings."""
59 session = meta.Session()
60 session = meta.Session()
60
61
61 model = SettingsModel()
62 model = SettingsModel()
62 for hook_key in hooks:
63 for hook_key in hooks:
63 sett = model.get_ui_by_key(hook_key)
64 sett = model.get_ui_by_key(hook_key)
64 sett.ui_active = False
65 sett.ui_active = False
65 session.add(sett)
66 session.add(sett)
66
67
67 # Invalidate cache
68 # Invalidate cache
68 ui_settings = session.query(db.RhodeCodeUi).options(
69 ui_settings = session.query(db.RhodeCodeUi).options(
69 caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings'))
70 caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings'))
70 ui_settings.invalidate()
71 ui_settings.invalidate()
71
72
72 ui_settings = session.query(db.RhodeCodeUi).options(
73 ui_settings = session.query(db.RhodeCodeUi).options(
73 caching_query.FromCache(
74 caching_query.FromCache(
74 'sql_cache_short', 'get_hook_settings', 'get_hook_settings'))
75 'sql_cache_short', 'get_hook_settings', 'get_hook_settings'))
75 ui_settings.invalidate()
76 ui_settings.invalidate()
76
77
77 @request.addfinalizer
78 @request.addfinalizer
78 def rollback():
79 def rollback():
79 session.rollback()
80 session.rollback()
80
81
81
82
82 HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH
83 HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH
83 HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH
84 HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH
84 HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL
85 HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL
85 HOOK_PULL = db.RhodeCodeUi.HOOK_PULL
86 HOOK_PULL = db.RhodeCodeUi.HOOK_PULL
86 HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE
87 HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE
87
88
88 HG_HOOKS = frozenset(
89 HG_HOOKS = frozenset(
89 (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE))
90 (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE))
90
91
91
92
92 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
93 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
93 ([], HG_HOOKS),
94 ([], HG_HOOKS),
94 ([HOOK_PRE_PUSH, HOOK_REPO_SIZE], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]),
95 ([HOOK_PRE_PUSH, HOOK_REPO_SIZE], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]),
95 (HG_HOOKS, []),
96 (HG_HOOKS, []),
96 # When a pull/push hook is disabled, its pre-pull/push counterpart should
97 # When a pull/push hook is disabled, its pre-pull/push counterpart should
97 # be disabled too.
98 # be disabled too.
98 ([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]),
99 ([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]),
99 ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE]),
100 ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE]),
100 ])
101 ])
101 def test_make_db_config_hg_hooks(pylonsapp, request, disabled_hooks,
102 def test_make_db_config_hg_hooks(pylonsapp, request, disabled_hooks,
102 expected_hooks):
103 expected_hooks):
103 disable_hooks(request, disabled_hooks)
104 disable_hooks(request, disabled_hooks)
104
105
105 config = utils.make_db_config()
106 config = utils.make_db_config()
106 hooks = extract_hooks(config)
107 hooks = extract_hooks(config)
107
108
108 assert set(hooks.iterkeys()).intersection(HG_HOOKS) == set(expected_hooks)
109 assert set(hooks.iterkeys()).intersection(HG_HOOKS) == set(expected_hooks)
109
110
110
111
111 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
112 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
112 ([], ['pull', 'push']),
113 ([], ['pull', 'push']),
113 ([HOOK_PUSH], ['pull']),
114 ([HOOK_PUSH], ['pull']),
114 ([HOOK_PULL], ['push']),
115 ([HOOK_PULL], ['push']),
115 ([HOOK_PULL, HOOK_PUSH], []),
116 ([HOOK_PULL, HOOK_PUSH], []),
116 ])
117 ])
117 def test_get_enabled_hook_classes(disabled_hooks, expected_hooks):
118 def test_get_enabled_hook_classes(disabled_hooks, expected_hooks):
118 hook_keys = (HOOK_PUSH, HOOK_PULL)
119 hook_keys = (HOOK_PUSH, HOOK_PULL)
119 ui_settings = [
120 ui_settings = [
120 ('hooks', key, 'some value', key not in disabled_hooks)
121 ('hooks', key, 'some value', key not in disabled_hooks)
121 for key in hook_keys]
122 for key in hook_keys]
122
123
123 result = utils.get_enabled_hook_classes(ui_settings)
124 result = utils.get_enabled_hook_classes(ui_settings)
124 assert sorted(result) == expected_hooks
125 assert sorted(result) == expected_hooks
125
126
126
127
127 def test_get_filesystem_repos_finds_repos(tmpdir, pylonsapp):
128 def test_get_filesystem_repos_finds_repos(tmpdir, pylonsapp):
128 _stub_git_repo(tmpdir.ensure('repo', dir=True))
129 _stub_git_repo(tmpdir.ensure('repo', dir=True))
129 repos = list(utils.get_filesystem_repos(str(tmpdir)))
130 repos = list(utils.get_filesystem_repos(str(tmpdir)))
130 assert repos == [('repo', ('git', tmpdir.join('repo')))]
131 assert repos == [('repo', ('git', tmpdir.join('repo')))]
131
132
132
133
133 def test_get_filesystem_repos_skips_directories(tmpdir, pylonsapp):
134 def test_get_filesystem_repos_skips_directories(tmpdir, pylonsapp):
134 tmpdir.ensure('not-a-repo', dir=True)
135 tmpdir.ensure('not-a-repo', dir=True)
135 repos = list(utils.get_filesystem_repos(str(tmpdir)))
136 repos = list(utils.get_filesystem_repos(str(tmpdir)))
136 assert repos == []
137 assert repos == []
137
138
138
139
139 def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, pylonsapp):
140 def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, pylonsapp):
140 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
141 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
141 repos = list(utils.get_filesystem_repos(str(tmpdir)))
142 repos = list(utils.get_filesystem_repos(str(tmpdir)))
142 assert repos == []
143 assert repos == []
143
144
144
145
145 def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, pylonsapp):
146 def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, pylonsapp):
146 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
147 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
147 repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True))
148 repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True))
148 assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))]
149 assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))]
149
150
150
151
151 def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir):
152 def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir):
152 _stub_git_repo(tmpdir.ensure('.repo', dir=True))
153 _stub_git_repo(tmpdir.ensure('.repo', dir=True))
153 repos = list(utils.get_filesystem_repos(str(tmpdir)))
154 repos = list(utils.get_filesystem_repos(str(tmpdir)))
154 assert repos == []
155 assert repos == []
155
156
156
157
157 def test_get_filesystem_repos_skips_files(tmpdir):
158 def test_get_filesystem_repos_skips_files(tmpdir):
158 tmpdir.ensure('test-file')
159 tmpdir.ensure('test-file')
159 repos = list(utils.get_filesystem_repos(str(tmpdir)))
160 repos = list(utils.get_filesystem_repos(str(tmpdir)))
160 assert repos == []
161 assert repos == []
161
162
162
163
163 def test_get_filesystem_repos_skips_removed_repositories(tmpdir):
164 def test_get_filesystem_repos_skips_removed_repositories(tmpdir):
164 removed_repo_name = 'rm__00000000_000000_000000__.stub'
165 removed_repo_name = 'rm__00000000_000000_000000__.stub'
165 assert utils.REMOVED_REPO_PAT.match(removed_repo_name)
166 assert utils.REMOVED_REPO_PAT.match(removed_repo_name)
166 _stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True))
167 _stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True))
167 repos = list(utils.get_filesystem_repos(str(tmpdir)))
168 repos = list(utils.get_filesystem_repos(str(tmpdir)))
168 assert repos == []
169 assert repos == []
169
170
170
171
171 def _stub_git_repo(repo_path):
172 def _stub_git_repo(repo_path):
172 """
173 """
173 Make `repo_path` look like a Git repository.
174 Make `repo_path` look like a Git repository.
174 """
175 """
175 repo_path.ensure('.git', dir=True)
176 repo_path.ensure('.git', dir=True)
176
177
177
178
178 @pytest.mark.parametrize('str_class', [str, unicode], ids=['str', 'unicode'])
179 @pytest.mark.parametrize('str_class', [str, unicode], ids=['str', 'unicode'])
179 def test_get_dirpaths_returns_all_paths(tmpdir, str_class):
180 def test_get_dirpaths_returns_all_paths(tmpdir, str_class):
180 tmpdir.ensure('test-file')
181 tmpdir.ensure('test-file')
181 dirpaths = utils._get_dirpaths(str_class(tmpdir))
182 dirpaths = utils._get_dirpaths(str_class(tmpdir))
182 assert dirpaths == ['test-file']
183 assert dirpaths == ['test-file']
183
184
184
185
185 def test_get_dirpaths_returns_all_paths_bytes(
186 def test_get_dirpaths_returns_all_paths_bytes(
186 tmpdir, platform_encodes_filenames):
187 tmpdir, platform_encodes_filenames):
187 if platform_encodes_filenames:
188 if platform_encodes_filenames:
188 pytest.skip("This platform seems to encode filenames.")
189 pytest.skip("This platform seems to encode filenames.")
189 tmpdir.ensure('repo-a-umlaut-\xe4')
190 tmpdir.ensure('repo-a-umlaut-\xe4')
190 dirpaths = utils._get_dirpaths(str(tmpdir))
191 dirpaths = utils._get_dirpaths(str(tmpdir))
191 assert dirpaths == ['repo-a-umlaut-\xe4']
192 assert dirpaths == ['repo-a-umlaut-\xe4']
192
193
193
194
194 def test_get_dirpaths_skips_paths_it_cannot_decode(
195 def test_get_dirpaths_skips_paths_it_cannot_decode(
195 tmpdir, platform_encodes_filenames):
196 tmpdir, platform_encodes_filenames):
196 if platform_encodes_filenames:
197 if platform_encodes_filenames:
197 pytest.skip("This platform seems to encode filenames.")
198 pytest.skip("This platform seems to encode filenames.")
198 path_with_latin1 = 'repo-a-umlaut-\xe4'
199 path_with_latin1 = 'repo-a-umlaut-\xe4'
199 tmpdir.ensure(path_with_latin1)
200 tmpdir.ensure(path_with_latin1)
200 dirpaths = utils._get_dirpaths(unicode(tmpdir))
201 dirpaths = utils._get_dirpaths(unicode(tmpdir))
201 assert dirpaths == []
202 assert dirpaths == []
202
203
203
204
204 @pytest.fixture(scope='session')
205 @pytest.fixture(scope='session')
205 def platform_encodes_filenames():
206 def platform_encodes_filenames():
206 """
207 """
207 Boolean indicator if the current platform changes filename encodings.
208 Boolean indicator if the current platform changes filename encodings.
208 """
209 """
209 path_with_latin1 = 'repo-a-umlaut-\xe4'
210 path_with_latin1 = 'repo-a-umlaut-\xe4'
210 tmpdir = py.path.local.mkdtemp()
211 tmpdir = py.path.local.mkdtemp()
211 tmpdir.ensure(path_with_latin1)
212 tmpdir.ensure(path_with_latin1)
212 read_path = tmpdir.listdir()[0].basename
213 read_path = tmpdir.listdir()[0].basename
213 tmpdir.remove()
214 tmpdir.remove()
214 return path_with_latin1 != read_path
215 return path_with_latin1 != read_path
215
216
216
217
217 def test_action_logger_action_size(pylonsapp, test_repo):
218 def test_action_logger_action_size(pylonsapp, test_repo):
218 action = 'x' * 1200001
219 action = 'x' * 1200001
219 utils.action_logger(TEST_USER_ADMIN_LOGIN, action, test_repo, commit=True)
220 utils.action_logger(TEST_USER_ADMIN_LOGIN, action, test_repo, commit=True)
220
221
221
222
222 @pytest.fixture
223 @pytest.fixture
223 def repo_groups(request):
224 def repo_groups(request):
224 session = meta.Session()
225 session = meta.Session()
225 zombie_group = fixture.create_repo_group('zombie')
226 zombie_group = fixture.create_repo_group('zombie')
226 parent_group = fixture.create_repo_group('parent')
227 parent_group = fixture.create_repo_group('parent')
227 child_group = fixture.create_repo_group('parent/child')
228 child_group = fixture.create_repo_group('parent/child')
228 groups_in_db = session.query(db.RepoGroup).all()
229 groups_in_db = session.query(db.RepoGroup).all()
229 assert len(groups_in_db) == 3
230 assert len(groups_in_db) == 3
230 assert child_group.group_parent_id == parent_group.group_id
231 assert child_group.group_parent_id == parent_group.group_id
231
232
232 @request.addfinalizer
233 @request.addfinalizer
233 def cleanup():
234 def cleanup():
234 fixture.destroy_repo_group(zombie_group)
235 fixture.destroy_repo_group(zombie_group)
235 fixture.destroy_repo_group(child_group)
236 fixture.destroy_repo_group(child_group)
236 fixture.destroy_repo_group(parent_group)
237 fixture.destroy_repo_group(parent_group)
237
238
238 return (zombie_group, parent_group, child_group)
239 return (zombie_group, parent_group, child_group)
239
240
240
241
241 def test_repo2db_mapper_groups(repo_groups):
242 def test_repo2db_mapper_groups(repo_groups):
242 session = meta.Session()
243 session = meta.Session()
243 zombie_group, parent_group, child_group = repo_groups
244 zombie_group, parent_group, child_group = repo_groups
244 zombie_path = os.path.join(
245 zombie_path = os.path.join(
245 RepoGroupModel().repos_path, zombie_group.full_path)
246 RepoGroupModel().repos_path, zombie_group.full_path)
246 os.rmdir(zombie_path)
247 os.rmdir(zombie_path)
247
248
248 # Avoid removing test repos when calling repo2db_mapper
249 # Avoid removing test repos when calling repo2db_mapper
249 repo_list = {
250 repo_list = {
250 repo.repo_name: 'test' for repo in session.query(db.Repository).all()
251 repo.repo_name: 'test' for repo in session.query(db.Repository).all()
251 }
252 }
252 utils.repo2db_mapper(repo_list, remove_obsolete=True)
253 utils.repo2db_mapper(repo_list, remove_obsolete=True)
253
254
254 groups_in_db = session.query(db.RepoGroup).all()
255 groups_in_db = session.query(db.RepoGroup).all()
255 assert child_group in groups_in_db
256 assert child_group in groups_in_db
256 assert parent_group in groups_in_db
257 assert parent_group in groups_in_db
257 assert zombie_path not in groups_in_db
258 assert zombie_path not in groups_in_db
258
259
259
260
260 def test_repo2db_mapper_enables_largefiles(backend):
261 def test_repo2db_mapper_enables_largefiles(backend):
261 repo = backend.create_repo()
262 repo = backend.create_repo()
262 repo_list = {repo.repo_name: 'test'}
263 repo_list = {repo.repo_name: 'test'}
263 with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm_mock:
264 with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm_mock:
264 with mock.patch.multiple('rhodecode.model.scm.ScmModel',
265 with mock.patch.multiple('rhodecode.model.scm.ScmModel',
265 install_git_hook=mock.DEFAULT,
266 install_git_hook=mock.DEFAULT,
266 install_svn_hooks=mock.DEFAULT):
267 install_svn_hooks=mock.DEFAULT):
267 utils.repo2db_mapper(repo_list, remove_obsolete=False)
268 utils.repo2db_mapper(repo_list, remove_obsolete=False)
268 _, kwargs = scm_mock.call_args
269 _, kwargs = scm_mock.call_args
269 assert kwargs['config'].get('extensions', 'largefiles') == ''
270 assert kwargs['config'].get('extensions', 'largefiles') == ''
270
271
271
272
272 @pytest.mark.backends("git", "svn")
273 @pytest.mark.backends("git", "svn")
273 def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend):
274 def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend):
274 repo = backend.create_repo()
275 repo = backend.create_repo()
275 repo_list = {repo.repo_name: 'test'}
276 repo_list = {repo.repo_name: 'test'}
276 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
277 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
277 utils.repo2db_mapper(repo_list, remove_obsolete=False)
278 utils.repo2db_mapper(repo_list, remove_obsolete=False)
278 install_hooks_mock.assert_called_once_with(
279 install_hooks_mock.assert_called_once_with(
279 repo.scm_instance(), repo_type=backend.alias)
280 repo.scm_instance(), repo_type=backend.alias)
280
281
281
282
282 @pytest.mark.backends("git", "svn")
283 @pytest.mark.backends("git", "svn")
283 def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend):
284 def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend):
284 repo = backend.create_repo()
285 repo = backend.create_repo()
285 RepoModel().delete(repo, fs_remove=False)
286 RepoModel().delete(repo, fs_remove=False)
286 meta.Session().commit()
287 meta.Session().commit()
287 repo_list = {repo.repo_name: repo.scm_instance()}
288 repo_list = {repo.repo_name: repo.scm_instance()}
288 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
289 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
289 utils.repo2db_mapper(repo_list, remove_obsolete=False)
290 utils.repo2db_mapper(repo_list, remove_obsolete=False)
290 assert install_hooks_mock.call_count == 1
291 assert install_hooks_mock.call_count == 1
291 install_hooks_args, _ = install_hooks_mock.call_args
292 install_hooks_args, _ = install_hooks_mock.call_args
292 assert install_hooks_args[0].name == repo.repo_name
293 assert install_hooks_args[0].name == repo.repo_name
293
294
294
295
295 class TestPasswordChanged(object):
296 class TestPasswordChanged(object):
296 def setup(self):
297 def setup(self):
297 self.session = {
298 self.session = {
298 'rhodecode_user': {
299 'rhodecode_user': {
299 'password': '0cc175b9c0f1b6a831c399e269772661'
300 'password': '0cc175b9c0f1b6a831c399e269772661'
300 }
301 }
301 }
302 }
302 self.auth_user = mock.Mock()
303 self.auth_user = mock.Mock()
303 self.auth_user.userame = 'test'
304 self.auth_user.userame = 'test'
304 self.auth_user.password = 'abc123'
305 self.auth_user.password = 'abc123'
305
306
306 def test_returns_false_for_default_user(self):
307 def test_returns_false_for_default_user(self):
307 self.auth_user.username = db.User.DEFAULT_USER
308 self.auth_user.username = db.User.DEFAULT_USER
308 result = utils.password_changed(self.auth_user, self.session)
309 result = utils.password_changed(self.auth_user, self.session)
309 assert result is False
310 assert result is False
310
311
311 def test_returns_false_if_password_was_not_changed(self):
312 def test_returns_false_if_password_was_not_changed(self):
312 self.session['rhodecode_user']['password'] = md5(
313 self.session['rhodecode_user']['password'] = md5(
313 self.auth_user.password)
314 self.auth_user.password)
314 result = utils.password_changed(self.auth_user, self.session)
315 result = utils.password_changed(self.auth_user, self.session)
315 assert result is False
316 assert result is False
316
317
317 def test_returns_true_if_password_was_changed(self):
318 def test_returns_true_if_password_was_changed(self):
318 result = utils.password_changed(self.auth_user, self.session)
319 result = utils.password_changed(self.auth_user, self.session)
319 assert result is True
320 assert result is True
320
321
321 def test_returns_true_if_auth_user_password_is_empty(self):
322 def test_returns_true_if_auth_user_password_is_empty(self):
322 self.auth_user.password = None
323 self.auth_user.password = None
323 result = utils.password_changed(self.auth_user, self.session)
324 result = utils.password_changed(self.auth_user, self.session)
324 assert result is True
325 assert result is True
325
326
326 def test_returns_true_if_session_password_is_empty(self):
327 def test_returns_true_if_session_password_is_empty(self):
327 self.session['rhodecode_user'].pop('password')
328 self.session['rhodecode_user'].pop('password')
328 result = utils.password_changed(self.auth_user, self.session)
329 result = utils.password_changed(self.auth_user, self.session)
329 assert result is True
330 assert result is True
330
331
331
332
332 class TestReadOpensourceLicenses(object):
333 class TestReadOpensourceLicenses(object):
333 def test_success(self):
334 def test_success(self):
334 utils._license_cache = None
335 utils._license_cache = None
335 json_data = '''
336 json_data = '''
336 {
337 {
337 "python2.7-pytest-2.7.1": {"UNKNOWN": null},
338 "python2.7-pytest-2.7.1": {"UNKNOWN": null},
338 "python2.7-Markdown-2.6.2": {
339 "python2.7-Markdown-2.6.2": {
339 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
340 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
340 }
341 }
341 }
342 }
342 '''
343 '''
343 resource_string_patch = mock.patch.object(
344 resource_string_patch = mock.patch.object(
344 utils.pkg_resources, 'resource_string', return_value=json_data)
345 utils.pkg_resources, 'resource_string', return_value=json_data)
345 with resource_string_patch:
346 with resource_string_patch:
346 result = utils.read_opensource_licenses()
347 result = utils.read_opensource_licenses()
347 assert result == json.loads(json_data)
348 assert result == json.loads(json_data)
348
349
349 def test_caching(self):
350 def test_caching(self):
350 utils._license_cache = {
351 utils._license_cache = {
351 "python2.7-pytest-2.7.1": {
352 "python2.7-pytest-2.7.1": {
352 "UNKNOWN": None
353 "UNKNOWN": None
353 },
354 },
354 "python2.7-Markdown-2.6.2": {
355 "python2.7-Markdown-2.6.2": {
355 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
356 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
356 }
357 }
357 }
358 }
358 resource_patch = mock.patch.object(
359 resource_patch = mock.patch.object(
359 utils.pkg_resources, 'resource_string', side_effect=Exception)
360 utils.pkg_resources, 'resource_string', side_effect=Exception)
360 json_patch = mock.patch.object(
361 json_patch = mock.patch.object(
361 utils.json, 'loads', side_effect=Exception)
362 utils.json, 'loads', side_effect=Exception)
362
363
363 with resource_patch as resource_mock, json_patch as json_mock:
364 with resource_patch as resource_mock, json_patch as json_mock:
364 result = utils.read_opensource_licenses()
365 result = utils.read_opensource_licenses()
365
366
366 assert resource_mock.call_count == 0
367 assert resource_mock.call_count == 0
367 assert json_mock.call_count == 0
368 assert json_mock.call_count == 0
368 assert result == utils._license_cache
369 assert result == utils._license_cache
369
370
370 def test_licenses_file_contains_no_unknown_licenses(self):
371 def test_licenses_file_contains_no_unknown_licenses(self):
371 utils._license_cache = None
372 utils._license_cache = None
372 result = utils.read_opensource_licenses()
373 result = utils.read_opensource_licenses()
373 license_names = []
374 license_names = []
374 for licenses in result.values():
375 for licenses in result.values():
375 license_names.extend(licenses.keys())
376 license_names.extend(licenses.keys())
376 assert 'UNKNOWN' not in license_names
377 assert 'UNKNOWN' not in license_names
377
378
378
379
379 class TestMakeDbConfig(object):
380 class TestMakeDbConfig(object):
380 def test_data_from_config_data_from_db_returned(self):
381 def test_data_from_config_data_from_db_returned(self):
381 test_data = [
382 test_data = [
382 ('section1', 'option1', 'value1'),
383 ('section1', 'option1', 'value1'),
383 ('section2', 'option2', 'value2'),
384 ('section2', 'option2', 'value2'),
384 ('section3', 'option3', 'value3'),
385 ('section3', 'option3', 'value3'),
385 ]
386 ]
386 with mock.patch.object(utils, 'config_data_from_db') as config_mock:
387 with mock.patch.object(utils, 'config_data_from_db') as config_mock:
387 config_mock.return_value = test_data
388 config_mock.return_value = test_data
388 kwargs = {'clear_session': False, 'repo': 'test_repo'}
389 kwargs = {'clear_session': False, 'repo': 'test_repo'}
389 result = utils.make_db_config(**kwargs)
390 result = utils.make_db_config(**kwargs)
390 config_mock.assert_called_once_with(**kwargs)
391 config_mock.assert_called_once_with(**kwargs)
391 for section, option, expected_value in test_data:
392 for section, option, expected_value in test_data:
392 value = result.get(section, option)
393 value = result.get(section, option)
393 assert value == expected_value
394 assert value == expected_value
394
395
395
396
396 class TestConfigDataFromDb(object):
397 class TestConfigDataFromDb(object):
397 def test_config_data_from_db_returns_active_settings(self):
398 def test_config_data_from_db_returns_active_settings(self):
398 test_data = [
399 test_data = [
399 UiSetting('section1', 'option1', 'value1', True),
400 UiSetting('section1', 'option1', 'value1', True),
400 UiSetting('section2', 'option2', 'value2', True),
401 UiSetting('section2', 'option2', 'value2', True),
401 UiSetting('section3', 'option3', 'value3', False),
402 UiSetting('section3', 'option3', 'value3', False),
402 ]
403 ]
403 repo_name = 'test_repo'
404 repo_name = 'test_repo'
404
405
405 model_patch = mock.patch.object(utils, 'VcsSettingsModel')
406 model_patch = mock.patch.object(settings, 'VcsSettingsModel')
406 hooks_patch = mock.patch.object(
407 hooks_patch = mock.patch.object(
407 utils, 'get_enabled_hook_classes',
408 utils, 'get_enabled_hook_classes',
408 return_value=['pull', 'push', 'repo_size'])
409 return_value=['pull', 'push', 'repo_size'])
409 with model_patch as model_mock, hooks_patch:
410 with model_patch as model_mock, hooks_patch:
410 instance_mock = mock.Mock()
411 instance_mock = mock.Mock()
411 model_mock.return_value = instance_mock
412 model_mock.return_value = instance_mock
412 instance_mock.get_ui_settings.return_value = test_data
413 instance_mock.get_ui_settings.return_value = test_data
413 result = utils.config_data_from_db(
414 result = utils.config_data_from_db(
414 clear_session=False, repo=repo_name)
415 clear_session=False, repo=repo_name)
415
416
416 self._assert_repo_name_passed(model_mock, repo_name)
417 self._assert_repo_name_passed(model_mock, repo_name)
417
418
418 expected_result = [
419 expected_result = [
419 ('section1', 'option1', 'value1'),
420 ('section1', 'option1', 'value1'),
420 ('section2', 'option2', 'value2'),
421 ('section2', 'option2', 'value2'),
421 ]
422 ]
422 assert result == expected_result
423 assert result == expected_result
423
424
424 def _assert_repo_name_passed(self, model_mock, repo_name):
425 def _assert_repo_name_passed(self, model_mock, repo_name):
425 assert model_mock.call_count == 1
426 assert model_mock.call_count == 1
426 call_args, call_kwargs = model_mock.call_args
427 call_args, call_kwargs = model_mock.call_args
427 assert call_kwargs['repo'] == repo_name
428 assert call_kwargs['repo'] == repo_name
428
429
429
430
430 class TestIsDirWritable(object):
431 class TestIsDirWritable(object):
431 def test_returns_false_when_not_writable(self):
432 def test_returns_false_when_not_writable(self):
432 with mock.patch('__builtin__.open', side_effect=OSError):
433 with mock.patch('__builtin__.open', side_effect=OSError):
433 assert not utils._is_dir_writable('/stub-path')
434 assert not utils._is_dir_writable('/stub-path')
434
435
435 def test_returns_true_when_writable(self, tmpdir):
436 def test_returns_true_when_writable(self, tmpdir):
436 assert utils._is_dir_writable(str(tmpdir))
437 assert utils._is_dir_writable(str(tmpdir))
437
438
438 def test_is_safe_against_race_conditions(self, tmpdir):
439 def test_is_safe_against_race_conditions(self, tmpdir):
439 workers = multiprocessing.Pool()
440 workers = multiprocessing.Pool()
440 directories = [str(tmpdir)] * 10
441 directories = [str(tmpdir)] * 10
441 workers.map(utils._is_dir_writable, directories)
442 workers.map(utils._is_dir_writable, directories)
442
443
443
444
444 class TestGetEnabledHooks(object):
445 class TestGetEnabledHooks(object):
445 def test_only_active_hooks_are_enabled(self):
446 def test_only_active_hooks_are_enabled(self):
446 ui_settings = [
447 ui_settings = [
447 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
448 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
448 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
449 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
449 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False)
450 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False)
450 ]
451 ]
451 result = utils.get_enabled_hook_classes(ui_settings)
452 result = utils.get_enabled_hook_classes(ui_settings)
452 assert result == ['push', 'repo_size']
453 assert result == ['push', 'repo_size']
453
454
454 def test_all_hooks_are_enabled(self):
455 def test_all_hooks_are_enabled(self):
455 ui_settings = [
456 ui_settings = [
456 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
457 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
457 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
458 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
458 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True)
459 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True)
459 ]
460 ]
460 result = utils.get_enabled_hook_classes(ui_settings)
461 result = utils.get_enabled_hook_classes(ui_settings)
461 assert result == ['push', 'repo_size', 'pull']
462 assert result == ['push', 'repo_size', 'pull']
462
463
463 def test_no_enabled_hooks_when_no_hook_settings_are_found(self):
464 def test_no_enabled_hooks_when_no_hook_settings_are_found(self):
464 ui_settings = []
465 ui_settings = []
465 result = utils.get_enabled_hook_classes(ui_settings)
466 result = utils.get_enabled_hook_classes(ui_settings)
466 assert result == []
467 assert result == []
General Comments 0
You need to be logged in to leave comments. Login now