##// END OF EJS Templates
tests: fixed simplevcs tests
super-admin -
r5167:039ef6ec default
parent child Browse files
Show More
@@ -1,136 +1,138 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 py.test config for test suite for making push/pull operations.
21 21
22 22 .. important::
23 23
24 24 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
25 25 to redirect things to stderr instead of stdout.
26 26 """
27 27
28 28 import pytest
29 29 import logging
30 30
31 31 from rhodecode.authentication import AuthenticationPluginRegistry
32 32 from rhodecode.model.db import Permission, User
33 33 from rhodecode.model.meta import Session
34 34 from rhodecode.model.settings import SettingsModel
35 35 from rhodecode.model.user import UserModel
36 36
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40 # Docker image running httpbin...
41 41 HTTPBIN_DOMAIN = 'http://httpbin'
42 42 HTTPBIN_POST = HTTPBIN_DOMAIN + '/post'
43 43
44 44
45 45 @pytest.fixture()
46 46 def enable_auth_plugins(request, baseapp, csrf_token):
47 47 """
48 48 Return a factory object that when called, allows to control which
49 49 authentication plugins are enabled.
50 50 """
51 51
52 52 class AuthPluginManager(object):
53 53
54 54 def cleanup(self):
55 55 self._enable_plugins(['egg:rhodecode-enterprise-ce#rhodecode'])
56 56
57 57 def enable(self, plugins_list, override=None):
58 58 return self._enable_plugins(plugins_list, override)
59 59
60 60 def _enable_plugins(self, plugins_list, override=None):
61 61 override = override or {}
62 62 params = {
63 63 'auth_plugins': ','.join(plugins_list),
64 64 }
65 65
66 66 # helper translate some names to others, to fix settings code
67 67 name_map = {
68 68 'token': 'authtoken'
69 69 }
70 70 log.debug('enable_auth_plugins: enabling following auth-plugins: %s', plugins_list)
71 71
72 72 for module in plugins_list:
73 73 plugin_name = module.partition('#')[-1]
74 74 if plugin_name in name_map:
75 75 plugin_name = name_map[plugin_name]
76 76 enabled_plugin = f'auth_{plugin_name}_enabled'
77 77 cache_ttl = f'auth_{plugin_name}_cache_ttl'
78 78
79 79 # default params that are needed for each plugin,
80 80 # `enabled` and `cache_ttl`
81 81 params.update({
82 82 enabled_plugin: True,
83 83 cache_ttl: 0
84 84 })
85 85 if override.get:
86 86 params.update(override.get(module, {}))
87 87
88 88 validated_params = params
89 89
90 90 for k, v in validated_params.items():
91 91 setting = SettingsModel().create_or_update_setting(k, v)
92 92 Session().add(setting)
93 93 Session().commit()
94 94
95 95 AuthenticationPluginRegistry.invalidate_auth_plugins_cache(hard=True)
96 96
97 97 enabled_plugins = SettingsModel().get_auth_plugins()
98 98 assert plugins_list == enabled_plugins
99 99
100 100 enabler = AuthPluginManager()
101 101 request.addfinalizer(enabler.cleanup)
102 102
103 103 return enabler
104 104
105 105
106 106 @pytest.fixture()
107 107 def test_user_factory(request, baseapp):
108 108
109 109 def user_factory(username='test_user', password='qweqwe', first_name='John', last_name='Testing', **kwargs):
110 110 usr = UserModel().create_or_update(
111 111 username=username,
112 112 password=password,
113 113 email=f'{username}@rhodecode.org',
114 114 firstname=first_name, lastname=last_name)
115 115 Session().commit()
116 116
117 117 for k, v in kwargs.items():
118 118 setattr(usr, k, v)
119 119 Session().add(usr)
120 120
121 assert User.get_by_username(username) == usr
121 new_usr = User.get_by_username(username)
122 new_usr_id = new_usr.user_id
123 assert new_usr == usr
122 124
123 125 @request.addfinalizer
124 126 def cleanup():
125 if UserModel().get_user(usr.user_id) is None:
127 if User.get(new_usr_id) is None:
126 128 return
127 129
128 130 perm = Permission.query().all()
129 131 for p in perm:
130 132 UserModel().revoke_perm(usr, p)
131 133
132 UserModel().delete(usr.user_id)
134 UserModel().delete(new_usr_id)
133 135 Session().commit()
134 136 return usr
135 137
136 138 return user_factory
@@ -1,492 +1,487 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import mock
21 21 import pytest
22 22
23 23 from rhodecode.lib.str_utils import base64_to_str
24 24 from rhodecode.lib.utils2 import AttributeDict
25 25 from rhodecode.tests.utils import CustomTestApp
26 26
27 27 from rhodecode.lib.caching_query import FromCache
28 28 from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon
29 29 from rhodecode.lib.middleware import simplevcs
30 30 from rhodecode.lib.middleware.https_fixup import HttpsFixup
31 31 from rhodecode.lib.middleware.utils import scm_app_http
32 32 from rhodecode.model.db import User, _hash_key
33 33 from rhodecode.model.meta import Session, cache as db_cache
34 34 from rhodecode.tests import (
35 35 HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
36 36 from rhodecode.tests.lib.middleware import mock_scm_app
37 37
38 38
39 39 class StubVCSController(simplevcs.SimpleVCS):
40 40
41 41 SCM = 'hg'
42 42 stub_response_body = tuple()
43 43
44 44 def __init__(self, *args, **kwargs):
45 45 super(StubVCSController, self).__init__(*args, **kwargs)
46 46 self._action = 'pull'
47 47 self._is_shadow_repo_dir = True
48 48 self._name = HG_REPO
49 49 self.set_repo_names(None)
50 50
51 51 @property
52 52 def is_shadow_repo_dir(self):
53 53 return self._is_shadow_repo_dir
54 54
55 55 def _get_repository_name(self, environ):
56 56 return self._name
57 57
58 58 def _get_action(self, environ):
59 59 return self._action
60 60
61 61 def _create_wsgi_app(self, repo_path, repo_name, config):
62 62 def fake_app(environ, start_response):
63 63 headers = [
64 64 ('Http-Accept', 'application/mercurial')
65 65 ]
66 66 start_response('200 OK', headers)
67 67 return self.stub_response_body
68 68 return fake_app
69 69
70 70 def _create_config(self, extras, repo_name, scheme='http'):
71 71 return None
72 72
73 73
74 74 @pytest.fixture()
75 75 def vcscontroller(baseapp, config_stub, request_stub):
76 76 from rhodecode.config.middleware import ce_auth_resources
77 77
78 78 config_stub.testing_securitypolicy()
79 79 config_stub.include('rhodecode.authentication')
80 80
81 81 for resource in ce_auth_resources:
82 82 config_stub.include(resource)
83 83
84 84 controller = StubVCSController(
85 85 baseapp.config.get_settings(), request_stub.registry)
86 86 app = HttpsFixup(controller, baseapp.config.get_settings())
87 87 app = CustomTestApp(app)
88 88
89 89 _remove_default_user_from_query_cache()
90 90
91 91 # Sanity checks that things are set up correctly
92 92 app.get('/' + HG_REPO, status=200)
93 93
94 94 app.controller = controller
95 95 return app
96 96
97 97
98 98 def _remove_default_user_from_query_cache():
99 99 user = User.get_default_user(cache=True)
100 100 query = Session().query(User).filter(User.username == user.username)
101 101 query = query.options(
102 102 FromCache("sql_cache_short", f"get_user_{_hash_key(user.username)}"))
103 103
104 104 db_cache.invalidate(
105 105 query, {},
106 106 FromCache("sql_cache_short", f"get_user_{_hash_key(user.username)}"))
107 107
108 108 Session().expire(user)
109 109
110 110
111 111 def test_handles_exceptions_during_permissions_checks(
112 112 vcscontroller, disable_anonymous_user, enable_auth_plugins, test_user_factory):
113 113
114 114 test_password = 'qweqwe'
115 115 test_user = test_user_factory(password=test_password, extern_type='headers', extern_name='headers')
116 116 test_username = test_user.username
117 117
118 118 enable_auth_plugins.enable([
119 119 'egg:rhodecode-enterprise-ce#headers',
120 120 'egg:rhodecode-enterprise-ce#token',
121 121 'egg:rhodecode-enterprise-ce#rhodecode'],
122 122 override={
123 123 'egg:rhodecode-enterprise-ce#headers': {'auth_headers_header': 'REMOTE_USER'}
124 124 })
125 125
126 126 user_and_pass = f'{test_username}:{test_password}'
127 127 auth_password = base64_to_str(user_and_pass)
128 128
129 129 extra_environ = {
130 130 'AUTH_TYPE': 'Basic',
131 131 'HTTP_AUTHORIZATION': f'Basic {auth_password}',
132 132 'REMOTE_USER': test_username,
133 133 }
134 134
135 135 # Verify that things are hooked up correctly, we pass user with headers bound auth, and headers filled in
136 136 vcscontroller.get('/', status=200, extra_environ=extra_environ)
137 137
138 138 # Simulate trouble during permission checks
139 139 with mock.patch('rhodecode.model.db.User.get_by_username',
140 140 side_effect=Exception('permission_error_test')) as get_user:
141 141 # Verify that a correct 500 is returned and check that the expected
142 142 # code path was hit.
143 143 vcscontroller.get('/', status=500, extra_environ=extra_environ)
144 144 assert get_user.called
145 145
146 146
147 def test_returns_forbidden_if_no_anonymous_access(
148 vcscontroller, disable_anonymous_user):
149 vcscontroller.get('/', status=401)
150
151
152 147 class StubFailVCSController(simplevcs.SimpleVCS):
153 148 def _handle_request(self, environ, start_response):
154 149 raise Exception("BOOM")
155 150
156 151
157 152 @pytest.fixture(scope='module')
158 153 def fail_controller(baseapp):
159 154 controller = StubFailVCSController(
160 155 baseapp.config.get_settings(), baseapp.config)
161 156 controller = HttpsFixup(controller, baseapp.config.get_settings())
162 157 controller = CustomTestApp(controller)
163 158 return controller
164 159
165 160
166 161 def test_handles_exceptions_as_internal_server_error(fail_controller):
167 162 fail_controller.get('/', status=500)
168 163
169 164
170 165 def test_provides_traceback_for_appenlight(fail_controller):
171 166 response = fail_controller.get(
172 167 '/', status=500, extra_environ={'appenlight.client': 'fake'})
173 168 assert 'appenlight.__traceback' in response.request.environ
174 169
175 170
176 171 def test_provides_utils_scm_app_as_scm_app_by_default(baseapp, request_stub):
177 172 controller = StubVCSController(baseapp.config.get_settings(), request_stub.registry)
178 173 assert controller.scm_app is scm_app_http
179 174
180 175
181 176 def test_allows_to_override_scm_app_via_config(baseapp, request_stub):
182 177 config = baseapp.config.get_settings().copy()
183 178 config['vcs.scm_app_implementation'] = (
184 179 'rhodecode.tests.lib.middleware.mock_scm_app')
185 180 controller = StubVCSController(config, request_stub.registry)
186 181 assert controller.scm_app is mock_scm_app
187 182
188 183
189 184 @pytest.mark.parametrize('query_string, expected', [
190 185 ('cmd=stub_command', True),
191 186 ('cmd=listkeys', False),
192 187 ])
193 188 def test_should_check_locking(query_string, expected):
194 189 result = simplevcs._should_check_locking(query_string)
195 190 assert result == expected
196 191
197 192
198 193 class TestShadowRepoRegularExpression(object):
199 194 pr_segment = 'pull-request'
200 195 shadow_segment = 'repository'
201 196
202 197 @pytest.mark.parametrize('url, expected', [
203 198 # repo with/without groups
204 199 ('My-Repo/{pr_segment}/1/{shadow_segment}', True),
205 200 ('Group/My-Repo/{pr_segment}/2/{shadow_segment}', True),
206 201 ('Group/Sub-Group/My-Repo/{pr_segment}/3/{shadow_segment}', True),
207 202 ('Group/Sub-Group1/Sub-Group2/My-Repo/{pr_segment}/3/{shadow_segment}', True),
208 203
209 204 # pull request ID
210 205 ('MyRepo/{pr_segment}/1/{shadow_segment}', True),
211 206 ('MyRepo/{pr_segment}/1234567890/{shadow_segment}', True),
212 207 ('MyRepo/{pr_segment}/-1/{shadow_segment}', False),
213 208 ('MyRepo/{pr_segment}/invalid/{shadow_segment}', False),
214 209
215 210 # unicode
216 211 (u'Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True),
217 212 (u'Sp€çîál-Gröüp/Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True),
218 213
219 214 # trailing/leading slash
220 215 ('/My-Repo/{pr_segment}/1/{shadow_segment}', False),
221 216 ('My-Repo/{pr_segment}/1/{shadow_segment}/', False),
222 217 ('/My-Repo/{pr_segment}/1/{shadow_segment}/', False),
223 218
224 219 # misc
225 220 ('My-Repo/{pr_segment}/1/{shadow_segment}/extra', False),
226 221 ('My-Repo/{pr_segment}/1/{shadow_segment}extra', False),
227 222 ])
228 223 def test_shadow_repo_regular_expression(self, url, expected):
229 224 from rhodecode.lib.middleware.simplevcs import SimpleVCS
230 225 url = url.format(
231 226 pr_segment=self.pr_segment,
232 227 shadow_segment=self.shadow_segment)
233 228 match_obj = SimpleVCS.shadow_repo_re.match(url)
234 229 assert (match_obj is not None) == expected
235 230
236 231
237 232 @pytest.mark.backends('git', 'hg')
238 233 class TestShadowRepoExposure(object):
239 234
240 235 def test_pull_on_shadow_repo_propagates_to_wsgi_app(
241 236 self, baseapp, request_stub):
242 237 """
243 238 Check that a pull action to a shadow repo is propagated to the
244 239 underlying wsgi app.
245 240 """
246 241 controller = StubVCSController(
247 242 baseapp.config.get_settings(), request_stub.registry)
248 243 controller._check_ssl = mock.Mock()
249 244 controller.is_shadow_repo = True
250 245 controller._action = 'pull'
251 246 controller._is_shadow_repo_dir = True
252 247 controller.stub_response_body = (b'dummy body value',)
253 248 controller._get_default_cache_ttl = mock.Mock(
254 249 return_value=(False, 0))
255 250
256 251 environ_stub = {
257 252 'HTTP_HOST': 'test.example.com',
258 253 'HTTP_ACCEPT': 'application/mercurial',
259 254 'REQUEST_METHOD': 'GET',
260 255 'wsgi.url_scheme': 'http',
261 256 }
262 257
263 258 response = controller(environ_stub, mock.Mock())
264 259 response_body = b''.join(response)
265 260
266 261 # Assert that we got the response from the wsgi app.
267 262 assert response_body == b''.join(controller.stub_response_body)
268 263
269 264 def test_pull_on_shadow_repo_that_is_missing(self, baseapp, request_stub):
270 265 """
271 266 Check that a pull action to a shadow repo is propagated to the
272 267 underlying wsgi app.
273 268 """
274 269 controller = StubVCSController(
275 270 baseapp.config.get_settings(), request_stub.registry)
276 271 controller._check_ssl = mock.Mock()
277 272 controller.is_shadow_repo = True
278 273 controller._action = 'pull'
279 274 controller._is_shadow_repo_dir = False
280 275 controller.stub_response_body = (b'dummy body value',)
281 276 environ_stub = {
282 277 'HTTP_HOST': 'test.example.com',
283 278 'HTTP_ACCEPT': 'application/mercurial',
284 279 'REQUEST_METHOD': 'GET',
285 280 'wsgi.url_scheme': 'http',
286 281 }
287 282
288 283 response = controller(environ_stub, mock.Mock())
289 284 response_body = b''.join(response)
290 285
291 286 # Assert that we got the response from the wsgi app.
292 287 assert b'404 Not Found' in response_body
293 288
294 289 def test_push_on_shadow_repo_raises(self, baseapp, request_stub):
295 290 """
296 291 Check that a push action to a shadow repo is aborted.
297 292 """
298 293 controller = StubVCSController(
299 294 baseapp.config.get_settings(), request_stub.registry)
300 295 controller._check_ssl = mock.Mock()
301 296 controller.is_shadow_repo = True
302 297 controller._action = 'push'
303 298 controller.stub_response_body = (b'dummy body value',)
304 299 environ_stub = {
305 300 'HTTP_HOST': 'test.example.com',
306 301 'HTTP_ACCEPT': 'application/mercurial',
307 302 'REQUEST_METHOD': 'GET',
308 303 'wsgi.url_scheme': 'http',
309 304 }
310 305
311 306 response = controller(environ_stub, mock.Mock())
312 307 response_body = b''.join(response)
313 308
314 309 assert response_body != controller.stub_response_body
315 310 # Assert that a 406 error is returned.
316 311 assert b'406 Not Acceptable' in response_body
317 312
318 313 def test_set_repo_names_no_shadow(self, baseapp, request_stub):
319 314 """
320 315 Check that the set_repo_names method sets all names to the one returned
321 316 by the _get_repository_name method on a request to a non shadow repo.
322 317 """
323 318 environ_stub = {}
324 319 controller = StubVCSController(
325 320 baseapp.config.get_settings(), request_stub.registry)
326 321 controller._name = 'RepoGroup/MyRepo'
327 322 controller.set_repo_names(environ_stub)
328 323 assert not controller.is_shadow_repo
329 324 assert (controller.url_repo_name ==
330 325 controller.acl_repo_name ==
331 326 controller.vcs_repo_name ==
332 327 controller._get_repository_name(environ_stub))
333 328
334 329 def test_set_repo_names_with_shadow(
335 330 self, baseapp, pr_util, config_stub, request_stub):
336 331 """
337 332 Check that the set_repo_names method sets correct names on a request
338 333 to a shadow repo.
339 334 """
340 335 from rhodecode.model.pull_request import PullRequestModel
341 336
342 337 pull_request = pr_util.create_pull_request()
343 338 shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format(
344 339 target=pull_request.target_repo.repo_name,
345 340 pr_id=pull_request.pull_request_id,
346 341 pr_segment=TestShadowRepoRegularExpression.pr_segment,
347 342 shadow_segment=TestShadowRepoRegularExpression.shadow_segment)
348 343 controller = StubVCSController(
349 344 baseapp.config.get_settings(), request_stub.registry)
350 345 controller._name = shadow_url
351 346 controller.set_repo_names({})
352 347
353 348 # Get file system path to shadow repo for assertions.
354 349 workspace_id = PullRequestModel()._workspace_id(pull_request)
355 350 vcs_repo_name = pull_request.target_repo.get_shadow_repository_path(workspace_id)
356 351
357 352 assert controller.vcs_repo_name == vcs_repo_name
358 353 assert controller.url_repo_name == shadow_url
359 354 assert controller.acl_repo_name == pull_request.target_repo.repo_name
360 355 assert controller.is_shadow_repo
361 356
362 357 def test_set_repo_names_with_shadow_but_missing_pr(
363 358 self, baseapp, pr_util, config_stub, request_stub):
364 359 """
365 360 Checks that the set_repo_names method enforces matching target repos
366 361 and pull request IDs.
367 362 """
368 363 pull_request = pr_util.create_pull_request()
369 364 shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format(
370 365 target=pull_request.target_repo.repo_name,
371 366 pr_id=999999999,
372 367 pr_segment=TestShadowRepoRegularExpression.pr_segment,
373 368 shadow_segment=TestShadowRepoRegularExpression.shadow_segment)
374 369 controller = StubVCSController(
375 370 baseapp.config.get_settings(), request_stub.registry)
376 371 controller._name = shadow_url
377 372 controller.set_repo_names({})
378 373
379 374 assert not controller.is_shadow_repo
380 375 assert (controller.url_repo_name ==
381 376 controller.acl_repo_name ==
382 377 controller.vcs_repo_name)
383 378
384 379
385 380 @pytest.mark.usefixtures('baseapp')
386 381 class TestGenerateVcsResponse(object):
387 382
388 383 def test_ensures_that_start_response_is_called_early_enough(self):
389 384 self.call_controller_with_response_body(iter(['a', 'b']))
390 385 assert self.start_response.called
391 386
392 387 def test_invalidates_cache_after_body_is_consumed(self):
393 388 result = self.call_controller_with_response_body(iter(['a', 'b']))
394 389 assert not self.was_cache_invalidated()
395 390 # Consume the result
396 391 list(result)
397 392 assert self.was_cache_invalidated()
398 393
399 394 def test_raises_unknown_exceptions(self):
400 395 result = self.call_controller_with_response_body(
401 396 self.raise_result_iter(vcs_kind='unknown'))
402 397 with pytest.raises(Exception):
403 398 list(result)
404 399
405 400 def test_prepare_callback_daemon_is_called(self):
406 401 def side_effect(extras, environ, action, txn_id=None):
407 402 return DummyHooksCallbackDaemon(), extras
408 403
409 404 prepare_patcher = mock.patch.object(
410 405 StubVCSController, '_prepare_callback_daemon')
411 406 with prepare_patcher as prepare_mock:
412 407 prepare_mock.side_effect = side_effect
413 408 self.call_controller_with_response_body(iter(['a', 'b']))
414 409 assert prepare_mock.called
415 410 assert prepare_mock.call_count == 1
416 411
417 412 def call_controller_with_response_body(self, response_body):
418 413 settings = {
419 414 'base_path': 'fake_base_path',
420 415 'vcs.hooks.protocol': 'http',
421 416 'vcs.hooks.direct_calls': False,
422 417 }
423 418 registry = AttributeDict()
424 419 controller = StubVCSController(settings, registry)
425 420 controller._invalidate_cache = mock.Mock()
426 421 controller.stub_response_body = response_body
427 422 self.start_response = mock.Mock()
428 423 result = controller._generate_vcs_response(
429 424 environ={}, start_response=self.start_response,
430 425 repo_path='fake_repo_path',
431 426 extras={}, action='push')
432 427 self.controller = controller
433 428 return result
434 429
435 430 def raise_result_iter(self, vcs_kind='repo_locked'):
436 431 """
437 432 Simulates an exception due to a vcs raised exception if kind vcs_kind
438 433 """
439 434 raise self.vcs_exception(vcs_kind=vcs_kind)
440 435 yield "never_reached"
441 436
442 437 def vcs_exception(self, vcs_kind='repo_locked'):
443 438 locked_exception = Exception('TEST_MESSAGE')
444 439 locked_exception._vcs_kind = vcs_kind
445 440 return locked_exception
446 441
447 442 def was_cache_invalidated(self):
448 443 return self.controller._invalidate_cache.called
449 444
450 445
451 446 class TestInitializeGenerator(object):
452 447
453 448 def test_drains_first_element(self):
454 449 gen = self.factory(['__init__', 1, 2])
455 450 result = list(gen)
456 451 assert result == [1, 2]
457 452
458 453 @pytest.mark.parametrize('values', [
459 454 [],
460 455 [1, 2],
461 456 ])
462 457 def test_raises_value_error(self, values):
463 458 with pytest.raises(ValueError):
464 459 self.factory(values)
465 460
466 461 @simplevcs.initialize_generator
467 462 def factory(self, iterable):
468 463 for elem in iterable:
469 464 yield elem
470 465
471 466
472 467 class TestPrepareHooksDaemon(object):
473 468 def test_calls_imported_prepare_callback_daemon(self, app_settings, request_stub):
474 469 expected_extras = {'extra1': 'value1'}
475 470 daemon = DummyHooksCallbackDaemon()
476 471
477 472 controller = StubVCSController(app_settings, request_stub.registry)
478 473 prepare_patcher = mock.patch.object(
479 474 simplevcs, 'prepare_callback_daemon',
480 475 return_value=(daemon, expected_extras))
481 476 with prepare_patcher as prepare_mock:
482 477 callback_daemon, extras = controller._prepare_callback_daemon(
483 478 expected_extras.copy(), {}, 'push')
484 479 prepare_mock.assert_called_once_with(
485 480 expected_extras,
486 481 protocol=app_settings['vcs.hooks.protocol'],
487 482 host=app_settings['vcs.hooks.host'],
488 483 txn_id=None,
489 484 use_direct_calls=app_settings['vcs.hooks.direct_calls'])
490 485
491 486 assert callback_daemon == daemon
492 487 assert extras == extras
General Comments 0
You need to be logged in to leave comments. Login now