##// END OF EJS Templates
scm: add tests for .mark_for_invalidation(delete=True)
dan -
r338:85729415 default
parent child Browse files
Show More
@@ -1,305 +1,318 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import stat
23 23 import sys
24 24
25 25 import pytest
26 26 from mock import Mock, patch, DEFAULT
27 27
28 28 import rhodecode
29 29 from rhodecode.model import db, scm
30 30
31 31
32 32 def test_scm_instance_config(backend):
33 33 repo = backend.create_repo()
34 34 with patch.multiple('rhodecode.model.db.Repository',
35 35 _get_instance=DEFAULT,
36 36 _get_instance_cached=DEFAULT) as mocks:
37 37 repo.scm_instance()
38 38 mocks['_get_instance'].assert_called_with(
39 39 config=None, cache=False)
40 40
41 41 config = {'some': 'value'}
42 42 repo.scm_instance(config=config)
43 43 mocks['_get_instance'].assert_called_with(
44 44 config=config, cache=False)
45 45
46 46 with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
47 47 repo.scm_instance(config=config)
48 48 mocks['_get_instance_cached'].assert_called()
49 49
50 50
51 51 def test__get_instance_config(backend):
52 52 repo = backend.create_repo()
53 53 vcs_class = Mock()
54 54 with patch.multiple('rhodecode.model.db',
55 55 get_scm=DEFAULT,
56 56 get_backend=DEFAULT) as mocks:
57 57 mocks['get_scm'].return_value = backend.alias
58 58 mocks['get_backend'].return_value = vcs_class
59 59 with patch('rhodecode.model.db.Repository._config') as config_mock:
60 60 repo._get_instance()
61 61 vcs_class.assert_called_with(
62 62 repo.repo_full_path, config=config_mock, create=False,
63 63 with_wire={'cache': True})
64 64
65 65 new_config = {'override': 'old_config'}
66 66 repo._get_instance(config=new_config)
67 67 vcs_class.assert_called_with(
68 68 repo.repo_full_path, config=new_config, create=False,
69 69 with_wire={'cache': True})
70 70
71 71
72 72 def test_mark_for_invalidation_config(backend):
73 73 repo = backend.create_repo()
74 74 with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
75 75 scm.ScmModel().mark_for_invalidation(repo.repo_name)
76 76 _, kwargs = _mock.call_args
77 77 assert kwargs['config'].__dict__ == repo._config.__dict__
78 78
79 79
80 def test_mark_for_invalidation_with_delete_updates_last_commit(backend):
81 commits = [{'message': 'A'}, {'message': 'B'}]
82 repo = backend.create_repo(commits=commits)
83 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
84 assert repo.changeset_cache['revision'] == 1
85
86
87 def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend):
88 repo = backend.create_repo()
89 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
90 assert repo.changeset_cache['revision'] == -1
91
92
80 93 def test_strip_with_multiple_heads(backend_hg):
81 94 commits = [
82 95 {'message': 'A'},
83 96 {'message': 'a'},
84 97 {'message': 'b'},
85 98 {'message': 'B', 'parents': ['A']},
86 99 {'message': 'a1'},
87 100 ]
88 101 repo = backend_hg.create_repo(commits=commits)
89 102 commit_ids = backend_hg.commit_ids
90 103
91 104 model = scm.ScmModel()
92 105 model.strip(repo, commit_ids['b'], branch=None)
93 106
94 107 vcs_repo = repo.scm_instance()
95 108 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
96 109 assert len(rest_commit_ids) == 4
97 110 assert commit_ids['b'] not in rest_commit_ids
98 111
99 112
100 113 def test_strip_with_single_heads(backend_hg):
101 114 commits = [
102 115 {'message': 'A'},
103 116 {'message': 'a'},
104 117 {'message': 'b'},
105 118 ]
106 119 repo = backend_hg.create_repo(commits=commits)
107 120 commit_ids = backend_hg.commit_ids
108 121
109 122 model = scm.ScmModel()
110 123 model.strip(repo, commit_ids['b'], branch=None)
111 124
112 125 vcs_repo = repo.scm_instance()
113 126 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
114 127 assert len(rest_commit_ids) == 2
115 128 assert commit_ids['b'] not in rest_commit_ids
116 129
117 130
118 131 def test_get_nodes_returns_unicode_flat(backend_random):
119 132 repo = backend_random.repo
120 133 directories, files = scm.ScmModel().get_nodes(
121 134 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
122 135 flat=True)
123 136 assert_contains_only_unicode(directories)
124 137 assert_contains_only_unicode(files)
125 138
126 139
127 140 def test_get_nodes_returns_unicode_non_flat(backend_random):
128 141 repo = backend_random.repo
129 142 directories, files = scm.ScmModel().get_nodes(
130 143 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
131 144 flat=False)
132 145 # johbo: Checking only the names for now, since that is the critical
133 146 # part.
134 147 assert_contains_only_unicode([d['name'] for d in directories])
135 148 assert_contains_only_unicode([f['name'] for f in files])
136 149
137 150
138 151 def assert_contains_only_unicode(structure):
139 152 assert structure
140 153 for value in structure:
141 154 assert isinstance(value, unicode)
142 155
143 156
144 157 @pytest.mark.backends("hg", "git")
145 158 def test_get_non_unicode_reference(backend):
146 159 model = scm.ScmModel()
147 160 non_unicode_list = ["AdΔ±nΔ±".decode("cp1254")]
148 161
149 162 def scm_instance():
150 163 return Mock(
151 164 branches=non_unicode_list, bookmarks=non_unicode_list,
152 165 tags=non_unicode_list, alias=backend.alias)
153 166
154 167 repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
155 168 choices, __ = model.get_repo_landing_revs(repo=repo)
156 169 if backend.alias == 'hg':
157 170 valid_choices = [
158 171 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
159 172 u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
160 173 else:
161 174 valid_choices = [
162 175 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
163 176 u'tag:Ad\xc4\xb1n\xc4\xb1']
164 177
165 178 assert choices == valid_choices
166 179
167 180
168 181 class TestInstallSvnHooks(object):
169 182 HOOK_FILES = ('pre-commit', 'post-commit')
170 183
171 184 def test_new_hooks_are_created(self, backend_svn):
172 185 model = scm.ScmModel()
173 186 repo = backend_svn.create_repo()
174 187 vcs_repo = repo.scm_instance()
175 188 model.install_svn_hooks(vcs_repo)
176 189
177 190 hooks_path = os.path.join(vcs_repo.path, 'hooks')
178 191 assert os.path.isdir(hooks_path)
179 192 for file_name in self.HOOK_FILES:
180 193 file_path = os.path.join(hooks_path, file_name)
181 194 self._check_hook_file_mode(file_path)
182 195 self._check_hook_file_content(file_path)
183 196
184 197 def test_rc_hooks_are_replaced(self, backend_svn):
185 198 model = scm.ScmModel()
186 199 repo = backend_svn.create_repo()
187 200 vcs_repo = repo.scm_instance()
188 201 hooks_path = os.path.join(vcs_repo.path, 'hooks')
189 202 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
190 203
191 204 for file_path in file_paths:
192 205 self._create_fake_hook(
193 206 file_path, content="RC_HOOK_VER = 'abcde'\n")
194 207
195 208 model.install_svn_hooks(vcs_repo)
196 209
197 210 for file_path in file_paths:
198 211 self._check_hook_file_content(file_path)
199 212
200 213 def test_non_rc_hooks_are_not_replaced_without_force_create(
201 214 self, backend_svn):
202 215 model = scm.ScmModel()
203 216 repo = backend_svn.create_repo()
204 217 vcs_repo = repo.scm_instance()
205 218 hooks_path = os.path.join(vcs_repo.path, 'hooks')
206 219 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
207 220 non_rc_content = "exit 0\n"
208 221
209 222 for file_path in file_paths:
210 223 self._create_fake_hook(file_path, content=non_rc_content)
211 224
212 225 model.install_svn_hooks(vcs_repo)
213 226
214 227 for file_path in file_paths:
215 228 with open(file_path, 'rt') as hook_file:
216 229 content = hook_file.read()
217 230 assert content == non_rc_content
218 231
219 232 def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
220 233 model = scm.ScmModel()
221 234 repo = backend_svn.create_repo()
222 235 vcs_repo = repo.scm_instance()
223 236 hooks_path = os.path.join(vcs_repo.path, 'hooks')
224 237 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
225 238 non_rc_content = "exit 0\n"
226 239
227 240 for file_path in file_paths:
228 241 self._create_fake_hook(file_path, content=non_rc_content)
229 242
230 243 model.install_svn_hooks(vcs_repo, force_create=True)
231 244
232 245 for file_path in file_paths:
233 246 self._check_hook_file_content(file_path)
234 247
235 248 def _check_hook_file_mode(self, file_path):
236 249 assert os.path.exists(file_path)
237 250 stat_info = os.stat(file_path)
238 251
239 252 file_mode = stat.S_IMODE(stat_info.st_mode)
240 253 expected_mode = int('755', 8)
241 254 assert expected_mode == file_mode
242 255
243 256 def _check_hook_file_content(self, file_path):
244 257 with open(file_path, 'rt') as hook_file:
245 258 content = hook_file.read()
246 259
247 260 expected_env = '#!{}'.format(sys.executable)
248 261 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
249 262 rhodecode.__version__)
250 263 assert content.strip().startswith(expected_env)
251 264 assert expected_rc_version in content
252 265
253 266 def _create_fake_hook(self, file_path, content):
254 267 with open(file_path, 'w') as hook_file:
255 268 hook_file.write(content)
256 269
257 270
258 271 class TestCheckRhodecodeHook(object):
259 272
260 273 @patch('os.path.exists', Mock(return_value=False))
261 274 def test_returns_true_when_no_hook_found(self):
262 275 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
263 276 assert result
264 277
265 278 @pytest.mark.parametrize("file_content, expected_result", [
266 279 ("RC_HOOK_VER = '3.3.3'\n", True),
267 280 ("RC_HOOK = '3.3.3'\n", False),
268 281 ])
269 282 @patch('os.path.exists', Mock(return_value=True))
270 283 def test_signatures(self, file_content, expected_result):
271 284 hook_content_patcher = patch.object(
272 285 scm, '_read_hook', return_value=file_content)
273 286 with hook_content_patcher:
274 287 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
275 288
276 289 assert result is expected_result
277 290
278 291
279 292 class TestInstallHooks(object):
280 293 def test_hooks_are_installed_for_git_repo(self, backend_git):
281 294 repo = backend_git.create_repo()
282 295 model = scm.ScmModel()
283 296 scm_repo = repo.scm_instance()
284 297 with patch.object(model, 'install_git_hook') as hooks_mock:
285 298 model.install_hooks(scm_repo, repo_type='git')
286 299 hooks_mock.assert_called_once_with(scm_repo)
287 300
288 301 def test_hooks_are_installed_for_svn_repo(self, backend_svn):
289 302 repo = backend_svn.create_repo()
290 303 scm_repo = repo.scm_instance()
291 304 model = scm.ScmModel()
292 305 with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
293 306 model.install_hooks(scm_repo, repo_type='svn')
294 307 hooks_mock.assert_called_once_with(scm_repo)
295 308
296 309 @pytest.mark.parametrize('hook_method', [
297 310 'install_svn_hooks',
298 311 'install_git_hook'])
299 312 def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
300 313 repo = backend_hg.create_repo()
301 314 scm_repo = repo.scm_instance()
302 315 model = scm.ScmModel()
303 316 with patch.object(scm.ScmModel, hook_method) as hooks_mock:
304 317 model.install_hooks(scm_repo, repo_type='hg')
305 318 assert hooks_mock.call_count == 0
General Comments 0
You need to be logged in to leave comments. Login now