##// END OF EJS Templates
scm: add tests for .mark_for_invalidation(delete=True)
dan -
r338:85729415 default
parent child Browse files
Show More
@@ -1,305 +1,318 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import stat
22 import stat
23 import sys
23 import sys
24
24
25 import pytest
25 import pytest
26 from mock import Mock, patch, DEFAULT
26 from mock import Mock, patch, DEFAULT
27
27
28 import rhodecode
28 import rhodecode
29 from rhodecode.model import db, scm
29 from rhodecode.model import db, scm
30
30
31
31
32 def test_scm_instance_config(backend):
32 def test_scm_instance_config(backend):
33 repo = backend.create_repo()
33 repo = backend.create_repo()
34 with patch.multiple('rhodecode.model.db.Repository',
34 with patch.multiple('rhodecode.model.db.Repository',
35 _get_instance=DEFAULT,
35 _get_instance=DEFAULT,
36 _get_instance_cached=DEFAULT) as mocks:
36 _get_instance_cached=DEFAULT) as mocks:
37 repo.scm_instance()
37 repo.scm_instance()
38 mocks['_get_instance'].assert_called_with(
38 mocks['_get_instance'].assert_called_with(
39 config=None, cache=False)
39 config=None, cache=False)
40
40
41 config = {'some': 'value'}
41 config = {'some': 'value'}
42 repo.scm_instance(config=config)
42 repo.scm_instance(config=config)
43 mocks['_get_instance'].assert_called_with(
43 mocks['_get_instance'].assert_called_with(
44 config=config, cache=False)
44 config=config, cache=False)
45
45
46 with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
46 with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
47 repo.scm_instance(config=config)
47 repo.scm_instance(config=config)
48 mocks['_get_instance_cached'].assert_called()
48 mocks['_get_instance_cached'].assert_called()
49
49
50
50
51 def test__get_instance_config(backend):
51 def test__get_instance_config(backend):
52 repo = backend.create_repo()
52 repo = backend.create_repo()
53 vcs_class = Mock()
53 vcs_class = Mock()
54 with patch.multiple('rhodecode.model.db',
54 with patch.multiple('rhodecode.model.db',
55 get_scm=DEFAULT,
55 get_scm=DEFAULT,
56 get_backend=DEFAULT) as mocks:
56 get_backend=DEFAULT) as mocks:
57 mocks['get_scm'].return_value = backend.alias
57 mocks['get_scm'].return_value = backend.alias
58 mocks['get_backend'].return_value = vcs_class
58 mocks['get_backend'].return_value = vcs_class
59 with patch('rhodecode.model.db.Repository._config') as config_mock:
59 with patch('rhodecode.model.db.Repository._config') as config_mock:
60 repo._get_instance()
60 repo._get_instance()
61 vcs_class.assert_called_with(
61 vcs_class.assert_called_with(
62 repo.repo_full_path, config=config_mock, create=False,
62 repo.repo_full_path, config=config_mock, create=False,
63 with_wire={'cache': True})
63 with_wire={'cache': True})
64
64
65 new_config = {'override': 'old_config'}
65 new_config = {'override': 'old_config'}
66 repo._get_instance(config=new_config)
66 repo._get_instance(config=new_config)
67 vcs_class.assert_called_with(
67 vcs_class.assert_called_with(
68 repo.repo_full_path, config=new_config, create=False,
68 repo.repo_full_path, config=new_config, create=False,
69 with_wire={'cache': True})
69 with_wire={'cache': True})
70
70
71
71
72 def test_mark_for_invalidation_config(backend):
72 def test_mark_for_invalidation_config(backend):
73 repo = backend.create_repo()
73 repo = backend.create_repo()
74 with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
74 with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
75 scm.ScmModel().mark_for_invalidation(repo.repo_name)
75 scm.ScmModel().mark_for_invalidation(repo.repo_name)
76 _, kwargs = _mock.call_args
76 _, kwargs = _mock.call_args
77 assert kwargs['config'].__dict__ == repo._config.__dict__
77 assert kwargs['config'].__dict__ == repo._config.__dict__
78
78
79
79
80 def test_mark_for_invalidation_with_delete_updates_last_commit(backend):
81 commits = [{'message': 'A'}, {'message': 'B'}]
82 repo = backend.create_repo(commits=commits)
83 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
84 assert repo.changeset_cache['revision'] == 1
85
86
87 def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend):
88 repo = backend.create_repo()
89 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
90 assert repo.changeset_cache['revision'] == -1
91
92
80 def test_strip_with_multiple_heads(backend_hg):
93 def test_strip_with_multiple_heads(backend_hg):
81 commits = [
94 commits = [
82 {'message': 'A'},
95 {'message': 'A'},
83 {'message': 'a'},
96 {'message': 'a'},
84 {'message': 'b'},
97 {'message': 'b'},
85 {'message': 'B', 'parents': ['A']},
98 {'message': 'B', 'parents': ['A']},
86 {'message': 'a1'},
99 {'message': 'a1'},
87 ]
100 ]
88 repo = backend_hg.create_repo(commits=commits)
101 repo = backend_hg.create_repo(commits=commits)
89 commit_ids = backend_hg.commit_ids
102 commit_ids = backend_hg.commit_ids
90
103
91 model = scm.ScmModel()
104 model = scm.ScmModel()
92 model.strip(repo, commit_ids['b'], branch=None)
105 model.strip(repo, commit_ids['b'], branch=None)
93
106
94 vcs_repo = repo.scm_instance()
107 vcs_repo = repo.scm_instance()
95 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
108 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
96 assert len(rest_commit_ids) == 4
109 assert len(rest_commit_ids) == 4
97 assert commit_ids['b'] not in rest_commit_ids
110 assert commit_ids['b'] not in rest_commit_ids
98
111
99
112
100 def test_strip_with_single_heads(backend_hg):
113 def test_strip_with_single_heads(backend_hg):
101 commits = [
114 commits = [
102 {'message': 'A'},
115 {'message': 'A'},
103 {'message': 'a'},
116 {'message': 'a'},
104 {'message': 'b'},
117 {'message': 'b'},
105 ]
118 ]
106 repo = backend_hg.create_repo(commits=commits)
119 repo = backend_hg.create_repo(commits=commits)
107 commit_ids = backend_hg.commit_ids
120 commit_ids = backend_hg.commit_ids
108
121
109 model = scm.ScmModel()
122 model = scm.ScmModel()
110 model.strip(repo, commit_ids['b'], branch=None)
123 model.strip(repo, commit_ids['b'], branch=None)
111
124
112 vcs_repo = repo.scm_instance()
125 vcs_repo = repo.scm_instance()
113 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
126 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
114 assert len(rest_commit_ids) == 2
127 assert len(rest_commit_ids) == 2
115 assert commit_ids['b'] not in rest_commit_ids
128 assert commit_ids['b'] not in rest_commit_ids
116
129
117
130
118 def test_get_nodes_returns_unicode_flat(backend_random):
131 def test_get_nodes_returns_unicode_flat(backend_random):
119 repo = backend_random.repo
132 repo = backend_random.repo
120 directories, files = scm.ScmModel().get_nodes(
133 directories, files = scm.ScmModel().get_nodes(
121 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
134 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
122 flat=True)
135 flat=True)
123 assert_contains_only_unicode(directories)
136 assert_contains_only_unicode(directories)
124 assert_contains_only_unicode(files)
137 assert_contains_only_unicode(files)
125
138
126
139
127 def test_get_nodes_returns_unicode_non_flat(backend_random):
140 def test_get_nodes_returns_unicode_non_flat(backend_random):
128 repo = backend_random.repo
141 repo = backend_random.repo
129 directories, files = scm.ScmModel().get_nodes(
142 directories, files = scm.ScmModel().get_nodes(
130 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
143 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
131 flat=False)
144 flat=False)
132 # johbo: Checking only the names for now, since that is the critical
145 # johbo: Checking only the names for now, since that is the critical
133 # part.
146 # part.
134 assert_contains_only_unicode([d['name'] for d in directories])
147 assert_contains_only_unicode([d['name'] for d in directories])
135 assert_contains_only_unicode([f['name'] for f in files])
148 assert_contains_only_unicode([f['name'] for f in files])
136
149
137
150
138 def assert_contains_only_unicode(structure):
151 def assert_contains_only_unicode(structure):
139 assert structure
152 assert structure
140 for value in structure:
153 for value in structure:
141 assert isinstance(value, unicode)
154 assert isinstance(value, unicode)
142
155
143
156
144 @pytest.mark.backends("hg", "git")
157 @pytest.mark.backends("hg", "git")
145 def test_get_non_unicode_reference(backend):
158 def test_get_non_unicode_reference(backend):
146 model = scm.ScmModel()
159 model = scm.ScmModel()
147 non_unicode_list = ["AdΔ±nΔ±".decode("cp1254")]
160 non_unicode_list = ["AdΔ±nΔ±".decode("cp1254")]
148
161
149 def scm_instance():
162 def scm_instance():
150 return Mock(
163 return Mock(
151 branches=non_unicode_list, bookmarks=non_unicode_list,
164 branches=non_unicode_list, bookmarks=non_unicode_list,
152 tags=non_unicode_list, alias=backend.alias)
165 tags=non_unicode_list, alias=backend.alias)
153
166
154 repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
167 repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
155 choices, __ = model.get_repo_landing_revs(repo=repo)
168 choices, __ = model.get_repo_landing_revs(repo=repo)
156 if backend.alias == 'hg':
169 if backend.alias == 'hg':
157 valid_choices = [
170 valid_choices = [
158 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
171 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
159 u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
172 u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
160 else:
173 else:
161 valid_choices = [
174 valid_choices = [
162 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
175 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
163 u'tag:Ad\xc4\xb1n\xc4\xb1']
176 u'tag:Ad\xc4\xb1n\xc4\xb1']
164
177
165 assert choices == valid_choices
178 assert choices == valid_choices
166
179
167
180
168 class TestInstallSvnHooks(object):
181 class TestInstallSvnHooks(object):
169 HOOK_FILES = ('pre-commit', 'post-commit')
182 HOOK_FILES = ('pre-commit', 'post-commit')
170
183
171 def test_new_hooks_are_created(self, backend_svn):
184 def test_new_hooks_are_created(self, backend_svn):
172 model = scm.ScmModel()
185 model = scm.ScmModel()
173 repo = backend_svn.create_repo()
186 repo = backend_svn.create_repo()
174 vcs_repo = repo.scm_instance()
187 vcs_repo = repo.scm_instance()
175 model.install_svn_hooks(vcs_repo)
188 model.install_svn_hooks(vcs_repo)
176
189
177 hooks_path = os.path.join(vcs_repo.path, 'hooks')
190 hooks_path = os.path.join(vcs_repo.path, 'hooks')
178 assert os.path.isdir(hooks_path)
191 assert os.path.isdir(hooks_path)
179 for file_name in self.HOOK_FILES:
192 for file_name in self.HOOK_FILES:
180 file_path = os.path.join(hooks_path, file_name)
193 file_path = os.path.join(hooks_path, file_name)
181 self._check_hook_file_mode(file_path)
194 self._check_hook_file_mode(file_path)
182 self._check_hook_file_content(file_path)
195 self._check_hook_file_content(file_path)
183
196
184 def test_rc_hooks_are_replaced(self, backend_svn):
197 def test_rc_hooks_are_replaced(self, backend_svn):
185 model = scm.ScmModel()
198 model = scm.ScmModel()
186 repo = backend_svn.create_repo()
199 repo = backend_svn.create_repo()
187 vcs_repo = repo.scm_instance()
200 vcs_repo = repo.scm_instance()
188 hooks_path = os.path.join(vcs_repo.path, 'hooks')
201 hooks_path = os.path.join(vcs_repo.path, 'hooks')
189 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
202 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
190
203
191 for file_path in file_paths:
204 for file_path in file_paths:
192 self._create_fake_hook(
205 self._create_fake_hook(
193 file_path, content="RC_HOOK_VER = 'abcde'\n")
206 file_path, content="RC_HOOK_VER = 'abcde'\n")
194
207
195 model.install_svn_hooks(vcs_repo)
208 model.install_svn_hooks(vcs_repo)
196
209
197 for file_path in file_paths:
210 for file_path in file_paths:
198 self._check_hook_file_content(file_path)
211 self._check_hook_file_content(file_path)
199
212
200 def test_non_rc_hooks_are_not_replaced_without_force_create(
213 def test_non_rc_hooks_are_not_replaced_without_force_create(
201 self, backend_svn):
214 self, backend_svn):
202 model = scm.ScmModel()
215 model = scm.ScmModel()
203 repo = backend_svn.create_repo()
216 repo = backend_svn.create_repo()
204 vcs_repo = repo.scm_instance()
217 vcs_repo = repo.scm_instance()
205 hooks_path = os.path.join(vcs_repo.path, 'hooks')
218 hooks_path = os.path.join(vcs_repo.path, 'hooks')
206 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
219 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
207 non_rc_content = "exit 0\n"
220 non_rc_content = "exit 0\n"
208
221
209 for file_path in file_paths:
222 for file_path in file_paths:
210 self._create_fake_hook(file_path, content=non_rc_content)
223 self._create_fake_hook(file_path, content=non_rc_content)
211
224
212 model.install_svn_hooks(vcs_repo)
225 model.install_svn_hooks(vcs_repo)
213
226
214 for file_path in file_paths:
227 for file_path in file_paths:
215 with open(file_path, 'rt') as hook_file:
228 with open(file_path, 'rt') as hook_file:
216 content = hook_file.read()
229 content = hook_file.read()
217 assert content == non_rc_content
230 assert content == non_rc_content
218
231
219 def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
232 def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
220 model = scm.ScmModel()
233 model = scm.ScmModel()
221 repo = backend_svn.create_repo()
234 repo = backend_svn.create_repo()
222 vcs_repo = repo.scm_instance()
235 vcs_repo = repo.scm_instance()
223 hooks_path = os.path.join(vcs_repo.path, 'hooks')
236 hooks_path = os.path.join(vcs_repo.path, 'hooks')
224 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
237 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
225 non_rc_content = "exit 0\n"
238 non_rc_content = "exit 0\n"
226
239
227 for file_path in file_paths:
240 for file_path in file_paths:
228 self._create_fake_hook(file_path, content=non_rc_content)
241 self._create_fake_hook(file_path, content=non_rc_content)
229
242
230 model.install_svn_hooks(vcs_repo, force_create=True)
243 model.install_svn_hooks(vcs_repo, force_create=True)
231
244
232 for file_path in file_paths:
245 for file_path in file_paths:
233 self._check_hook_file_content(file_path)
246 self._check_hook_file_content(file_path)
234
247
235 def _check_hook_file_mode(self, file_path):
248 def _check_hook_file_mode(self, file_path):
236 assert os.path.exists(file_path)
249 assert os.path.exists(file_path)
237 stat_info = os.stat(file_path)
250 stat_info = os.stat(file_path)
238
251
239 file_mode = stat.S_IMODE(stat_info.st_mode)
252 file_mode = stat.S_IMODE(stat_info.st_mode)
240 expected_mode = int('755', 8)
253 expected_mode = int('755', 8)
241 assert expected_mode == file_mode
254 assert expected_mode == file_mode
242
255
243 def _check_hook_file_content(self, file_path):
256 def _check_hook_file_content(self, file_path):
244 with open(file_path, 'rt') as hook_file:
257 with open(file_path, 'rt') as hook_file:
245 content = hook_file.read()
258 content = hook_file.read()
246
259
247 expected_env = '#!{}'.format(sys.executable)
260 expected_env = '#!{}'.format(sys.executable)
248 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
261 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
249 rhodecode.__version__)
262 rhodecode.__version__)
250 assert content.strip().startswith(expected_env)
263 assert content.strip().startswith(expected_env)
251 assert expected_rc_version in content
264 assert expected_rc_version in content
252
265
253 def _create_fake_hook(self, file_path, content):
266 def _create_fake_hook(self, file_path, content):
254 with open(file_path, 'w') as hook_file:
267 with open(file_path, 'w') as hook_file:
255 hook_file.write(content)
268 hook_file.write(content)
256
269
257
270
258 class TestCheckRhodecodeHook(object):
271 class TestCheckRhodecodeHook(object):
259
272
260 @patch('os.path.exists', Mock(return_value=False))
273 @patch('os.path.exists', Mock(return_value=False))
261 def test_returns_true_when_no_hook_found(self):
274 def test_returns_true_when_no_hook_found(self):
262 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
275 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
263 assert result
276 assert result
264
277
265 @pytest.mark.parametrize("file_content, expected_result", [
278 @pytest.mark.parametrize("file_content, expected_result", [
266 ("RC_HOOK_VER = '3.3.3'\n", True),
279 ("RC_HOOK_VER = '3.3.3'\n", True),
267 ("RC_HOOK = '3.3.3'\n", False),
280 ("RC_HOOK = '3.3.3'\n", False),
268 ])
281 ])
269 @patch('os.path.exists', Mock(return_value=True))
282 @patch('os.path.exists', Mock(return_value=True))
270 def test_signatures(self, file_content, expected_result):
283 def test_signatures(self, file_content, expected_result):
271 hook_content_patcher = patch.object(
284 hook_content_patcher = patch.object(
272 scm, '_read_hook', return_value=file_content)
285 scm, '_read_hook', return_value=file_content)
273 with hook_content_patcher:
286 with hook_content_patcher:
274 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
287 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
275
288
276 assert result is expected_result
289 assert result is expected_result
277
290
278
291
279 class TestInstallHooks(object):
292 class TestInstallHooks(object):
280 def test_hooks_are_installed_for_git_repo(self, backend_git):
293 def test_hooks_are_installed_for_git_repo(self, backend_git):
281 repo = backend_git.create_repo()
294 repo = backend_git.create_repo()
282 model = scm.ScmModel()
295 model = scm.ScmModel()
283 scm_repo = repo.scm_instance()
296 scm_repo = repo.scm_instance()
284 with patch.object(model, 'install_git_hook') as hooks_mock:
297 with patch.object(model, 'install_git_hook') as hooks_mock:
285 model.install_hooks(scm_repo, repo_type='git')
298 model.install_hooks(scm_repo, repo_type='git')
286 hooks_mock.assert_called_once_with(scm_repo)
299 hooks_mock.assert_called_once_with(scm_repo)
287
300
288 def test_hooks_are_installed_for_svn_repo(self, backend_svn):
301 def test_hooks_are_installed_for_svn_repo(self, backend_svn):
289 repo = backend_svn.create_repo()
302 repo = backend_svn.create_repo()
290 scm_repo = repo.scm_instance()
303 scm_repo = repo.scm_instance()
291 model = scm.ScmModel()
304 model = scm.ScmModel()
292 with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
305 with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
293 model.install_hooks(scm_repo, repo_type='svn')
306 model.install_hooks(scm_repo, repo_type='svn')
294 hooks_mock.assert_called_once_with(scm_repo)
307 hooks_mock.assert_called_once_with(scm_repo)
295
308
296 @pytest.mark.parametrize('hook_method', [
309 @pytest.mark.parametrize('hook_method', [
297 'install_svn_hooks',
310 'install_svn_hooks',
298 'install_git_hook'])
311 'install_git_hook'])
299 def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
312 def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
300 repo = backend_hg.create_repo()
313 repo = backend_hg.create_repo()
301 scm_repo = repo.scm_instance()
314 scm_repo = repo.scm_instance()
302 model = scm.ScmModel()
315 model = scm.ScmModel()
303 with patch.object(scm.ScmModel, hook_method) as hooks_mock:
316 with patch.object(scm.ScmModel, hook_method) as hooks_mock:
304 model.install_hooks(scm_repo, repo_type='hg')
317 model.install_hooks(scm_repo, repo_type='hg')
305 assert hooks_mock.call_count == 0
318 assert hooks_mock.call_count == 0
General Comments 0
You need to be logged in to leave comments. Login now