##// END OF EJS Templates
tests: Fix mocking of get_scm and get_backend functions.
Martin Bornhold -
r488:c2409823 default
parent child Browse files
Show More
@@ -1,318 +1,318 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import stat
23 23 import sys
24 24
25 25 import pytest
26 26 from mock import Mock, patch, DEFAULT
27 27
28 28 import rhodecode
29 29 from rhodecode.model import db, scm
30 30
31 31
32 32 def test_scm_instance_config(backend):
33 33 repo = backend.create_repo()
34 34 with patch.multiple('rhodecode.model.db.Repository',
35 35 _get_instance=DEFAULT,
36 36 _get_instance_cached=DEFAULT) as mocks:
37 37 repo.scm_instance()
38 38 mocks['_get_instance'].assert_called_with(
39 39 config=None, cache=False)
40 40
41 41 config = {'some': 'value'}
42 42 repo.scm_instance(config=config)
43 43 mocks['_get_instance'].assert_called_with(
44 44 config=config, cache=False)
45 45
46 46 with patch.dict(rhodecode.CONFIG, {'vcs_full_cache': 'true'}):
47 47 repo.scm_instance(config=config)
48 48 mocks['_get_instance_cached'].assert_called()
49 49
50 50
51 51 def test__get_instance_config(backend):
52 52 repo = backend.create_repo()
53 53 vcs_class = Mock()
54 with patch.multiple('rhodecode.model.db',
54 with patch.multiple('rhodecode.lib.vcs.backends',
55 55 get_scm=DEFAULT,
56 56 get_backend=DEFAULT) as mocks:
57 57 mocks['get_scm'].return_value = backend.alias
58 58 mocks['get_backend'].return_value = vcs_class
59 59 with patch('rhodecode.model.db.Repository._config') as config_mock:
60 60 repo._get_instance()
61 61 vcs_class.assert_called_with(
62 repo.repo_full_path, config=config_mock, create=False,
63 with_wire={'cache': True})
62 repo_path=repo.repo_full_path, config=config_mock,
63 create=False, with_wire={'cache': True})
64 64
65 65 new_config = {'override': 'old_config'}
66 66 repo._get_instance(config=new_config)
67 67 vcs_class.assert_called_with(
68 repo.repo_full_path, config=new_config, create=False,
68 repo_path=repo.repo_full_path, config=new_config, create=False,
69 69 with_wire={'cache': True})
70 70
71 71
72 72 def test_mark_for_invalidation_config(backend):
73 73 repo = backend.create_repo()
74 74 with patch('rhodecode.model.db.Repository.update_commit_cache') as _mock:
75 75 scm.ScmModel().mark_for_invalidation(repo.repo_name)
76 76 _, kwargs = _mock.call_args
77 77 assert kwargs['config'].__dict__ == repo._config.__dict__
78 78
79 79
80 80 def test_mark_for_invalidation_with_delete_updates_last_commit(backend):
81 81 commits = [{'message': 'A'}, {'message': 'B'}]
82 82 repo = backend.create_repo(commits=commits)
83 83 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
84 84 assert repo.changeset_cache['revision'] == 1
85 85
86 86
87 87 def test_mark_for_invalidation_with_delete_updates_last_commit_empty(backend):
88 88 repo = backend.create_repo()
89 89 scm.ScmModel().mark_for_invalidation(repo.repo_name, delete=True)
90 90 assert repo.changeset_cache['revision'] == -1
91 91
92 92
93 93 def test_strip_with_multiple_heads(backend_hg):
94 94 commits = [
95 95 {'message': 'A'},
96 96 {'message': 'a'},
97 97 {'message': 'b'},
98 98 {'message': 'B', 'parents': ['A']},
99 99 {'message': 'a1'},
100 100 ]
101 101 repo = backend_hg.create_repo(commits=commits)
102 102 commit_ids = backend_hg.commit_ids
103 103
104 104 model = scm.ScmModel()
105 105 model.strip(repo, commit_ids['b'], branch=None)
106 106
107 107 vcs_repo = repo.scm_instance()
108 108 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
109 109 assert len(rest_commit_ids) == 4
110 110 assert commit_ids['b'] not in rest_commit_ids
111 111
112 112
113 113 def test_strip_with_single_heads(backend_hg):
114 114 commits = [
115 115 {'message': 'A'},
116 116 {'message': 'a'},
117 117 {'message': 'b'},
118 118 ]
119 119 repo = backend_hg.create_repo(commits=commits)
120 120 commit_ids = backend_hg.commit_ids
121 121
122 122 model = scm.ScmModel()
123 123 model.strip(repo, commit_ids['b'], branch=None)
124 124
125 125 vcs_repo = repo.scm_instance()
126 126 rest_commit_ids = [c.raw_id for c in vcs_repo.get_changesets()]
127 127 assert len(rest_commit_ids) == 2
128 128 assert commit_ids['b'] not in rest_commit_ids
129 129
130 130
131 131 def test_get_nodes_returns_unicode_flat(backend_random):
132 132 repo = backend_random.repo
133 133 directories, files = scm.ScmModel().get_nodes(
134 134 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
135 135 flat=True)
136 136 assert_contains_only_unicode(directories)
137 137 assert_contains_only_unicode(files)
138 138
139 139
140 140 def test_get_nodes_returns_unicode_non_flat(backend_random):
141 141 repo = backend_random.repo
142 142 directories, files = scm.ScmModel().get_nodes(
143 143 repo.repo_name, repo.get_commit(commit_idx=0).raw_id,
144 144 flat=False)
145 145 # johbo: Checking only the names for now, since that is the critical
146 146 # part.
147 147 assert_contains_only_unicode([d['name'] for d in directories])
148 148 assert_contains_only_unicode([f['name'] for f in files])
149 149
150 150
151 151 def assert_contains_only_unicode(structure):
152 152 assert structure
153 153 for value in structure:
154 154 assert isinstance(value, unicode)
155 155
156 156
157 157 @pytest.mark.backends("hg", "git")
158 158 def test_get_non_unicode_reference(backend):
159 159 model = scm.ScmModel()
160 160 non_unicode_list = ["AdΔ±nΔ±".decode("cp1254")]
161 161
162 162 def scm_instance():
163 163 return Mock(
164 164 branches=non_unicode_list, bookmarks=non_unicode_list,
165 165 tags=non_unicode_list, alias=backend.alias)
166 166
167 167 repo = Mock(__class__=db.Repository, scm_instance=scm_instance)
168 168 choices, __ = model.get_repo_landing_revs(repo=repo)
169 169 if backend.alias == 'hg':
170 170 valid_choices = [
171 171 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
172 172 u'book:Ad\xc4\xb1n\xc4\xb1', u'tag:Ad\xc4\xb1n\xc4\xb1']
173 173 else:
174 174 valid_choices = [
175 175 'rev:tip', u'branch:Ad\xc4\xb1n\xc4\xb1',
176 176 u'tag:Ad\xc4\xb1n\xc4\xb1']
177 177
178 178 assert choices == valid_choices
179 179
180 180
181 181 class TestInstallSvnHooks(object):
182 182 HOOK_FILES = ('pre-commit', 'post-commit')
183 183
184 184 def test_new_hooks_are_created(self, backend_svn):
185 185 model = scm.ScmModel()
186 186 repo = backend_svn.create_repo()
187 187 vcs_repo = repo.scm_instance()
188 188 model.install_svn_hooks(vcs_repo)
189 189
190 190 hooks_path = os.path.join(vcs_repo.path, 'hooks')
191 191 assert os.path.isdir(hooks_path)
192 192 for file_name in self.HOOK_FILES:
193 193 file_path = os.path.join(hooks_path, file_name)
194 194 self._check_hook_file_mode(file_path)
195 195 self._check_hook_file_content(file_path)
196 196
197 197 def test_rc_hooks_are_replaced(self, backend_svn):
198 198 model = scm.ScmModel()
199 199 repo = backend_svn.create_repo()
200 200 vcs_repo = repo.scm_instance()
201 201 hooks_path = os.path.join(vcs_repo.path, 'hooks')
202 202 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
203 203
204 204 for file_path in file_paths:
205 205 self._create_fake_hook(
206 206 file_path, content="RC_HOOK_VER = 'abcde'\n")
207 207
208 208 model.install_svn_hooks(vcs_repo)
209 209
210 210 for file_path in file_paths:
211 211 self._check_hook_file_content(file_path)
212 212
213 213 def test_non_rc_hooks_are_not_replaced_without_force_create(
214 214 self, backend_svn):
215 215 model = scm.ScmModel()
216 216 repo = backend_svn.create_repo()
217 217 vcs_repo = repo.scm_instance()
218 218 hooks_path = os.path.join(vcs_repo.path, 'hooks')
219 219 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
220 220 non_rc_content = "exit 0\n"
221 221
222 222 for file_path in file_paths:
223 223 self._create_fake_hook(file_path, content=non_rc_content)
224 224
225 225 model.install_svn_hooks(vcs_repo)
226 226
227 227 for file_path in file_paths:
228 228 with open(file_path, 'rt') as hook_file:
229 229 content = hook_file.read()
230 230 assert content == non_rc_content
231 231
232 232 def test_non_rc_hooks_are_replaced_with_force_create(self, backend_svn):
233 233 model = scm.ScmModel()
234 234 repo = backend_svn.create_repo()
235 235 vcs_repo = repo.scm_instance()
236 236 hooks_path = os.path.join(vcs_repo.path, 'hooks')
237 237 file_paths = [os.path.join(hooks_path, f) for f in self.HOOK_FILES]
238 238 non_rc_content = "exit 0\n"
239 239
240 240 for file_path in file_paths:
241 241 self._create_fake_hook(file_path, content=non_rc_content)
242 242
243 243 model.install_svn_hooks(vcs_repo, force_create=True)
244 244
245 245 for file_path in file_paths:
246 246 self._check_hook_file_content(file_path)
247 247
248 248 def _check_hook_file_mode(self, file_path):
249 249 assert os.path.exists(file_path)
250 250 stat_info = os.stat(file_path)
251 251
252 252 file_mode = stat.S_IMODE(stat_info.st_mode)
253 253 expected_mode = int('755', 8)
254 254 assert expected_mode == file_mode
255 255
256 256 def _check_hook_file_content(self, file_path):
257 257 with open(file_path, 'rt') as hook_file:
258 258 content = hook_file.read()
259 259
260 260 expected_env = '#!{}'.format(sys.executable)
261 261 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(
262 262 rhodecode.__version__)
263 263 assert content.strip().startswith(expected_env)
264 264 assert expected_rc_version in content
265 265
266 266 def _create_fake_hook(self, file_path, content):
267 267 with open(file_path, 'w') as hook_file:
268 268 hook_file.write(content)
269 269
270 270
271 271 class TestCheckRhodecodeHook(object):
272 272
273 273 @patch('os.path.exists', Mock(return_value=False))
274 274 def test_returns_true_when_no_hook_found(self):
275 275 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
276 276 assert result
277 277
278 278 @pytest.mark.parametrize("file_content, expected_result", [
279 279 ("RC_HOOK_VER = '3.3.3'\n", True),
280 280 ("RC_HOOK = '3.3.3'\n", False),
281 281 ])
282 282 @patch('os.path.exists', Mock(return_value=True))
283 283 def test_signatures(self, file_content, expected_result):
284 284 hook_content_patcher = patch.object(
285 285 scm, '_read_hook', return_value=file_content)
286 286 with hook_content_patcher:
287 287 result = scm._check_rhodecode_hook('/tmp/fake_hook_file.py')
288 288
289 289 assert result is expected_result
290 290
291 291
292 292 class TestInstallHooks(object):
293 293 def test_hooks_are_installed_for_git_repo(self, backend_git):
294 294 repo = backend_git.create_repo()
295 295 model = scm.ScmModel()
296 296 scm_repo = repo.scm_instance()
297 297 with patch.object(model, 'install_git_hook') as hooks_mock:
298 298 model.install_hooks(scm_repo, repo_type='git')
299 299 hooks_mock.assert_called_once_with(scm_repo)
300 300
301 301 def test_hooks_are_installed_for_svn_repo(self, backend_svn):
302 302 repo = backend_svn.create_repo()
303 303 scm_repo = repo.scm_instance()
304 304 model = scm.ScmModel()
305 305 with patch.object(scm.ScmModel, 'install_svn_hooks') as hooks_mock:
306 306 model.install_hooks(scm_repo, repo_type='svn')
307 307 hooks_mock.assert_called_once_with(scm_repo)
308 308
309 309 @pytest.mark.parametrize('hook_method', [
310 310 'install_svn_hooks',
311 311 'install_git_hook'])
312 312 def test_mercurial_doesnt_trigger_hooks(self, backend_hg, hook_method):
313 313 repo = backend_hg.create_repo()
314 314 scm_repo = repo.scm_instance()
315 315 model = scm.ScmModel()
316 316 with patch.object(scm.ScmModel, hook_method) as hooks_mock:
317 317 model.install_hooks(scm_repo, repo_type='hg')
318 318 assert hooks_mock.call_count == 0
General Comments 0
You need to be logged in to leave comments. Login now