##// END OF EJS Templates
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
ilin.s -
r5298:25044729 default
parent child Browse files
Show More
@@ -178,6 +178,7 b' class SshWrapper(object):'
178 178 extras = {
179 179 'detect_force_push': detect_force_push,
180 180 'check_branch_perms': check_branch_perms,
181 'config': self.ini_path
181 182 }
182 183
183 184 if vcs == 'hg':
@@ -149,8 +149,7 b' class VcsServer(object):'
149 149
150 150 callback_daemon, extras = prepare_callback_daemon(
151 151 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
152 host=vcs_settings.HOOKS_HOST,
153 use_direct_calls=False)
152 host=vcs_settings.HOOKS_HOST)
154 153
155 154 with callback_daemon:
156 155 try:
@@ -37,7 +37,6 b' def configure_vcs(config):'
37 37
38 38 conf.settings.HOOKS_PROTOCOL = config['vcs.hooks.protocol']
39 39 conf.settings.HOOKS_HOST = config['vcs.hooks.host']
40 conf.settings.HOOKS_DIRECT_CALLS = config['vcs.hooks.direct_calls']
41 40 conf.settings.DEFAULT_ENCODINGS = config['default_encoding']
42 41 conf.settings.ALIASES[:] = config['vcs.backends']
43 42 conf.settings.SVN_COMPATIBLE_VERSION = config['vcs.svn.compatible_version']
@@ -32,6 +32,7 b' import rhodecode'
32 32 from rhodecode.lib import audit_logger
33 33 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
34 34 from rhodecode.lib import hooks_base
35 from rhodecode.lib.utils import adopt_for_celery
35 36 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
36 37 from rhodecode.lib.statsd_client import StatsdClient
37 38 from rhodecode.model.db import (
@@ -410,3 +411,38 b' def beat_check(*args, **kwargs):'
410 411 log = get_logger(beat_check)
411 412 log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
412 413 return time.time()
414
415
416 @async_task
417 @adopt_for_celery
418 def repo_size(extras):
419 from rhodecode.lib.hooks_base import repo_size
420 return repo_size(extras)
421
422
423 @async_task
424 @adopt_for_celery
425 def pre_pull(extras):
426 from rhodecode.lib.hooks_base import pre_pull
427 return pre_pull(extras)
428
429
430 @async_task
431 @adopt_for_celery
432 def post_pull(extras):
433 from rhodecode.lib.hooks_base import post_pull
434 return post_pull(extras)
435
436
437 @async_task
438 @adopt_for_celery
439 def pre_push(extras):
440 from rhodecode.lib.hooks_base import pre_push
441 return pre_push(extras)
442
443
444 @async_task
445 @adopt_for_celery
446 def post_push(extras):
447 from rhodecode.lib.hooks_base import post_push
448 return post_push(extras)
@@ -53,6 +53,9 b' class HookResponse(object):'
53 53 def __bool__(self):
54 54 return self.status == 0
55 55
56 def to_json(self):
57 return {'status': self.status, 'output': self.output}
58
56 59
57 60 def is_shadow_repo(extras):
58 61 """
@@ -34,6 +34,7 b' from rhodecode.lib.exceptions import HTT'
34 34 from rhodecode.model import meta
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.pyramid_utils import get_config
37 38 from rhodecode.lib.ext_json import json
38 39 from rhodecode.lib import rc_cache
39 40
@@ -135,9 +136,10 b' class HooksHttpHandler(BaseHTTPRequestHa'
135 136 self.log_date_time_string(), message)
136 137
137 138
138 class DummyHooksCallbackDaemon(object):
139 hooks_uri = ''
140
139 class BaseHooksCallbackDaemon:
140 """
141 Basic context manager for actions that don't require some extra
142 """
141 143 def __init__(self):
142 144 self.hooks_module = Hooks.__module__
143 145
@@ -149,6 +151,16 b' class DummyHooksCallbackDaemon(object):'
149 151 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
150 152
151 153
154 class CeleryHooksCallbackDaemon(BaseHooksCallbackDaemon):
155 """
156 Context manger for achieving a compatibility with celery backend
157 """
158
159 def __init__(self, config):
160 self.task_queue = config.get('app:main', 'celery.broker_url')
161 self.task_backend = config.get('app:main', 'celery.result_backend')
162
163
152 164 class ThreadedHookCallbackDaemon(object):
153 165
154 166 _callback_thread = None
@@ -334,28 +346,31 b' def get_txn_id_from_store(txn_id):'
334 346 return {}
335 347
336 348
337 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
349 def prepare_callback_daemon(extras, protocol, host, txn_id=None):
338 350 txn_details = get_txn_id_from_store(txn_id)
339 351 port = txn_details.get('port', 0)
340 if use_direct_calls:
341 callback_daemon = DummyHooksCallbackDaemon()
342 extras['hooks_module'] = callback_daemon.hooks_module
343 else:
344 if protocol == 'http':
352 match protocol:
353 case 'http':
345 354 callback_daemon = HttpHooksCallbackDaemon(
346 355 txn_id=txn_id, host=host, port=port)
347 else:
356 case 'celery':
357 callback_daemon = CeleryHooksCallbackDaemon(get_config(extras['config']))
358 case 'local':
359 callback_daemon = BaseHooksCallbackDaemon()
360 case _:
348 361 log.error('Unsupported callback daemon protocol "%s"', protocol)
349 362 raise Exception('Unsupported callback daemon protocol.')
350 363
351 extras['hooks_uri'] = callback_daemon.hooks_uri
364 extras['hooks_uri'] = getattr(callback_daemon, 'hooks_uri', '')
365 extras['task_queue'] = getattr(callback_daemon, 'task_queue', '')
366 extras['task_backend'] = getattr(callback_daemon, 'task_backend', '')
352 367 extras['hooks_protocol'] = protocol
353 368 extras['time'] = time.time()
354 369
355 370 # register txn_id
356 371 extras['txn_id'] = txn_id
357 log.debug('Prepared a callback daemon: %s at url `%s`',
358 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
372 log.debug('Prepared a callback daemon: %s',
373 callback_daemon.__class__.__name__)
359 374 return callback_daemon, extras
360 375
361 376
@@ -683,14 +683,14 b' class SimpleVCS(object):'
683 683 return True
684 684
685 685 def _prepare_callback_daemon(self, extras, environ, action, txn_id=None):
686 direct_calls = vcs_settings.HOOKS_DIRECT_CALLS
686 protocol = vcs_settings.HOOKS_PROTOCOL
687 687 if not self._should_use_callback_daemon(extras, environ, action):
688 688 # disable callback daemon for actions that don't require it
689 direct_calls = True
689 protocol = 'local'
690 690
691 691 return prepare_callback_daemon(
692 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
693 host=vcs_settings.HOOKS_HOST, use_direct_calls=direct_calls, txn_id=txn_id)
692 extras, protocol=protocol,
693 host=vcs_settings.HOOKS_HOST, txn_id=txn_id)
694 694
695 695
696 696 def _should_check_locking(query_string):
@@ -32,6 +32,7 b' import tempfile'
32 32 import traceback
33 33 import tarfile
34 34 import warnings
35 from functools import wraps
35 36 from os.path import join as jn
36 37
37 38 import paste
@@ -41,6 +42,7 b' from webhelpers2.text import collapse, s'
41 42 from mako import exceptions
42 43
43 44 from rhodecode.lib.hash_utils import sha256_safe, md5, sha1
45 from rhodecode.lib.type_utils import AttributeDict
44 46 from rhodecode.lib.str_utils import safe_bytes, safe_str
45 47 from rhodecode.lib.vcs.backends.base import Config
46 48 from rhodecode.lib.vcs.exceptions import VCSError
@@ -73,6 +75,19 b" SLUG_RE = re.compile(r'[^{}]+'.format(SL"
73 75 _license_cache = None
74 76
75 77
78 def adopt_for_celery(func):
79 """
80 Decorator designed to adopt hooks (from rhodecode.lib.hooks_base)
81 for further usage as a celery tasks.
82 """
83 @wraps(func)
84 def wrapper(extras):
85 extras = AttributeDict(extras)
86 # HooksResponse implements to_json method which must be used there.
87 return func(extras).to_json()
88 return wrapper
89
90
76 91 def repo_name_slug(value):
77 92 """
78 93 Return slug of name of repository
@@ -49,7 +49,6 b' ARCHIVE_SPECS = ['
49 49 ]
50 50
51 51 HOOKS_PROTOCOL = None
52 HOOKS_DIRECT_CALLS = False
53 52 HOOKS_HOST = '127.0.0.1'
54 53
55 54
@@ -982,8 +982,7 b' class PullRequestModel(BaseModel):'
982 982
983 983 callback_daemon, extras = prepare_callback_daemon(
984 984 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
985 host=vcs_settings.HOOKS_HOST,
986 use_direct_calls=vcs_settings.HOOKS_DIRECT_CALLS)
985 host=vcs_settings.HOOKS_HOST)
987 986
988 987 with callback_daemon:
989 988 # TODO: johbo: Implement a clean way to run a config_override
@@ -25,7 +25,6 b' from rhodecode.lib.utils2 import Attribu'
25 25 from rhodecode.tests.utils import CustomTestApp
26 26
27 27 from rhodecode.lib.caching_query import FromCache
28 from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon
29 28 from rhodecode.lib.middleware import simplevcs
30 29 from rhodecode.lib.middleware.https_fixup import HttpsFixup
31 30 from rhodecode.lib.middleware.utils import scm_app_http
@@ -397,18 +396,6 b' class TestGenerateVcsResponse(object):'
397 396 with pytest.raises(Exception):
398 397 list(result)
399 398
400 def test_prepare_callback_daemon_is_called(self):
401 def side_effect(extras, environ, action, txn_id=None):
402 return DummyHooksCallbackDaemon(), extras
403
404 prepare_patcher = mock.patch.object(
405 StubVCSController, '_prepare_callback_daemon')
406 with prepare_patcher as prepare_mock:
407 prepare_mock.side_effect = side_effect
408 self.call_controller_with_response_body(iter(['a', 'b']))
409 assert prepare_mock.called
410 assert prepare_mock.call_count == 1
411
412 399 def call_controller_with_response_body(self, response_body):
413 400 settings = {
414 401 'base_path': 'fake_base_path',
@@ -462,26 +449,3 b' class TestInitializeGenerator(object):'
462 449 def factory(self, iterable):
463 450 for elem in iterable:
464 451 yield elem
465
466
467 class TestPrepareHooksDaemon(object):
468 def test_calls_imported_prepare_callback_daemon(self, app_settings, request_stub):
469 expected_extras = {'extra1': 'value1'}
470 daemon = DummyHooksCallbackDaemon()
471
472 controller = StubVCSController(app_settings, request_stub.registry)
473 prepare_patcher = mock.patch.object(
474 simplevcs, 'prepare_callback_daemon',
475 return_value=(daemon, expected_extras))
476 with prepare_patcher as prepare_mock:
477 callback_daemon, extras = controller._prepare_callback_daemon(
478 expected_extras.copy(), {}, 'push')
479 prepare_mock.assert_called_once_with(
480 expected_extras,
481 protocol=app_settings['vcs.hooks.protocol'],
482 host=app_settings['vcs.hooks.host'],
483 txn_id=None,
484 use_direct_calls=app_settings['vcs.hooks.direct_calls'])
485
486 assert callback_daemon == daemon
487 assert extras == extras
@@ -23,6 +23,7 b' import io'
23 23 import mock
24 24 import msgpack
25 25 import pytest
26 import tempfile
26 27
27 28 from rhodecode.lib import hooks_daemon
28 29 from rhodecode.lib.str_utils import safe_bytes
@@ -32,28 +33,6 b' from rhodecode.lib.ext_json import json'
32 33 test_proto = hooks_daemon.HooksHttpHandler.MSGPACK_HOOKS_PROTO
33 34
34 35
35 class TestDummyHooksCallbackDaemon(object):
36 def test_hooks_module_path_set_properly(self):
37 daemon = hooks_daemon.DummyHooksCallbackDaemon()
38 assert daemon.hooks_module == 'rhodecode.lib.hooks_daemon'
39
40 def test_logs_entering_the_hook(self):
41 daemon = hooks_daemon.DummyHooksCallbackDaemon()
42 with mock.patch.object(hooks_daemon.log, 'debug') as log_mock:
43 with daemon as return_value:
44 log_mock.assert_called_once_with(
45 'Running `%s` callback daemon', 'DummyHooksCallbackDaemon')
46 assert return_value == daemon
47
48 def test_logs_exiting_the_hook(self):
49 daemon = hooks_daemon.DummyHooksCallbackDaemon()
50 with mock.patch.object(hooks_daemon.log, 'debug') as log_mock:
51 with daemon:
52 pass
53 log_mock.assert_called_with(
54 'Exiting `%s` callback daemon', 'DummyHooksCallbackDaemon')
55
56
57 36 class TestHooks(object):
58 37 def test_hooks_can_be_used_as_a_context_processor(self):
59 38 hooks = hooks_daemon.Hooks()
@@ -299,17 +278,17 b' class TestHttpHooksCallbackDaemon(object'
299 278
300 279
301 280 class TestPrepareHooksDaemon(object):
302 @pytest.mark.parametrize('protocol', ('http',))
303 def test_returns_dummy_hooks_callback_daemon_when_using_direct_calls(
281 @pytest.mark.parametrize('protocol', ('celery',))
282 def test_returns_celery_hooks_callback_daemon_when_celery_protocol_specified(
304 283 self, protocol):
305 expected_extras = {'extra1': 'value1'}
306 callback, extras = hooks_daemon.prepare_callback_daemon(
307 expected_extras.copy(), protocol=protocol,
308 host='127.0.0.1', use_direct_calls=True)
309 assert isinstance(callback, hooks_daemon.DummyHooksCallbackDaemon)
310 expected_extras['hooks_module'] = 'rhodecode.lib.hooks_daemon'
311 expected_extras['time'] = extras['time']
312 assert 'extra1' in extras
284 with tempfile.NamedTemporaryFile(mode='w') as temp_file:
285 temp_file.write("[app:main]\ncelery.broker_url = redis://redis/0\n"
286 "celery.result_backend = redis://redis/0")
287 temp_file.flush()
288 expected_extras = {'config': temp_file.name}
289 callback, extras = hooks_daemon.prepare_callback_daemon(
290 expected_extras, protocol=protocol, host='')
291 assert isinstance(callback, hooks_daemon.CeleryHooksCallbackDaemon)
313 292
314 293 @pytest.mark.parametrize('protocol, expected_class', (
315 294 ('http', hooks_daemon.HttpHooksCallbackDaemon),
@@ -319,11 +298,12 b' class TestPrepareHooksDaemon(object):'
319 298 expected_extras = {
320 299 'extra1': 'value1',
321 300 'txn_id': 'txnid2',
322 'hooks_protocol': protocol.lower()
301 'hooks_protocol': protocol.lower(),
302 'task_backend': '',
303 'task_queue': ''
323 304 }
324 305 callback, extras = hooks_daemon.prepare_callback_daemon(
325 306 expected_extras.copy(), protocol=protocol, host='127.0.0.1',
326 use_direct_calls=False,
327 307 txn_id='txnid2')
328 308 assert isinstance(callback, expected_class)
329 309 extras.pop('hooks_uri')
@@ -343,8 +323,7 b' class TestPrepareHooksDaemon(object):'
343 323 with pytest.raises(Exception):
344 324 callback, extras = hooks_daemon.prepare_callback_daemon(
345 325 expected_extras.copy(),
346 protocol=protocol, host='127.0.0.1',
347 use_direct_calls=False)
326 protocol=protocol, host='127.0.0.1')
348 327
349 328
350 329 class MockRequest(object):
General Comments 0
You need to be logged in to leave comments. Login now