##// END OF EJS Templates
tests: removed soon-to-be deleted test for deprecated action_logger.
marcink -
r1797:2007cb9b default
parent child Browse files
Show More
@@ -1,473 +1,468 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import json
22 22 import multiprocessing
23 23 import os
24 24
25 25 import mock
26 26 import py
27 27 import pytest
28 28
29 29 from rhodecode.lib import caching_query
30 30 from rhodecode.lib import utils
31 31 from rhodecode.lib.utils2 import md5
32 32 from rhodecode.model import settings
33 33 from rhodecode.model import db
34 34 from rhodecode.model import meta
35 35 from rhodecode.model.repo import RepoModel
36 36 from rhodecode.model.repo_group import RepoGroupModel
37 37 from rhodecode.model.scm import ScmModel
38 38 from rhodecode.model.settings import UiSetting, SettingsModel
39 39 from rhodecode.tests.fixture import Fixture
40 40 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
41 41
42 42
43 43 fixture = Fixture()
44 44
45 45
46 46 def extract_hooks(config):
47 47 """Return a dictionary with the hook entries of the given config."""
48 48 hooks = {}
49 49 config_items = config.serialize()
50 50 for section, name, value in config_items:
51 51 if section != 'hooks':
52 52 continue
53 53 hooks[name] = value
54 54
55 55 return hooks
56 56
57 57
58 58 def disable_hooks(request, hooks):
59 59 """Disables the given hooks from the UI settings."""
60 60 session = meta.Session()
61 61
62 62 model = SettingsModel()
63 63 for hook_key in hooks:
64 64 sett = model.get_ui_by_key(hook_key)
65 65 sett.ui_active = False
66 66 session.add(sett)
67 67
68 68 # Invalidate cache
69 69 ui_settings = session.query(db.RhodeCodeUi).options(
70 70 caching_query.FromCache('sql_cache_short', 'get_hg_ui_settings'))
71 71 ui_settings.invalidate()
72 72
73 73 ui_settings = session.query(db.RhodeCodeUi).options(
74 74 caching_query.FromCache(
75 75 'sql_cache_short', 'get_hook_settings', 'get_hook_settings'))
76 76 ui_settings.invalidate()
77 77
78 78 @request.addfinalizer
79 79 def rollback():
80 80 session.rollback()
81 81
82 82
83 83 HOOK_PRE_PUSH = db.RhodeCodeUi.HOOK_PRE_PUSH
84 84 HOOK_PRETX_PUSH = db.RhodeCodeUi.HOOK_PRETX_PUSH
85 85 HOOK_PUSH = db.RhodeCodeUi.HOOK_PUSH
86 86 HOOK_PRE_PULL = db.RhodeCodeUi.HOOK_PRE_PULL
87 87 HOOK_PULL = db.RhodeCodeUi.HOOK_PULL
88 88 HOOK_REPO_SIZE = db.RhodeCodeUi.HOOK_REPO_SIZE
89 89 HOOK_PUSH_KEY = db.RhodeCodeUi.HOOK_PUSH_KEY
90 90
91 91 HG_HOOKS = frozenset(
92 92 (HOOK_PRE_PULL, HOOK_PULL, HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH,
93 93 HOOK_REPO_SIZE, HOOK_PUSH_KEY))
94 94
95 95
96 96 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
97 97 ([], HG_HOOKS),
98 98 (HG_HOOKS, []),
99 99
100 100 ([HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_REPO_SIZE, HOOK_PUSH_KEY], [HOOK_PRE_PULL, HOOK_PULL, HOOK_PUSH]),
101 101
102 102 # When a pull/push hook is disabled, its pre-pull/push counterpart should
103 103 # be disabled too.
104 104 ([HOOK_PUSH], [HOOK_PRE_PULL, HOOK_PULL, HOOK_REPO_SIZE]),
105 105 ([HOOK_PULL], [HOOK_PRE_PUSH, HOOK_PRETX_PUSH, HOOK_PUSH, HOOK_REPO_SIZE,
106 106 HOOK_PUSH_KEY]),
107 107 ])
108 108 def test_make_db_config_hg_hooks(pylonsapp, request, disabled_hooks,
109 109 expected_hooks):
110 110 disable_hooks(request, disabled_hooks)
111 111
112 112 config = utils.make_db_config()
113 113 hooks = extract_hooks(config)
114 114
115 115 assert set(hooks.iterkeys()).intersection(HG_HOOKS) == set(expected_hooks)
116 116
117 117
118 118 @pytest.mark.parametrize('disabled_hooks,expected_hooks', [
119 119 ([], ['pull', 'push']),
120 120 ([HOOK_PUSH], ['pull']),
121 121 ([HOOK_PULL], ['push']),
122 122 ([HOOK_PULL, HOOK_PUSH], []),
123 123 ])
124 124 def test_get_enabled_hook_classes(disabled_hooks, expected_hooks):
125 125 hook_keys = (HOOK_PUSH, HOOK_PULL)
126 126 ui_settings = [
127 127 ('hooks', key, 'some value', key not in disabled_hooks)
128 128 for key in hook_keys]
129 129
130 130 result = utils.get_enabled_hook_classes(ui_settings)
131 131 assert sorted(result) == expected_hooks
132 132
133 133
134 134 def test_get_filesystem_repos_finds_repos(tmpdir, pylonsapp):
135 135 _stub_git_repo(tmpdir.ensure('repo', dir=True))
136 136 repos = list(utils.get_filesystem_repos(str(tmpdir)))
137 137 assert repos == [('repo', ('git', tmpdir.join('repo')))]
138 138
139 139
140 140 def test_get_filesystem_repos_skips_directories(tmpdir, pylonsapp):
141 141 tmpdir.ensure('not-a-repo', dir=True)
142 142 repos = list(utils.get_filesystem_repos(str(tmpdir)))
143 143 assert repos == []
144 144
145 145
146 146 def test_get_filesystem_repos_skips_directories_with_repos(tmpdir, pylonsapp):
147 147 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
148 148 repos = list(utils.get_filesystem_repos(str(tmpdir)))
149 149 assert repos == []
150 150
151 151
152 152 def test_get_filesystem_repos_finds_repos_in_subdirectories(tmpdir, pylonsapp):
153 153 _stub_git_repo(tmpdir.ensure('subdir/repo', dir=True))
154 154 repos = list(utils.get_filesystem_repos(str(tmpdir), recursive=True))
155 155 assert repos == [('subdir/repo', ('git', tmpdir.join('subdir', 'repo')))]
156 156
157 157
158 158 def test_get_filesystem_repos_skips_names_starting_with_dot(tmpdir):
159 159 _stub_git_repo(tmpdir.ensure('.repo', dir=True))
160 160 repos = list(utils.get_filesystem_repos(str(tmpdir)))
161 161 assert repos == []
162 162
163 163
164 164 def test_get_filesystem_repos_skips_files(tmpdir):
165 165 tmpdir.ensure('test-file')
166 166 repos = list(utils.get_filesystem_repos(str(tmpdir)))
167 167 assert repos == []
168 168
169 169
170 170 def test_get_filesystem_repos_skips_removed_repositories(tmpdir):
171 171 removed_repo_name = 'rm__00000000_000000_000000__.stub'
172 172 assert utils.REMOVED_REPO_PAT.match(removed_repo_name)
173 173 _stub_git_repo(tmpdir.ensure(removed_repo_name, dir=True))
174 174 repos = list(utils.get_filesystem_repos(str(tmpdir)))
175 175 assert repos == []
176 176
177 177
178 178 def _stub_git_repo(repo_path):
179 179 """
180 180 Make `repo_path` look like a Git repository.
181 181 """
182 182 repo_path.ensure('.git', dir=True)
183 183
184 184
185 185 @pytest.mark.parametrize('str_class', [str, unicode], ids=['str', 'unicode'])
186 186 def test_get_dirpaths_returns_all_paths(tmpdir, str_class):
187 187 tmpdir.ensure('test-file')
188 188 dirpaths = utils._get_dirpaths(str_class(tmpdir))
189 189 assert dirpaths == ['test-file']
190 190
191 191
192 192 def test_get_dirpaths_returns_all_paths_bytes(
193 193 tmpdir, platform_encodes_filenames):
194 194 if platform_encodes_filenames:
195 195 pytest.skip("This platform seems to encode filenames.")
196 196 tmpdir.ensure('repo-a-umlaut-\xe4')
197 197 dirpaths = utils._get_dirpaths(str(tmpdir))
198 198 assert dirpaths == ['repo-a-umlaut-\xe4']
199 199
200 200
201 201 def test_get_dirpaths_skips_paths_it_cannot_decode(
202 202 tmpdir, platform_encodes_filenames):
203 203 if platform_encodes_filenames:
204 204 pytest.skip("This platform seems to encode filenames.")
205 205 path_with_latin1 = 'repo-a-umlaut-\xe4'
206 206 tmpdir.ensure(path_with_latin1)
207 207 dirpaths = utils._get_dirpaths(unicode(tmpdir))
208 208 assert dirpaths == []
209 209
210 210
211 211 @pytest.fixture(scope='session')
212 212 def platform_encodes_filenames():
213 213 """
214 214 Boolean indicator if the current platform changes filename encodings.
215 215 """
216 216 path_with_latin1 = 'repo-a-umlaut-\xe4'
217 217 tmpdir = py.path.local.mkdtemp()
218 218 tmpdir.ensure(path_with_latin1)
219 219 read_path = tmpdir.listdir()[0].basename
220 220 tmpdir.remove()
221 221 return path_with_latin1 != read_path
222 222
223 223
224 def test_action_logger_action_size(pylonsapp, test_repo):
225 action = 'x' * 1200001
226 utils.action_logger(TEST_USER_ADMIN_LOGIN, action, test_repo, commit=True)
227
228
229 224 @pytest.fixture
230 225 def repo_groups(request):
231 226 session = meta.Session()
232 227 zombie_group = fixture.create_repo_group('zombie')
233 228 parent_group = fixture.create_repo_group('parent')
234 229 child_group = fixture.create_repo_group('parent/child')
235 230 groups_in_db = session.query(db.RepoGroup).all()
236 231 assert len(groups_in_db) == 3
237 232 assert child_group.group_parent_id == parent_group.group_id
238 233
239 234 @request.addfinalizer
240 235 def cleanup():
241 236 fixture.destroy_repo_group(zombie_group)
242 237 fixture.destroy_repo_group(child_group)
243 238 fixture.destroy_repo_group(parent_group)
244 239
245 240 return (zombie_group, parent_group, child_group)
246 241
247 242
248 243 def test_repo2db_mapper_groups(repo_groups):
249 244 session = meta.Session()
250 245 zombie_group, parent_group, child_group = repo_groups
251 246 zombie_path = os.path.join(
252 247 RepoGroupModel().repos_path, zombie_group.full_path)
253 248 os.rmdir(zombie_path)
254 249
255 250 # Avoid removing test repos when calling repo2db_mapper
256 251 repo_list = {
257 252 repo.repo_name: 'test' for repo in session.query(db.Repository).all()
258 253 }
259 254 utils.repo2db_mapper(repo_list, remove_obsolete=True)
260 255
261 256 groups_in_db = session.query(db.RepoGroup).all()
262 257 assert child_group in groups_in_db
263 258 assert parent_group in groups_in_db
264 259 assert zombie_path not in groups_in_db
265 260
266 261
267 262 def test_repo2db_mapper_enables_largefiles(backend):
268 263 repo = backend.create_repo()
269 264 repo_list = {repo.repo_name: 'test'}
270 265 with mock.patch('rhodecode.model.db.Repository.scm_instance') as scm_mock:
271 266 with mock.patch.multiple('rhodecode.model.scm.ScmModel',
272 267 install_git_hook=mock.DEFAULT,
273 268 install_svn_hooks=mock.DEFAULT):
274 269 utils.repo2db_mapper(repo_list, remove_obsolete=False)
275 270 _, kwargs = scm_mock.call_args
276 271 assert kwargs['config'].get('extensions', 'largefiles') == ''
277 272
278 273
279 274 @pytest.mark.backends("git", "svn")
280 275 def test_repo2db_mapper_installs_hooks_for_repos_in_db(backend):
281 276 repo = backend.create_repo()
282 277 repo_list = {repo.repo_name: 'test'}
283 278 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
284 279 utils.repo2db_mapper(repo_list, remove_obsolete=False)
285 280 install_hooks_mock.assert_called_once_with(
286 281 repo.scm_instance(), repo_type=backend.alias)
287 282
288 283
289 284 @pytest.mark.backends("git", "svn")
290 285 def test_repo2db_mapper_installs_hooks_for_newly_added_repos(backend):
291 286 repo = backend.create_repo()
292 287 RepoModel().delete(repo, fs_remove=False)
293 288 meta.Session().commit()
294 289 repo_list = {repo.repo_name: repo.scm_instance()}
295 290 with mock.patch.object(ScmModel, 'install_hooks') as install_hooks_mock:
296 291 utils.repo2db_mapper(repo_list, remove_obsolete=False)
297 292 assert install_hooks_mock.call_count == 1
298 293 install_hooks_args, _ = install_hooks_mock.call_args
299 294 assert install_hooks_args[0].name == repo.repo_name
300 295
301 296
302 297 class TestPasswordChanged(object):
303 298 def setup(self):
304 299 self.session = {
305 300 'rhodecode_user': {
306 301 'password': '0cc175b9c0f1b6a831c399e269772661'
307 302 }
308 303 }
309 304 self.auth_user = mock.Mock()
310 305 self.auth_user.userame = 'test'
311 306 self.auth_user.password = 'abc123'
312 307
313 308 def test_returns_false_for_default_user(self):
314 309 self.auth_user.username = db.User.DEFAULT_USER
315 310 result = utils.password_changed(self.auth_user, self.session)
316 311 assert result is False
317 312
318 313 def test_returns_false_if_password_was_not_changed(self):
319 314 self.session['rhodecode_user']['password'] = md5(
320 315 self.auth_user.password)
321 316 result = utils.password_changed(self.auth_user, self.session)
322 317 assert result is False
323 318
324 319 def test_returns_true_if_password_was_changed(self):
325 320 result = utils.password_changed(self.auth_user, self.session)
326 321 assert result is True
327 322
328 323 def test_returns_true_if_auth_user_password_is_empty(self):
329 324 self.auth_user.password = None
330 325 result = utils.password_changed(self.auth_user, self.session)
331 326 assert result is True
332 327
333 328 def test_returns_true_if_session_password_is_empty(self):
334 329 self.session['rhodecode_user'].pop('password')
335 330 result = utils.password_changed(self.auth_user, self.session)
336 331 assert result is True
337 332
338 333
339 334 class TestReadOpensourceLicenses(object):
340 335 def test_success(self):
341 336 utils._license_cache = None
342 337 json_data = '''
343 338 {
344 339 "python2.7-pytest-2.7.1": {"UNKNOWN": null},
345 340 "python2.7-Markdown-2.6.2": {
346 341 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
347 342 }
348 343 }
349 344 '''
350 345 resource_string_patch = mock.patch.object(
351 346 utils.pkg_resources, 'resource_string', return_value=json_data)
352 347 with resource_string_patch:
353 348 result = utils.read_opensource_licenses()
354 349 assert result == json.loads(json_data)
355 350
356 351 def test_caching(self):
357 352 utils._license_cache = {
358 353 "python2.7-pytest-2.7.1": {
359 354 "UNKNOWN": None
360 355 },
361 356 "python2.7-Markdown-2.6.2": {
362 357 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
363 358 }
364 359 }
365 360 resource_patch = mock.patch.object(
366 361 utils.pkg_resources, 'resource_string', side_effect=Exception)
367 362 json_patch = mock.patch.object(
368 363 utils.json, 'loads', side_effect=Exception)
369 364
370 365 with resource_patch as resource_mock, json_patch as json_mock:
371 366 result = utils.read_opensource_licenses()
372 367
373 368 assert resource_mock.call_count == 0
374 369 assert json_mock.call_count == 0
375 370 assert result == utils._license_cache
376 371
377 372 def test_licenses_file_contains_no_unknown_licenses(self):
378 373 utils._license_cache = None
379 374 result = utils.read_opensource_licenses()
380 375 license_names = []
381 376 for licenses in result.values():
382 377 license_names.extend(licenses.keys())
383 378 assert 'UNKNOWN' not in license_names
384 379
385 380
386 381 class TestMakeDbConfig(object):
387 382 def test_data_from_config_data_from_db_returned(self):
388 383 test_data = [
389 384 ('section1', 'option1', 'value1'),
390 385 ('section2', 'option2', 'value2'),
391 386 ('section3', 'option3', 'value3'),
392 387 ]
393 388 with mock.patch.object(utils, 'config_data_from_db') as config_mock:
394 389 config_mock.return_value = test_data
395 390 kwargs = {'clear_session': False, 'repo': 'test_repo'}
396 391 result = utils.make_db_config(**kwargs)
397 392 config_mock.assert_called_once_with(**kwargs)
398 393 for section, option, expected_value in test_data:
399 394 value = result.get(section, option)
400 395 assert value == expected_value
401 396
402 397
403 398 class TestConfigDataFromDb(object):
404 399 def test_config_data_from_db_returns_active_settings(self):
405 400 test_data = [
406 401 UiSetting('section1', 'option1', 'value1', True),
407 402 UiSetting('section2', 'option2', 'value2', True),
408 403 UiSetting('section3', 'option3', 'value3', False),
409 404 ]
410 405 repo_name = 'test_repo'
411 406
412 407 model_patch = mock.patch.object(settings, 'VcsSettingsModel')
413 408 hooks_patch = mock.patch.object(
414 409 utils, 'get_enabled_hook_classes',
415 410 return_value=['pull', 'push', 'repo_size'])
416 411 with model_patch as model_mock, hooks_patch:
417 412 instance_mock = mock.Mock()
418 413 model_mock.return_value = instance_mock
419 414 instance_mock.get_ui_settings.return_value = test_data
420 415 result = utils.config_data_from_db(
421 416 clear_session=False, repo=repo_name)
422 417
423 418 self._assert_repo_name_passed(model_mock, repo_name)
424 419
425 420 expected_result = [
426 421 ('section1', 'option1', 'value1'),
427 422 ('section2', 'option2', 'value2'),
428 423 ]
429 424 assert result == expected_result
430 425
431 426 def _assert_repo_name_passed(self, model_mock, repo_name):
432 427 assert model_mock.call_count == 1
433 428 call_args, call_kwargs = model_mock.call_args
434 429 assert call_kwargs['repo'] == repo_name
435 430
436 431
437 432 class TestIsDirWritable(object):
438 433 def test_returns_false_when_not_writable(self):
439 434 with mock.patch('__builtin__.open', side_effect=OSError):
440 435 assert not utils._is_dir_writable('/stub-path')
441 436
442 437 def test_returns_true_when_writable(self, tmpdir):
443 438 assert utils._is_dir_writable(str(tmpdir))
444 439
445 440 def test_is_safe_against_race_conditions(self, tmpdir):
446 441 workers = multiprocessing.Pool()
447 442 directories = [str(tmpdir)] * 10
448 443 workers.map(utils._is_dir_writable, directories)
449 444
450 445
451 446 class TestGetEnabledHooks(object):
452 447 def test_only_active_hooks_are_enabled(self):
453 448 ui_settings = [
454 449 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
455 450 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
456 451 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', False)
457 452 ]
458 453 result = utils.get_enabled_hook_classes(ui_settings)
459 454 assert result == ['push', 'repo_size']
460 455
461 456 def test_all_hooks_are_enabled(self):
462 457 ui_settings = [
463 458 UiSetting('hooks', db.RhodeCodeUi.HOOK_PUSH, 'value', True),
464 459 UiSetting('hooks', db.RhodeCodeUi.HOOK_REPO_SIZE, 'value', True),
465 460 UiSetting('hooks', db.RhodeCodeUi.HOOK_PULL, 'value', True)
466 461 ]
467 462 result = utils.get_enabled_hook_classes(ui_settings)
468 463 assert result == ['push', 'repo_size', 'pull']
469 464
470 465 def test_no_enabled_hooks_when_no_hook_settings_are_found(self):
471 466 ui_settings = []
472 467 result = utils.get_enabled_hook_classes(ui_settings)
473 468 assert result == []
General Comments 0
You need to be logged in to leave comments. Login now