##// END OF EJS Templates
celery: create custom celery task class which loads context/threadlocals...
dan -
r576:ffa50597 default
parent child Browse files
Show More
@@ -22,6 +22,7 b' celery libs for RhodeCode'
22 """
22 """
23
23
24
24
25 import pylons
25 import socket
26 import socket
26 import logging
27 import logging
27
28
@@ -29,16 +30,23 b' import rhodecode'
29
30
30 from os.path import join as jn
31 from os.path import join as jn
31 from pylons import config
32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
32
37
33 from decorator import decorator
38 from decorator import decorator
34
39
35 from zope.cachedescriptors.property import Lazy as LazyProperty
40 from zope.cachedescriptors.property import Lazy as LazyProperty
36
41
37 from rhodecode.config import utils
42 from rhodecode.config import utils
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.vcs import connect_vcs
47 from rhodecode.lib.vcs import connect_vcs
41 from rhodecode.model import meta
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
42
50
43 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
44
52
@@ -52,6 +60,79 b' class ResultWrapper(object):'
52 return self.task
60 return self.task
53
61
54
62
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
81 kwargs.update({
82 '_rhodecode_proxy_data': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
95 },
96 }
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
100
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
112
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
116 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
119
120 pyramid_request = prepare(request) # set pyramid threadlocal request
121
122 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
127 ))
128
129 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
132 pyramid_request['closer']()
133 pylons.url._pop_object()
134
135
55 def run_task(task, *args, **kwargs):
136 def run_task(task, *args, **kwargs):
56 if rhodecode.CELERY_ENABLED:
137 if rhodecode.CELERY_ENABLED:
57 celery_is_up = False
138 celery_is_up = False
@@ -33,7 +33,7 b' from pylons import config'
33 import rhodecode
33 import rhodecode
34 from rhodecode.lib.celerylib import (
34 from rhodecode.lib.celerylib import (
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection)
36 get_session, vcsconnection, RhodecodeCeleryTask)
37 from rhodecode.lib.hooks_base import log_create_repository
37 from rhodecode.lib.hooks_base import log_create_repository
38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 from rhodecode.lib.utils import add_cache, action_logger
39 from rhodecode.lib.utils import add_cache, action_logger
@@ -56,7 +56,7 b' def get_logger(cls):'
56 return log
56 return log
57
57
58
58
59 @task(ignore_result=True)
59 @task(ignore_result=True, base=RhodecodeCeleryTask)
60 @dbsession
60 @dbsession
61 def send_email(recipients, subject, body='', html_body='', email_config=None):
61 def send_email(recipients, subject, body='', html_body='', email_config=None):
62 """
62 """
@@ -104,7 +104,7 b' def send_email(recipients, subject, body'
104 return True
104 return True
105
105
106
106
107 @task(ignore_result=False)
107 @task(ignore_result=True, base=RhodecodeCeleryTask)
108 @dbsession
108 @dbsession
109 @vcsconnection
109 @vcsconnection
110 def create_repo(form_data, cur_user):
110 def create_repo(form_data, cur_user):
@@ -197,7 +197,7 b' def create_repo(form_data, cur_user):'
197 return True
197 return True
198
198
199
199
200 @task(ignore_result=False)
200 @task(ignore_result=True, base=RhodecodeCeleryTask)
201 @dbsession
201 @dbsession
202 @vcsconnection
202 @vcsconnection
203 def create_repo_fork(form_data, cur_user):
203 def create_repo_fork(form_data, cur_user):
General Comments 0
You need to be logged in to leave comments. Login now