# HG changeset patch # User Daniel Dourvaris # Date 2016-08-03 00:59:58 # Node ID ffa50597a2861b9ee5176e69cd50ef640b8cec8b # Parent ec0a25076d0cc85770c8f1601d5e513d42261529 celery: create custom celery task class which loads context/threadlocals of the original request required for tasks like create_repo which needs to generate a url after the repo is created. fixes #4139 diff --git a/rhodecode/lib/celerylib/__init__.py b/rhodecode/lib/celerylib/__init__.py --- a/rhodecode/lib/celerylib/__init__.py +++ b/rhodecode/lib/celerylib/__init__.py @@ -22,6 +22,7 @@ celery libs for RhodeCode """ +import pylons import socket import logging @@ -29,16 +30,23 @@ import rhodecode from os.path import join as jn from pylons import config +from celery.task import Task +from pyramid.request import Request +from pyramid.scripting import prepare +from pyramid.threadlocal import get_current_request from decorator import decorator from zope.cachedescriptors.property import Lazy as LazyProperty from rhodecode.config import utils -from rhodecode.lib.utils2 import safe_str, md5_safe, aslist +from rhodecode.lib.utils2 import ( + safe_str, md5_safe, aslist, get_routes_generator_for_server_url, + get_server_url) from rhodecode.lib.pidlock import DaemonLock, LockHeld from rhodecode.lib.vcs import connect_vcs from rhodecode.model import meta +from rhodecode.lib.auth import AuthUser log = logging.getLogger(__name__) @@ -52,6 +60,79 @@ class ResultWrapper(object): return self.task +class RhodecodeCeleryTask(Task): + """ + This is a celery task which will create a rhodecode app instance context + for the task, patch pyramid + pylons threadlocals with the original request + that created the task and also add the user to the context. + + This class as a whole should be removed once the pylons port is complete + and a pyramid only solution for celery is implemented as per issue #4139 + """ + + def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, + link=None, link_error=None, **options): + """ queue the job to run (we are in web request context here) """ + + request = get_current_request() + + # we hook into kwargs since it is the only way to pass our data to the + # celery worker in celery 2.2 + kwargs.update({ + '_rhodecode_proxy_data': { + 'environ': { + 'PATH_INFO': request.environ['PATH_INFO'], + 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], + 'HTTP_HOST': request.environ.get('HTTP_HOST', + request.environ['SERVER_NAME']), + 'SERVER_NAME': request.environ['SERVER_NAME'], + 'SERVER_PORT': request.environ['SERVER_PORT'], + 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], + }, + 'auth_user': { + 'ip_addr': request.user.ip_addr, + 'user_id': request.user.user_id + }, + } + }) + return super(RhodecodeCeleryTask, self).apply_async( + args, kwargs, task_id, producer, link, link_error, **options) + + def __call__(self, *args, **kwargs): + """ rebuild the context and then run task on celery worker """ + proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) + + if not proxy_data: + return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) + + log.debug('using celery proxy data to run task: %r', proxy_data) + + from rhodecode.config.routing import make_map + from rhodecode.config.middleware import make_pyramid_app + + # TODO: this can be done once per worker versus per task + pyramid_app = make_pyramid_app(config, **config['app_conf']) + + request = Request.blank('/', environ=proxy_data['environ']) + request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], + ip_addr=proxy_data['auth_user']['ip_addr']) + + pyramid_request = prepare(request) # set pyramid threadlocal request + + # pylons routing + if not rhodecode.CONFIG.get('routes.map'): + rhodecode.CONFIG['routes.map'] = make_map(config) + pylons.url._push_object(get_routes_generator_for_server_url( + get_server_url(request.environ) + )) + + try: + return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) + finally: + pyramid_request['closer']() + pylons.url._pop_object() + + def run_task(task, *args, **kwargs): if rhodecode.CELERY_ENABLED: celery_is_up = False diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -33,7 +33,7 @@ from pylons import config import rhodecode from rhodecode.lib.celerylib import ( run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, - get_session, vcsconnection) + get_session, vcsconnection, RhodecodeCeleryTask) from rhodecode.lib.hooks_base import log_create_repository from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer from rhodecode.lib.utils import add_cache, action_logger @@ -56,7 +56,7 @@ def get_logger(cls): return log -@task(ignore_result=True) +@task(ignore_result=True, base=RhodecodeCeleryTask) @dbsession def send_email(recipients, subject, body='', html_body='', email_config=None): """ @@ -104,7 +104,7 @@ def send_email(recipients, subject, body return True -@task(ignore_result=False) +@task(ignore_result=True, base=RhodecodeCeleryTask) @dbsession @vcsconnection def create_repo(form_data, cur_user): @@ -197,7 +197,7 @@ def create_repo(form_data, cur_user): return True -@task(ignore_result=False) +@task(ignore_result=True, base=RhodecodeCeleryTask) @dbsession @vcsconnection def create_repo_fork(form_data, cur_user):