##// END OF EJS Templates
tests: fixed mock test for utils2
marcink -
r274:fe04cba5 default
parent child Browse files
Show More
@@ -1,466 +1,467 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import json
22 22 import multiprocessing
23 23 import os
24 24
25 25 import mock
26 26 import py
27 27 import pytest
28 28
29 29 from rhodecode.lib import caching_query
30 30 from rhodecode.lib import utils
31 31 from rhodecode.lib.utils2 import md5
32 from rhodecode.model import settings
32 33 from rhodecode.model import db
33 34 from rhodecode.model import meta
34 35 from rhodecode.model.repo import RepoModel
35 36 from rhodecode.model.repo_group import RepoGroupModel
36 37 from rhodecode.model.scm import ScmModel
37 38 from rhodecode.model.settings import UiSetting, SettingsModel
38 39 from rhodecode.tests.fixture import Fixture
39 40 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
40 41
41 42
42 43 fixture = Fixture()
43 44
44 45
45 46 def extract_hooks(config):
46 47 """Return a dictionary with the hook entries of the given config."""
47 48 hooks = {}
48 49 config_items = config.serialize()
49 50 for section, name, value in config_items:
50 51 if section != 'hooks':
51 52 continue
52 53 hooks[name] = value
53 54
54 55 return hooks
55 56
56 57
57 58 def disable_hooks(request, hooks):
58 59 """Disables the given hooks from the UI settings."""
59 60 session = meta.Session()
60 61
61 62 model = SettingsModel()
62 63 for hook_key in hooks:
63 64 sett = model.get_ui_by_key(hook_key)
64 65 sett.ui_active = False
65 66 session.add(sett)
66 67
67 68 # Invalidate cache
68 69 ui_settings = session.query(db.RhodeCodeUi).options(
69 70 caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings'))
70 71 ui_settings.invalidate()
71 72
72 73 ui_settings = session.query(db.RhodeCodeUi).options(
73 74 caching_query.FromCache(
74 75 'sql_cache_short', 'get_hook_settings', 'get_hook_settings'))
75 76 ui_settings.invalidate()
76 77
77 78 @request.addfinalizer
78 79 def rollback():
79 80 session.rollback()
80 81
81 82
82 83 HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH
83 84 HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH
84 85 HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL
85 86 HOOK_PULL = db.RhodeCodeUi.HOOK_PULL
86 87 HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE
87 88
88 89 HG_HOOKS = frozenset(
89 90 (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE))
90 91
91 92
92 93 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
93 94 ([], HG_HOOKS),
94 95 ([HOOK_PRE_PUSH, HOOK_REPO_SIZE], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]),
95 96 (HG_HOOKS, []),
96 97 # When a pull/push hook is disabled, its pre-pull/push counterpart should
97 98 # be disabled too.
98 99 ([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]),
99 100 ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PUSH, HOOK_REPO_SIZE]),
100 101 ])
101 102 def test_make_db_config_hg_hooks(pylonsapp, request, disabled_hooks,
102 103 expected_hooks):
103 104 disable_hooks(request, disabled_hooks)
104 105
105 106 config = utils.make_db_config()
106 107 hooks = extract_hooks(config)
107 108
108 109 assert set(hooks.iterkeys()).intersection(HG_HOOKS) == set(expected_hooks)
109 110
110 111
111 112 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
112 113 ([], ['pull', 'push']),
113 114 ([HOOK_PUSH], ['pull']),
114 115 ([HOOK_PULL], ['push']),
115 116 ([HOOK_PULL, HOOK_PUSH], []),
116 117 ])
117 118 def test_get_enabled_hook_classes(disabled_hooks, expected_hooks):
118 119 hook_keys = (HOOK_PUSH, HOOK_PULL)
119 120 ui_settings = [
120 121 ('hooks', key, 'some value', key not in disabled_hooks)
121 122 for key in hook_keys]
122 123
123 124 result = utils.get_enabled_hook_classes(ui_settings)
124 125 assert sorted(result) == expected_hooks
125 126
126 127
127 128 def test_get_filesystem_repos_finds_repos(tmpdir, pylonsapp):
128 129 _stub_git_repo(tmpdir.ensure('repo', dir=True))
129 130 repos = list(utils.get_filesystem_repos(str(tmpdir)))
130 131 assert repos == [('repo', ('git', tmpdir.join('repo')))]
131 132
132 133
133 134 def test_get_filesystem_repos_skips_directories(tmpdir, pylonsapp):
134 135 tmpdir.ensure('not-a-repo', dir=True)
135 136 repos = list(utils.get_filesystem_repos(str(tmpdir)))
136 137 assert repos == []
137 138
138 139
139 140 def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, pylonsapp):
140 141 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
141 142 repos = list(utils.get_filesystem_repos(str(tmpdir)))
142 143 assert repos == []
143 144
144 145
145 146 def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, pylonsapp):
146 147 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
147 148 repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True))
148 149 assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))]
149 150
150 151
151 152 def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir):
152 153 _stub_git_repo(tmpdir.ensure('.repo', dir=True))
153 154 repos = list(utils.get_filesystem_repos(str(tmpdir)))
154 155 assert repos == []
155 156
156 157
157 158 def test_get_filesystem_repos_skips_files(tmpdir):
158 159 tmpdir.ensure('test-file')
159 160 repos = list(utils.get_filesystem_repos(str(tmpdir)))
160 161 assert repos == []
161 162
162 163
163 164 def test_get_filesystem_repos_skips_removed_repositories(tmpdir):
164 165 removed_repo_name = 'rm__00000000_000000_000000__.stub'
165 166 assert utils.REMOVED_REPO_PAT.match(removed_repo_name)
166 167 _stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True))
167 168 repos = list(utils.get_filesystem_repos(str(tmpdir)))
168 169 assert repos == []
169 170
170 171
171 172 def _stub_git_repo(repo_path):
172 173 """
173 174 Make `repo_path` look like a Git repository.
174 175 """
175 176 repo_path.ensure('.git', dir=True)
176 177
177 178
178 179 @pytest.mark.parametrize('str_class', [str, unicode], ids=['str', 'unicode'])
179 180 def test_get_dirpaths_returns_all_paths(tmpdir, str_class):
180 181 tmpdir.ensure('test-file')
181 182 dirpaths = utils._get_dirpaths(str_class(tmpdir))
182 183 assert dirpaths == ['test-file']
183 184
184 185
185 186 def test_get_dirpaths_returns_all_paths_bytes(
186 187 tmpdir, platform_encodes_filenames):
187 188 if platform_encodes_filenames:
188 189 pytest.skip("This platform seems to encode filenames.")
189 190 tmpdir.ensure('repo-a-umlaut-\xe4')
190 191 dirpaths = utils._get_dirpaths(str(tmpdir))
191 192 assert dirpaths == ['repo-a-umlaut-\xe4']
192 193
193 194
194 195 def test_get_dirpaths_skips_paths_it_cannot_decode(
195 196 tmpdir, platform_encodes_filenames):
196 197 if platform_encodes_filenames:
197 198 pytest.skip("This platform seems to encode filenames.")
198 199 path_with_latin1 = 'repo-a-umlaut-\xe4'
199 200 tmpdir.ensure(path_with_latin1)
200 201 dirpaths = utils._get_dirpaths(unicode(tmpdir))
201 202 assert dirpaths == []
202 203
203 204
204 205 @pytest.fixture(scope='session')
205 206 def platform_encodes_filenames():
206 207 """
207 208 Boolean indicator if the current platform changes filename encodings.
208 209 """
209 210 path_with_latin1 = 'repo-a-umlaut-\xe4'
210 211 tmpdir = py.path.local.mkdtemp()
211 212 tmpdir.ensure(path_with_latin1)
212 213 read_path = tmpdir.listdir()[0].basename
213 214 tmpdir.remove()
214 215 return path_with_latin1 != read_path
215 216
216 217
217 218 def test_action_logger_action_size(pylonsapp, test_repo):
218 219 action = 'x' * 1200001
219 220 utils.action_logger(TEST_USER_ADMIN_LOGIN, action, test_repo, commit=True)
220 221
221 222
222 223 @pytest.fixture
223 224 def repo_groups(request):
224 225 session = meta.Session()
225 226 zombie_group = fixture.create_repo_group('zombie')
226 227 parent_group = fixture.create_repo_group('parent')
227 228 child_group = fixture.create_repo_group('parent/child')
228 229 groups_in_db = session.query(db.RepoGroup).all()
229 230 assert len(groups_in_db) == 3
230 231 assert child_group.group_parent_id == parent_group.group_id
231 232
232 233 @request.addfinalizer
233 234 def cleanup():
234 235 fixture.destroy_repo_group(zombie_group)
235 236 fixture.destroy_repo_group(child_group)
236 237 fixture.destroy_repo_group(parent_group)
237 238
238 239 return (zombie_group, parent_group, child_group)
239 240
240 241
241 242 def test_repo2db_mapper_groups(repo_groups):
242 243 session = meta.Session()
243 244 zombie_group, parent_group, child_group = repo_groups
244 245 zombie_path = os.path.join(
245 246 RepoGroupModel().repos_path, zombie_group.full_path)
246 247 os.rmdir(zombie_path)
247 248
248 249 # Avoid removing test repos when calling repo2db_mapper
249 250 repo_list = {
250 251 repo.repo_name: 'test' for repo in session.query(db.Repository).all()
251 252 }
252 253 utils.repo2db_mapper(repo_list, remove_obsolete=True)
253 254
254 255 groups_in_db = session.query(db.RepoGroup).all()
255 256 assert child_group in groups_in_db
256 257 assert parent_group in groups_in_db
257 258 assert zombie_path not in groups_in_db
258 259
259 260
260 261 def test_repo2db_mapper_enables_largefiles(backend):
261 262 repo = backend.create_repo()
262 263 repo_list = {repo.repo_name: 'test'}
263 264 with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm_mock:
264 265 with mock.patch.multiple('rhodecode.model.scm.ScmModel',
265 266 install_git_hook=mock.DEFAULT,
266 267 install_svn_hooks=mock.DEFAULT):
267 268 utils.repo2db_mapper(repo_list, remove_obsolete=False)
268 269 _, kwargs = scm_mock.call_args
269 270 assert kwargs['config'].get('extensions', 'largefiles') == ''
270 271
271 272
272 273 @pytest.mark.backends("git", "svn")
273 274 def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend):
274 275 repo = backend.create_repo()
275 276 repo_list = {repo.repo_name: 'test'}
276 277 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
277 278 utils.repo2db_mapper(repo_list, remove_obsolete=False)
278 279 install_hooks_mock.assert_called_once_with(
279 280 repo.scm_instance(), repo_type=backend.alias)
280 281
281 282
282 283 @pytest.mark.backends("git", "svn")
283 284 def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend):
284 285 repo = backend.create_repo()
285 286 RepoModel().delete(repo, fs_remove=False)
286 287 meta.Session().commit()
287 288 repo_list = {repo.repo_name: repo.scm_instance()}
288 289 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
289 290 utils.repo2db_mapper(repo_list, remove_obsolete=False)
290 291 assert install_hooks_mock.call_count == 1
291 292 install_hooks_args, _ = install_hooks_mock.call_args
292 293 assert install_hooks_args[0].name == repo.repo_name
293 294
294 295
295 296 class TestPasswordChanged(object):
296 297 def setup(self):
297 298 self.session = {
298 299 'rhodecode_user': {
299 300 'password': '0cc175b9c0f1b6a831c399e269772661'
300 301 }
301 302 }
302 303 self.auth_user = mock.Mock()
303 304 self.auth_user.userame = 'test'
304 305 self.auth_user.password = 'abc123'
305 306
306 307 def test_returns_false_for_default_user(self):
307 308 self.auth_user.username = db.User.DEFAULT_USER
308 309 result = utils.password_changed(self.auth_user, self.session)
309 310 assert result is False
310 311
311 312 def test_returns_false_if_password_was_not_changed(self):
312 313 self.session['rhodecode_user']['password'] = md5(
313 314 self.auth_user.password)
314 315 result = utils.password_changed(self.auth_user, self.session)
315 316 assert result is False
316 317
317 318 def test_returns_true_if_password_was_changed(self):
318 319 result = utils.password_changed(self.auth_user, self.session)
319 320 assert result is True
320 321
321 322 def test_returns_true_if_auth_user_password_is_empty(self):
322 323 self.auth_user.password = None
323 324 result = utils.password_changed(self.auth_user, self.session)
324 325 assert result is True
325 326
326 327 def test_returns_true_if_session_password_is_empty(self):
327 328 self.session['rhodecode_user'].pop('password')
328 329 result = utils.password_changed(self.auth_user, self.session)
329 330 assert result is True
330 331
331 332
332 333 class TestReadOpensourceLicenses(object):
333 334 def test_success(self):
334 335 utils._license_cache = None
335 336 json_data = '''
336 337 {
337 338 "python2.7-pytest-2.7.1": {"UNKNOWN": null},
338 339 "python2.7-Markdown-2.6.2": {
339 340 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
340 341 }
341 342 }
342 343 '''
343 344 resource_string_patch = mock.patch.object(
344 345 utils.pkg_resources, 'resource_string', return_value=json_data)
345 346 with resource_string_patch:
346 347 result = utils.read_opensource_licenses()
347 348 assert result == json.loads(json_data)
348 349
349 350 def test_caching(self):
350 351 utils._license_cache = {
351 352 "python2.7-pytest-2.7.1": {
352 353 "UNKNOWN": None
353 354 },
354 355 "python2.7-Markdown-2.6.2": {
355 356 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
356 357 }
357 358 }
358 359 resource_patch = mock.patch.object(
359 360 utils.pkg_resources, 'resource_string', side_effect=Exception)
360 361 json_patch = mock.patch.object(
361 362 utils.json, 'loads', side_effect=Exception)
362 363
363 364 with resource_patch as resource_mock, json_patch as json_mock:
364 365 result = utils.read_opensource_licenses()
365 366
366 367 assert resource_mock.call_count == 0
367 368 assert json_mock.call_count == 0
368 369 assert result == utils._license_cache
369 370
370 371 def test_licenses_file_contains_no_unknown_licenses(self):
371 372 utils._license_cache = None
372 373 result = utils.read_opensource_licenses()
373 374 license_names = []
374 375 for licenses in result.values():
375 376 license_names.extend(licenses.keys())
376 377 assert 'UNKNOWN' not in license_names
377 378
378 379
379 380 class TestMakeDbConfig(object):
380 381 def test_data_from_config_data_from_db_returned(self):
381 382 test_data = [
382 383 ('section1', 'option1', 'value1'),
383 384 ('section2', 'option2', 'value2'),
384 385 ('section3', 'option3', 'value3'),
385 386 ]
386 387 with mock.patch.object(utils, 'config_data_from_db') as config_mock:
387 388 config_mock.return_value = test_data
388 389 kwargs = {'clear_session': False, 'repo': 'test_repo'}
389 390 result = utils.make_db_config(**kwargs)
390 391 config_mock.assert_called_once_with(**kwargs)
391 392 for section, option, expected_value in test_data:
392 393 value = result.get(section, option)
393 394 assert value == expected_value
394 395
395 396
396 397 class TestConfigDataFromDb(object):
397 398 def test_config_data_from_db_returns_active_settings(self):
398 399 test_data = [
399 400 UiSetting('section1', 'option1', 'value1', True),
400 401 UiSetting('section2', 'option2', 'value2', True),
401 402 UiSetting('section3', 'option3', 'value3', False),
402 403 ]
403 404 repo_name = 'test_repo'
404 405
405 model_patch = mock.patch.object(utils, 'VcsSettingsModel')
406 model_patch = mock.patch.object(settings, 'VcsSettingsModel')
406 407 hooks_patch = mock.patch.object(
407 408 utils, 'get_enabled_hook_classes',
408 409 return_value=['pull', 'push', 'repo_size'])
409 410 with model_patch as model_mock, hooks_patch:
410 411 instance_mock = mock.Mock()
411 412 model_mock.return_value = instance_mock
412 413 instance_mock.get_ui_settings.return_value = test_data
413 414 result = utils.config_data_from_db(
414 415 clear_session=False, repo=repo_name)
415 416
416 417 self._assert_repo_name_passed(model_mock, repo_name)
417 418
418 419 expected_result = [
419 420 ('section1', 'option1', 'value1'),
420 421 ('section2', 'option2', 'value2'),
421 422 ]
422 423 assert result == expected_result
423 424
424 425 def _assert_repo_name_passed(self, model_mock, repo_name):
425 426 assert model_mock.call_count == 1
426 427 call_args, call_kwargs = model_mock.call_args
427 428 assert call_kwargs['repo'] == repo_name
428 429
429 430
430 431 class TestIsDirWritable(object):
431 432 def test_returns_false_when_not_writable(self):
432 433 with mock.patch('__builtin__.open', side_effect=OSError):
433 434 assert not utils._is_dir_writable('/stub-path')
434 435
435 436 def test_returns_true_when_writable(self, tmpdir):
436 437 assert utils._is_dir_writable(str(tmpdir))
437 438
438 439 def test_is_safe_against_race_conditions(self, tmpdir):
439 440 workers = multiprocessing.Pool()
440 441 directories = [str(tmpdir)] * 10
441 442 workers.map(utils._is_dir_writable, directories)
442 443
443 444
444 445 class TestGetEnabledHooks(object):
445 446 def test_only_active_hooks_are_enabled(self):
446 447 ui_settings = [
447 448 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
448 449 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
449 450 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False)
450 451 ]
451 452 result = utils.get_enabled_hook_classes(ui_settings)
452 453 assert result == ['push', 'repo_size']
453 454
454 455 def test_all_hooks_are_enabled(self):
455 456 ui_settings = [
456 457 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
457 458 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
458 459 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True)
459 460 ]
460 461 result = utils.get_enabled_hook_classes(ui_settings)
461 462 assert result == ['push', 'repo_size', 'pull']
462 463
463 464 def test_no_enabled_hooks_when_no_hook_settings_are_found(self):
464 465 ui_settings = []
465 466 result = utils.get_enabled_hook_classes(ui_settings)
466 467 assert result == []
General Comments 0
You need to be logged in to leave comments. Login now