##// END OF EJS Templates
celery: create custom celery task class which loads context/threadlocals...
dan -
r576:ffa50597 default
parent child Browse files
Show More
@@ -22,6 +22,7 b' celery libs for RhodeCode'
22 22 """
23 23
24 24
25 import pylons
25 26 import socket
26 27 import logging
27 28
@@ -29,16 +30,23 b' import rhodecode'
29 30
30 31 from os.path import join as jn
31 32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
32 37
33 38 from decorator import decorator
34 39
35 40 from zope.cachedescriptors.property import Lazy as LazyProperty
36 41
37 42 from rhodecode.config import utils
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
39 46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 47 from rhodecode.lib.vcs import connect_vcs
41 48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
42 50
43 51 log = logging.getLogger(__name__)
44 52
@@ -52,6 +60,79 b' class ResultWrapper(object):'
52 60 return self.task
53 61
54 62
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
81 kwargs.update({
82 '_rhodecode_proxy_data': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
95 },
96 }
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
100
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
112
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
116 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
119
120 pyramid_request = prepare(request) # set pyramid threadlocal request
121
122 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
127 ))
128
129 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
132 pyramid_request['closer']()
133 pylons.url._pop_object()
134
135
55 136 def run_task(task, *args, **kwargs):
56 137 if rhodecode.CELERY_ENABLED:
57 138 celery_is_up = False
@@ -33,7 +33,7 b' from pylons import config'
33 33 import rhodecode
34 34 from rhodecode.lib.celerylib import (
35 35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection)
36 get_session, vcsconnection, RhodecodeCeleryTask)
37 37 from rhodecode.lib.hooks_base import log_create_repository
38 38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 39 from rhodecode.lib.utils import add_cache, action_logger
@@ -56,7 +56,7 b' def get_logger(cls):'
56 56 return log
57 57
58 58
59 @task(ignore_result=True)
59 @task(ignore_result=True, base=RhodecodeCeleryTask)
60 60 @dbsession
61 61 def send_email(recipients, subject, body='', html_body='', email_config=None):
62 62 """
@@ -104,7 +104,7 b' def send_email(recipients, subject, body'
104 104 return True
105 105
106 106
107 @task(ignore_result=False)
107 @task(ignore_result=True, base=RhodecodeCeleryTask)
108 108 @dbsession
109 109 @vcsconnection
110 110 def create_repo(form_data, cur_user):
@@ -197,7 +197,7 b' def create_repo(form_data, cur_user):'
197 197 return True
198 198
199 199
200 @task(ignore_result=False)
200 @task(ignore_result=True, base=RhodecodeCeleryTask)
201 201 @dbsession
202 202 @vcsconnection
203 203 def create_repo_fork(form_data, cur_user):
General Comments 0
You need to be logged in to leave comments. Login now