# HG changeset patch # User RhodeCode Admin # Date 2023-01-31 22:33:33 # Node ID c5087cb092457ea2ce2258f40f1bea3170998bf1 # Parent 086736725cd21a42f4978b5873b5a1576324c704 celery: update how request object is passed around. - fixed hooks callback to actually use web based request passed to thread - rely on celery conf mechanism to pass in the request always in the same manner diff --git a/rhodecode/integrations/types/email.py b/rhodecode/integrations/types/email.py --- a/rhodecode/integrations/types/email.py +++ b/rhodecode/integrations/types/email.py @@ -325,6 +325,5 @@ class EmailEventHandler(object): recipients = self.integration_settings['recipients'] for email_address in recipients: - run_task( - tasks.send_email, email_address, subject, - email_body_plaintext, email_body_html) + run_task(tasks.send_email, email_address, subject, + email_body_plaintext, email_body_html) diff --git a/rhodecode/lib/celerylib/loader.py b/rhodecode/lib/celerylib/loader.py --- a/rhodecode/lib/celerylib/loader.py +++ b/rhodecode/lib/celerylib/loader.py @@ -38,11 +38,9 @@ from celery import signals from celery import Task from celery import exceptions # pragma: no cover from kombu.serialization import register -from pyramid.threadlocal import get_current_request import rhodecode -from rhodecode.lib.auth import AuthUser from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db from rhodecode.lib.ext_json import json from rhodecode.lib.pyramid_utils import bootstrap, setup_logging @@ -212,6 +210,19 @@ def task_revoked_signal( closer() +class UNSET(object): + pass + + +def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()): + + if request is not UNSET: + celery_app.conf.update({'PYRAMID_REQUEST': request}) + + if registry is not UNSET: + celery_app.conf.update({'PYRAMID_REGISTRY': registry}) + + def setup_celery_app(app, root, request, registry, closer, celery_settings): log.debug('Got custom celery conf: %s', celery_settings) 
celery_config = base_celery_config @@ -273,52 +284,38 @@ class RequestContextTask(Task): def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options): """ queue the job to run (we are in web request context here) """ + from rhodecode.lib.base import get_ip_addr - req = self.app.conf['PYRAMID_REQUEST'] or get_current_request() + req = self.app.conf['PYRAMID_REQUEST'] + if not req: + raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key') log.debug('Running Task with class: %s. Request Class: %s', self.__class__, req.__class__) - user_id = None - ip_addr = None + user_id = 0 # web case if hasattr(req, 'user'): - ip_addr = req.user.ip_addr user_id = req.user.user_id # api case elif hasattr(req, 'rpc_user'): - ip_addr = req.rpc_user.ip_addr user_id = req.rpc_user.user_id - else: - if user_id and ip_addr: - log.debug('Using data from celery proxy user') - else: - raise Exception( - 'Unable to fetch required data from request: {}. \n' - 'This task is required to be executed from context of ' - 'request in a webapp. 
Task: {}'.format( - repr(req), - self.__class__ - ) - ) - - if req: - # we hook into kwargs since it is the only way to pass our data to - # the celery worker - environ = maybe_prepare_env(req) - options['headers'] = options.get('headers', {}) - options['headers'].update({ - 'rhodecode_proxy_data': { - 'environ': environ, - 'auth_user': { - 'ip_addr': ip_addr, - 'user_id': user_id - }, - } - }) + # we hook into kwargs since it is the only way to pass our data to + # the celery worker + environ = maybe_prepare_env(req) + options['headers'] = options.get('headers', {}) + options['headers'].update({ + 'rhodecode_proxy_data': { + 'environ': environ, + 'auth_user': { + 'ip_addr': get_ip_addr(req.environ), + 'user_id': user_id + }, + } + }) return super(RequestContextTask, self).apply_async( args, kwargs, task_id, producer, link, link_error, shadow, **options) diff --git a/rhodecode/lib/hooks_daemon.py b/rhodecode/lib/hooks_daemon.py --- a/rhodecode/lib/hooks_daemon.py +++ b/rhodecode/lib/hooks_daemon.py @@ -57,8 +57,10 @@ class HooksHttpHandler(BaseHTTPRequestHa 'TXN ID fail: expected {} got {} instead'.format( txn_id, computed_txn_id)) + request = getattr(self.server, 'request', None) try: - result = self._call_hook(method, extras) + hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address)) + result = self._call_hook_method(hooks, method, extras) except Exception as e: exc_tb = traceback.format_exc() result = { @@ -80,8 +82,7 @@ class HooksHttpHandler(BaseHTTPRequestHa self.end_headers() self.wfile.write(json.dumps(result)) - def _call_hook(self, method, extras): - hooks = Hooks() + def _call_hook_method(self, hooks, method, extras): try: result = getattr(hooks, method)(extras) finally: @@ -97,7 +98,7 @@ class HooksHttpHandler(BaseHTTPRequestHa message = format % args log.debug( - "%s - - [%s] %s", self.client_address[0], + "HOOKS: %s - - [%s] %s", self.client_address, self.log_date_time_string(), message) @@ -154,6 +155,10 @@ class 
HttpHooksCallbackDaemon(ThreadedHo # request and wastes cpu at all other times. POLL_INTERVAL = 0.01 + @property + def _hook_prefix(self): + return 'HOOKS: {} '.format(self.hooks_uri) + def get_hostname(self): return socket.gethostname() or '127.0.0.1' @@ -162,6 +167,8 @@ class HttpHooksCallbackDaemon(ThreadedHo return _get_port(min_port, max_port) def _prepare(self, txn_id=None, host=None, port=None): + from pyramid.threadlocal import get_current_request + if not host or host == "*": host = self.get_hostname() if not port: @@ -173,13 +180,16 @@ class HttpHooksCallbackDaemon(ThreadedHo self._done = False log.debug( - "Preparing HTTP callback daemon at `%s` and registering hook object: %s", - self.hooks_uri, HooksHttpHandler) + "%s Preparing HTTP callback daemon registering hook object: %s", + self._hook_prefix, HooksHttpHandler) self._daemon = TCPServer(server_address, HooksHttpHandler) # inject transaction_id for later verification self._daemon.txn_id = self.txn_id + # pass the WEB app request into daemon + self._daemon.request = get_current_request() + def _run(self): log.debug("Running event loop of callback daemon in background thread") callback_thread = threading.Thread( @@ -269,40 +279,35 @@ class Hooks(object): """ Exposes the hooks for remote call backs """ + def __init__(self, request=None, log_prefix=''): + self.log_prefix = log_prefix + self.request = request def repo_size(self, extras): - log.debug("Called repo_size of %s object", self) + log.debug("%sCalled repo_size of %s object", self.log_prefix, self) return self._call_hook(hooks_base.repo_size, extras) def pre_pull(self, extras): - log.debug("Called pre_pull of %s object", self) + log.debug("%sCalled pre_pull of %s object", self.log_prefix, self) return self._call_hook(hooks_base.pre_pull, extras) def post_pull(self, extras): - log.debug("Called post_pull of %s object", self) + log.debug("%sCalled post_pull of %s object", self.log_prefix, self) return self._call_hook(hooks_base.post_pull, extras) 
def pre_push(self, extras): - log.debug("Called pre_push of %s object", self) + log.debug("%sCalled pre_push of %s object", self.log_prefix, self) return self._call_hook(hooks_base.pre_push, extras) def post_push(self, extras): - log.debug("Called post_push of %s object", self) + log.debug("%sCalled post_push of %s object", self.log_prefix, self) return self._call_hook(hooks_base.post_push, extras) def _call_hook(self, hook, extras): extras = AttributeDict(extras) server_url = extras['server_url'] - request = bootstrap_request(application_url=server_url) - bootstrap_config(request) # inject routes and other interfaces - - # inject the user for usage in hooks - request.user = AttributeDict({'username': extras.username, - 'ip_addr': extras.ip, - 'user_id': extras.user_id}) - - extras.request = request + extras.request = self.request try: result = hook(extras) @@ -322,7 +327,7 @@ class Hooks(object): exc_tb = '' if not isinstance(error, HTTPLockedRC): exc_tb = traceback.format_exc() - log.exception('Exception when handling hook %s', hook) + log.exception('%sException when handling hook %s', self.log_prefix, hook) error_args = error.args return { 'status': 128, @@ -334,7 +339,7 @@ class Hooks(object): finally: meta.Session.remove() - log.debug('Got hook call response %s', result) + log.debug('%sGot hook call response %s', self.log_prefix, result) return { 'status': result.status, 'output': result.output, diff --git a/rhodecode/model/notification.py b/rhodecode/model/notification.py --- a/rhodecode/model/notification.py +++ b/rhodecode/model/notification.py @@ -154,9 +154,8 @@ class NotificationModel(BaseModel): extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')} log.debug('Creating notification email task for user:`%s`', recipient) - task = run_task( - tasks.send_email, recipient.email, subject, - email_body_plaintext, email_body, extra_headers=extra_headers) + task = run_task(tasks.send_email, recipient.email, subject, + email_body_plaintext, email_body, 
extra_headers=extra_headers) log.debug('Created email task: %s', task) return notification diff --git a/rhodecode/subscribers.py b/rhodecode/subscribers.py --- a/rhodecode/subscribers.py +++ b/rhodecode/subscribers.py @@ -69,6 +69,12 @@ def set_user_lang(event): event.request._LOCALE_ = user_lang +def update_celery_conf(event): + from rhodecode.lib.celerylib.loader import set_celery_conf + log.debug('Setting celery config from new request') + set_celery_conf(request=event.request, registry=event.request.registry) + + def add_request_user_context(event): """ Adds auth user into request context diff --git a/rhodecode/tweens.py b/rhodecode/tweens.py --- a/rhodecode/tweens.py +++ b/rhodecode/tweens.py @@ -108,6 +108,8 @@ def sanity_check_factory(handler, regist def includeme(config): config.add_subscriber('rhodecode.subscribers.add_renderer_globals', 'pyramid.events.BeforeRender') + config.add_subscriber('rhodecode.subscribers.update_celery_conf', + 'pyramid.events.NewRequest') config.add_subscriber('rhodecode.subscribers.set_user_lang', 'pyramid.events.NewRequest') config.add_subscriber('rhodecode.subscribers.reset_log_bucket',