##// END OF EJS Templates
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
ilin.s -
r5298:25044729 default
parent child Browse files
Show More
@@ -178,6 +178,7 b' class SshWrapper(object):'
178 extras = {
178 extras = {
179 'detect_force_push': detect_force_push,
179 'detect_force_push': detect_force_push,
180 'check_branch_perms': check_branch_perms,
180 'check_branch_perms': check_branch_perms,
181 'config': self.ini_path
181 }
182 }
182
183
183 if vcs == 'hg':
184 if vcs == 'hg':
@@ -149,8 +149,7 b' class VcsServer(object):'
149
149
150 callback_daemon, extras = prepare_callback_daemon(
150 callback_daemon, extras = prepare_callback_daemon(
151 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
151 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
152 host=vcs_settings.HOOKS_HOST,
152 host=vcs_settings.HOOKS_HOST)
153 use_direct_calls=False)
154
153
155 with callback_daemon:
154 with callback_daemon:
156 try:
155 try:
@@ -37,7 +37,6 b' def configure_vcs(config):'
37
37
38 conf.settings.HOOKS_PROTOCOL = config['vcs.hooks.protocol']
38 conf.settings.HOOKS_PROTOCOL = config['vcs.hooks.protocol']
39 conf.settings.HOOKS_HOST = config['vcs.hooks.host']
39 conf.settings.HOOKS_HOST = config['vcs.hooks.host']
40 conf.settings.HOOKS_DIRECT_CALLS = config['vcs.hooks.direct_calls']
41 conf.settings.DEFAULT_ENCODINGS = config['default_encoding']
40 conf.settings.DEFAULT_ENCODINGS = config['default_encoding']
42 conf.settings.ALIASES[:] = config['vcs.backends']
41 conf.settings.ALIASES[:] = config['vcs.backends']
43 conf.settings.SVN_COMPATIBLE_VERSION = config['vcs.svn.compatible_version']
42 conf.settings.SVN_COMPATIBLE_VERSION = config['vcs.svn.compatible_version']
@@ -32,6 +32,7 b' import rhodecode'
32 from rhodecode.lib import audit_logger
32 from rhodecode.lib import audit_logger
33 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
33 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
34 from rhodecode.lib import hooks_base
34 from rhodecode.lib import hooks_base
35 from rhodecode.lib.utils import adopt_for_celery
35 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
36 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
36 from rhodecode.lib.statsd_client import StatsdClient
37 from rhodecode.lib.statsd_client import StatsdClient
37 from rhodecode.model.db import (
38 from rhodecode.model.db import (
@@ -410,3 +411,38 b' def beat_check(*args, **kwargs):'
410 log = get_logger(beat_check)
411 log = get_logger(beat_check)
411 log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
412 log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
412 return time.time()
413 return time.time()
414
415
416 @async_task
417 @adopt_for_celery
418 def repo_size(extras):
419 from rhodecode.lib.hooks_base import repo_size
420 return repo_size(extras)
421
422
423 @async_task
424 @adopt_for_celery
425 def pre_pull(extras):
426 from rhodecode.lib.hooks_base import pre_pull
427 return pre_pull(extras)
428
429
430 @async_task
431 @adopt_for_celery
432 def post_pull(extras):
433 from rhodecode.lib.hooks_base import post_pull
434 return post_pull(extras)
435
436
437 @async_task
438 @adopt_for_celery
439 def pre_push(extras):
440 from rhodecode.lib.hooks_base import pre_push
441 return pre_push(extras)
442
443
444 @async_task
445 @adopt_for_celery
446 def post_push(extras):
447 from rhodecode.lib.hooks_base import post_push
448 return post_push(extras)
@@ -53,6 +53,9 b' class HookResponse(object):'
53 def __bool__(self):
53 def __bool__(self):
54 return self.status == 0
54 return self.status == 0
55
55
56 def to_json(self):
57 return {'status': self.status, 'output': self.output}
58
56
59
57 def is_shadow_repo(extras):
60 def is_shadow_repo(extras):
58 """
61 """
@@ -34,6 +34,7 b' from rhodecode.lib.exceptions import HTT'
34 from rhodecode.model import meta
34 from rhodecode.model import meta
35 from rhodecode.lib import hooks_base
35 from rhodecode.lib import hooks_base
36 from rhodecode.lib.utils2 import AttributeDict
36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.pyramid_utils import get_config
37 from rhodecode.lib.ext_json import json
38 from rhodecode.lib.ext_json import json
38 from rhodecode.lib import rc_cache
39 from rhodecode.lib import rc_cache
39
40
@@ -135,9 +136,10 b' class HooksHttpHandler(BaseHTTPRequestHa'
135 self.log_date_time_string(), message)
136 self.log_date_time_string(), message)
136
137
137
138
138 class DummyHooksCallbackDaemon(object):
139 class BaseHooksCallbackDaemon:
139 hooks_uri = ''
140 """
140
141 Basic context manager for actions that don't require some extra
142 """
141 def __init__(self):
143 def __init__(self):
142 self.hooks_module = Hooks.__module__
144 self.hooks_module = Hooks.__module__
143
145
@@ -149,6 +151,16 b' class DummyHooksCallbackDaemon(object):'
149 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
151 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
150
152
151
153
154 class CeleryHooksCallbackDaemon(BaseHooksCallbackDaemon):
155 """
156 Context manger for achieving a compatibility with celery backend
157 """
158
159 def __init__(self, config):
160 self.task_queue = config.get('app:main', 'celery.broker_url')
161 self.task_backend = config.get('app:main', 'celery.result_backend')
162
163
152 class ThreadedHookCallbackDaemon(object):
164 class ThreadedHookCallbackDaemon(object):
153
165
154 _callback_thread = None
166 _callback_thread = None
@@ -334,28 +346,31 b' def get_txn_id_from_store(txn_id):'
334 return {}
346 return {}
335
347
336
348
337 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
349 def prepare_callback_daemon(extras, protocol, host, txn_id=None):
338 txn_details = get_txn_id_from_store(txn_id)
350 txn_details = get_txn_id_from_store(txn_id)
339 port = txn_details.get('port', 0)
351 port = txn_details.get('port', 0)
340 if use_direct_calls:
352 match protocol:
341 callback_daemon = DummyHooksCallbackDaemon()
353 case 'http':
342 extras['hooks_module'] = callback_daemon.hooks_module
343 else:
344 if protocol == 'http':
345 callback_daemon = HttpHooksCallbackDaemon(
354 callback_daemon = HttpHooksCallbackDaemon(
346 txn_id=txn_id, host=host, port=port)
355 txn_id=txn_id, host=host, port=port)
347 else:
356 case 'celery':
357 callback_daemon = CeleryHooksCallbackDaemon(get_config(extras['config']))
358 case 'local':
359 callback_daemon = BaseHooksCallbackDaemon()
360 case _:
348 log.error('Unsupported callback daemon protocol "%s"', protocol)
361 log.error('Unsupported callback daemon protocol "%s"', protocol)
349 raise Exception('Unsupported callback daemon protocol.')
362 raise Exception('Unsupported callback daemon protocol.')
350
363
351 extras['hooks_uri'] = callback_daemon.hooks_uri
364 extras['hooks_uri'] = getattr(callback_daemon, 'hooks_uri', '')
365 extras['task_queue'] = getattr(callback_daemon, 'task_queue', '')
366 extras['task_backend'] = getattr(callback_daemon, 'task_backend', '')
352 extras['hooks_protocol'] = protocol
367 extras['hooks_protocol'] = protocol
353 extras['time'] = time.time()
368 extras['time'] = time.time()
354
369
355 # register txn_id
370 # register txn_id
356 extras['txn_id'] = txn_id
371 extras['txn_id'] = txn_id
357 log.debug('Prepared a callback daemon: %s at url `%s`',
372 log.debug('Prepared a callback daemon: %s',
358 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
373 callback_daemon.__class__.__name__)
359 return callback_daemon, extras
374 return callback_daemon, extras
360
375
361
376
@@ -683,14 +683,14 b' class SimpleVCS(object):'
683 return True
683 return True
684
684
685 def _prepare_callback_daemon(self, extras, environ, action, txn_id=None):
685 def _prepare_callback_daemon(self, extras, environ, action, txn_id=None):
686 direct_calls = vcs_settings.HOOKS_DIRECT_CALLS
686 protocol = vcs_settings.HOOKS_PROTOCOL
687 if not self._should_use_callback_daemon(extras, environ, action):
687 if not self._should_use_callback_daemon(extras, environ, action):
688 # disable callback daemon for actions that don't require it
688 # disable callback daemon for actions that don't require it
689 direct_calls = True
689 protocol = 'local'
690
690
691 return prepare_callback_daemon(
691 return prepare_callback_daemon(
692 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
692 extras, protocol=protocol,
693 host=vcs_settings.HOOKS_HOST, use_direct_calls=direct_calls, txn_id=txn_id)
693 host=vcs_settings.HOOKS_HOST, txn_id=txn_id)
694
694
695
695
696 def _should_check_locking(query_string):
696 def _should_check_locking(query_string):
@@ -32,6 +32,7 b' import tempfile'
32 import traceback
32 import traceback
33 import tarfile
33 import tarfile
34 import warnings
34 import warnings
35 from functools import wraps
35 from os.path import join as jn
36 from os.path import join as jn
36
37
37 import paste
38 import paste
@@ -41,6 +42,7 b' from webhelpers2.text import collapse, s'
41 from mako import exceptions
42 from mako import exceptions
42
43
43 from rhodecode.lib.hash_utils import sha256_safe, md5, sha1
44 from rhodecode.lib.hash_utils import sha256_safe, md5, sha1
45 from rhodecode.lib.type_utils import AttributeDict
44 from rhodecode.lib.str_utils import safe_bytes, safe_str
46 from rhodecode.lib.str_utils import safe_bytes, safe_str
45 from rhodecode.lib.vcs.backends.base import Config
47 from rhodecode.lib.vcs.backends.base import Config
46 from rhodecode.lib.vcs.exceptions import VCSError
48 from rhodecode.lib.vcs.exceptions import VCSError
@@ -73,6 +75,19 b" SLUG_RE = re.compile(r'[^{}]+'.format(SL"
73 _license_cache = None
75 _license_cache = None
74
76
75
77
78 def adopt_for_celery(func):
79 """
80 Decorator designed to adopt hooks (from rhodecode.lib.hooks_base)
81 for further usage as a celery tasks.
82 """
83 @wraps(func)
84 def wrapper(extras):
85 extras = AttributeDict(extras)
86 # HooksResponse implements to_json method which must be used there.
87 return func(extras).to_json()
88 return wrapper
89
90
76 def repo_name_slug(value):
91 def repo_name_slug(value):
77 """
92 """
78 Return slug of name of repository
93 Return slug of name of repository
@@ -49,7 +49,6 b' ARCHIVE_SPECS = ['
49 ]
49 ]
50
50
51 HOOKS_PROTOCOL = None
51 HOOKS_PROTOCOL = None
52 HOOKS_DIRECT_CALLS = False
53 HOOKS_HOST = '127.0.0.1'
52 HOOKS_HOST = '127.0.0.1'
54
53
55
54
@@ -982,8 +982,7 b' class PullRequestModel(BaseModel):'
982
982
983 callback_daemon, extras = prepare_callback_daemon(
983 callback_daemon, extras = prepare_callback_daemon(
984 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
984 extras, protocol=vcs_settings.HOOKS_PROTOCOL,
985 host=vcs_settings.HOOKS_HOST,
985 host=vcs_settings.HOOKS_HOST)
986 use_direct_calls=vcs_settings.HOOKS_DIRECT_CALLS)
987
986
988 with callback_daemon:
987 with callback_daemon:
989 # TODO: johbo: Implement a clean way to run a config_override
988 # TODO: johbo: Implement a clean way to run a config_override
@@ -25,7 +25,6 b' from rhodecode.lib.utils2 import Attribu'
25 from rhodecode.tests.utils import CustomTestApp
25 from rhodecode.tests.utils import CustomTestApp
26
26
27 from rhodecode.lib.caching_query import FromCache
27 from rhodecode.lib.caching_query import FromCache
28 from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon
29 from rhodecode.lib.middleware import simplevcs
28 from rhodecode.lib.middleware import simplevcs
30 from rhodecode.lib.middleware.https_fixup import HttpsFixup
29 from rhodecode.lib.middleware.https_fixup import HttpsFixup
31 from rhodecode.lib.middleware.utils import scm_app_http
30 from rhodecode.lib.middleware.utils import scm_app_http
@@ -397,18 +396,6 b' class TestGenerateVcsResponse(object):'
397 with pytest.raises(Exception):
396 with pytest.raises(Exception):
398 list(result)
397 list(result)
399
398
400 def test_prepare_callback_daemon_is_called(self):
401 def side_effect(extras, environ, action, txn_id=None):
402 return DummyHooksCallbackDaemon(), extras
403
404 prepare_patcher = mock.patch.object(
405 StubVCSController, '_prepare_callback_daemon')
406 with prepare_patcher as prepare_mock:
407 prepare_mock.side_effect = side_effect
408 self.call_controller_with_response_body(iter(['a', 'b']))
409 assert prepare_mock.called
410 assert prepare_mock.call_count == 1
411
412 def call_controller_with_response_body(self, response_body):
399 def call_controller_with_response_body(self, response_body):
413 settings = {
400 settings = {
414 'base_path': 'fake_base_path',
401 'base_path': 'fake_base_path',
@@ -462,26 +449,3 b' class TestInitializeGenerator(object):'
462 def factory(self, iterable):
449 def factory(self, iterable):
463 for elem in iterable:
450 for elem in iterable:
464 yield elem
451 yield elem
465
466
467 class TestPrepareHooksDaemon(object):
468 def test_calls_imported_prepare_callback_daemon(self, app_settings, request_stub):
469 expected_extras = {'extra1': 'value1'}
470 daemon = DummyHooksCallbackDaemon()
471
472 controller = StubVCSController(app_settings, request_stub.registry)
473 prepare_patcher = mock.patch.object(
474 simplevcs, 'prepare_callback_daemon',
475 return_value=(daemon, expected_extras))
476 with prepare_patcher as prepare_mock:
477 callback_daemon, extras = controller._prepare_callback_daemon(
478 expected_extras.copy(), {}, 'push')
479 prepare_mock.assert_called_once_with(
480 expected_extras,
481 protocol=app_settings['vcs.hooks.protocol'],
482 host=app_settings['vcs.hooks.host'],
483 txn_id=None,
484 use_direct_calls=app_settings['vcs.hooks.direct_calls'])
485
486 assert callback_daemon == daemon
487 assert extras == extras
@@ -23,6 +23,7 b' import io'
23 import mock
23 import mock
24 import msgpack
24 import msgpack
25 import pytest
25 import pytest
26 import tempfile
26
27
27 from rhodecode.lib import hooks_daemon
28 from rhodecode.lib import hooks_daemon
28 from rhodecode.lib.str_utils import safe_bytes
29 from rhodecode.lib.str_utils import safe_bytes
@@ -32,28 +33,6 b' from rhodecode.lib.ext_json import json'
32 test_proto = hooks_daemon.HooksHttpHandler.MSGPACK_HOOKS_PROTO
33 test_proto = hooks_daemon.HooksHttpHandler.MSGPACK_HOOKS_PROTO
33
34
34
35
35 class TestDummyHooksCallbackDaemon(object):
36 def test_hooks_module_path_set_properly(self):
37 daemon = hooks_daemon.DummyHooksCallbackDaemon()
38 assert daemon.hooks_module == 'rhodecode.lib.hooks_daemon'
39
40 def test_logs_entering_the_hook(self):
41 daemon = hooks_daemon.DummyHooksCallbackDaemon()
42 with mock.patch.object(hooks_daemon.log, 'debug') as log_mock:
43 with daemon as return_value:
44 log_mock.assert_called_once_with(
45 'Running `%s` callback daemon', 'DummyHooksCallbackDaemon')
46 assert return_value == daemon
47
48 def test_logs_exiting_the_hook(self):
49 daemon = hooks_daemon.DummyHooksCallbackDaemon()
50 with mock.patch.object(hooks_daemon.log, 'debug') as log_mock:
51 with daemon:
52 pass
53 log_mock.assert_called_with(
54 'Exiting `%s` callback daemon', 'DummyHooksCallbackDaemon')
55
56
57 class TestHooks(object):
36 class TestHooks(object):
58 def test_hooks_can_be_used_as_a_context_processor(self):
37 def test_hooks_can_be_used_as_a_context_processor(self):
59 hooks = hooks_daemon.Hooks()
38 hooks = hooks_daemon.Hooks()
@@ -299,17 +278,17 b' class TestHttpHooksCallbackDaemon(object'
299
278
300
279
301 class TestPrepareHooksDaemon(object):
280 class TestPrepareHooksDaemon(object):
302 @pytest.mark.parametrize('protocol', ('http',))
281 @pytest.mark.parametrize('protocol', ('celery',))
303 def test_returns_dummy_hooks_callback_daemon_when_using_direct_calls(
282 def test_returns_celery_hooks_callback_daemon_when_celery_protocol_specified(
304 self, protocol):
283 self, protocol):
305 expected_extras = {'extra1': 'value1'}
284 with tempfile.NamedTemporaryFile(mode='w') as temp_file:
306 callback, extras = hooks_daemon.prepare_callback_daemon(
285 temp_file.write("[app:main]\ncelery.broker_url = redis://redis/0\n"
307 expected_extras.copy(), protocol=protocol,
286 "celery.result_backend = redis://redis/0")
308 host='127.0.0.1', use_direct_calls=True)
287 temp_file.flush()
309 assert isinstance(callback, hooks_daemon.DummyHooksCallbackDaemon)
288 expected_extras = {'config': temp_file.name}
310 expected_extras['hooks_module'] = 'rhodecode.lib.hooks_daemon'
289 callback, extras = hooks_daemon.prepare_callback_daemon(
311 expected_extras['time'] = extras['time']
290 expected_extras, protocol=protocol, host='')
312 assert 'extra1' in extras
291 assert isinstance(callback, hooks_daemon.CeleryHooksCallbackDaemon)
313
292
314 @pytest.mark.parametrize('protocol, expected_class', (
293 @pytest.mark.parametrize('protocol, expected_class', (
315 ('http', hooks_daemon.HttpHooksCallbackDaemon),
294 ('http', hooks_daemon.HttpHooksCallbackDaemon),
@@ -319,11 +298,12 b' class TestPrepareHooksDaemon(object):'
319 expected_extras = {
298 expected_extras = {
320 'extra1': 'value1',
299 'extra1': 'value1',
321 'txn_id': 'txnid2',
300 'txn_id': 'txnid2',
322 'hooks_protocol': protocol.lower()
301 'hooks_protocol': protocol.lower(),
302 'task_backend': '',
303 'task_queue': ''
323 }
304 }
324 callback, extras = hooks_daemon.prepare_callback_daemon(
305 callback, extras = hooks_daemon.prepare_callback_daemon(
325 expected_extras.copy(), protocol=protocol, host='127.0.0.1',
306 expected_extras.copy(), protocol=protocol, host='127.0.0.1',
326 use_direct_calls=False,
327 txn_id='txnid2')
307 txn_id='txnid2')
328 assert isinstance(callback, expected_class)
308 assert isinstance(callback, expected_class)
329 extras.pop('hooks_uri')
309 extras.pop('hooks_uri')
@@ -343,8 +323,7 b' class TestPrepareHooksDaemon(object):'
343 with pytest.raises(Exception):
323 with pytest.raises(Exception):
344 callback, extras = hooks_daemon.prepare_callback_daemon(
324 callback, extras = hooks_daemon.prepare_callback_daemon(
345 expected_extras.copy(),
325 expected_extras.copy(),
346 protocol=protocol, host='127.0.0.1',
326 protocol=protocol, host='127.0.0.1')
347 use_direct_calls=False)
348
327
349
328
350 class MockRequest(object):
329 class MockRequest(object):
General Comments 0
You need to be logged in to leave comments. Login now