Show More
@@ -22,6 +22,7 b' celery libs for RhodeCode' | |||
|
22 | 22 | """ |
|
23 | 23 | |
|
24 | 24 | |
|
25 | import pylons | |
|
25 | 26 | import socket |
|
26 | 27 | import logging |
|
27 | 28 | |
@@ -29,16 +30,23 b' import rhodecode' | |||
|
29 | 30 | |
|
30 | 31 | from os.path import join as jn |
|
31 | 32 | from pylons import config |
|
33 | from celery.task import Task | |
|
34 | from pyramid.request import Request | |
|
35 | from pyramid.scripting import prepare | |
|
36 | from pyramid.threadlocal import get_current_request | |
|
32 | 37 | |
|
33 | 38 | from decorator import decorator |
|
34 | 39 | |
|
35 | 40 | from zope.cachedescriptors.property import Lazy as LazyProperty |
|
36 | 41 | |
|
37 | 42 | from rhodecode.config import utils |
|
38 |
from rhodecode.lib.utils2 import |
|
|
43 | from rhodecode.lib.utils2 import ( | |
|
44 | safe_str, md5_safe, aslist, get_routes_generator_for_server_url, | |
|
45 | get_server_url) | |
|
39 | 46 | from rhodecode.lib.pidlock import DaemonLock, LockHeld |
|
40 | 47 | from rhodecode.lib.vcs import connect_vcs |
|
41 | 48 | from rhodecode.model import meta |
|
49 | from rhodecode.lib.auth import AuthUser | |
|
42 | 50 | |
|
43 | 51 | log = logging.getLogger(__name__) |
|
44 | 52 | |
@@ -52,6 +60,79 b' class ResultWrapper(object):' | |||
|
52 | 60 | return self.task |
|
53 | 61 | |
|
54 | 62 | |
|
63 | class RhodecodeCeleryTask(Task): | |
|
64 | """ | |
|
65 | This is a celery task which will create a rhodecode app instance context | |
|
66 | for the task, patch pyramid + pylons threadlocals with the original request | |
|
67 | that created the task and also add the user to the context. | |
|
68 | ||
|
69 | This class as a whole should be removed once the pylons port is complete | |
|
70 | and a pyramid only solution for celery is implemented as per issue #4139 | |
|
71 | """ | |
|
72 | ||
|
73 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |
|
74 | link=None, link_error=None, **options): | |
|
75 | """ queue the job to run (we are in web request context here) """ | |
|
76 | ||
|
77 | request = get_current_request() | |
|
78 | ||
|
79 | # we hook into kwargs since it is the only way to pass our data to the | |
|
80 | # celery worker in celery 2.2 | |
|
81 | kwargs.update({ | |
|
82 | '_rhodecode_proxy_data': { | |
|
83 | 'environ': { | |
|
84 | 'PATH_INFO': request.environ['PATH_INFO'], | |
|
85 | 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], | |
|
86 | 'HTTP_HOST': request.environ.get('HTTP_HOST', | |
|
87 | request.environ['SERVER_NAME']), | |
|
88 | 'SERVER_NAME': request.environ['SERVER_NAME'], | |
|
89 | 'SERVER_PORT': request.environ['SERVER_PORT'], | |
|
90 | 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], | |
|
91 | }, | |
|
92 | 'auth_user': { | |
|
93 | 'ip_addr': request.user.ip_addr, | |
|
94 | 'user_id': request.user.user_id | |
|
95 | }, | |
|
96 | } | |
|
97 | }) | |
|
98 | return super(RhodecodeCeleryTask, self).apply_async( | |
|
99 | args, kwargs, task_id, producer, link, link_error, **options) | |
|
100 | ||
|
101 | def __call__(self, *args, **kwargs): | |
|
102 | """ rebuild the context and then run task on celery worker """ | |
|
103 | proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) | |
|
104 | ||
|
105 | if not proxy_data: | |
|
106 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |
|
107 | ||
|
108 | log.debug('using celery proxy data to run task: %r', proxy_data) | |
|
109 | ||
|
110 | from rhodecode.config.routing import make_map | |
|
111 | from rhodecode.config.middleware import make_pyramid_app | |
|
112 | ||
|
113 | # TODO: this can be done once per worker versus per task | |
|
114 | pyramid_app = make_pyramid_app(config, **config['app_conf']) | |
|
115 | ||
|
116 | request = Request.blank('/', environ=proxy_data['environ']) | |
|
117 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], | |
|
118 | ip_addr=proxy_data['auth_user']['ip_addr']) | |
|
119 | ||
|
120 | pyramid_request = prepare(request) # set pyramid threadlocal request | |
|
121 | ||
|
122 | # pylons routing | |
|
123 | if not rhodecode.CONFIG.get('routes.map'): | |
|
124 | rhodecode.CONFIG['routes.map'] = make_map(config) | |
|
125 | pylons.url._push_object(get_routes_generator_for_server_url( | |
|
126 | get_server_url(request.environ) | |
|
127 | )) | |
|
128 | ||
|
129 | try: | |
|
130 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |
|
131 | finally: | |
|
132 | pyramid_request['closer']() | |
|
133 | pylons.url._pop_object() | |
|
134 | ||
|
135 | ||
|
55 | 136 | def run_task(task, *args, **kwargs): |
|
56 | 137 | if rhodecode.CELERY_ENABLED: |
|
57 | 138 | celery_is_up = False |
@@ -33,7 +33,7 b' from pylons import config' | |||
|
33 | 33 | import rhodecode |
|
34 | 34 | from rhodecode.lib.celerylib import ( |
|
35 | 35 | run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, |
|
36 | get_session, vcsconnection) | |
|
36 | get_session, vcsconnection, RhodecodeCeleryTask) | |
|
37 | 37 | from rhodecode.lib.hooks_base import log_create_repository |
|
38 | 38 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer |
|
39 | 39 | from rhodecode.lib.utils import add_cache, action_logger |
@@ -56,7 +56,7 b' def get_logger(cls):' | |||
|
56 | 56 | return log |
|
57 | 57 | |
|
58 | 58 | |
|
59 | @task(ignore_result=True) | |
|
59 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
|
60 | 60 | @dbsession |
|
61 | 61 | def send_email(recipients, subject, body='', html_body='', email_config=None): |
|
62 | 62 | """ |
@@ -104,7 +104,7 b' def send_email(recipients, subject, body' | |||
|
104 | 104 | return True |
|
105 | 105 | |
|
106 | 106 | |
|
107 |
@task(ignore_result= |
|
|
107 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
|
108 | 108 | @dbsession |
|
109 | 109 | @vcsconnection |
|
110 | 110 | def create_repo(form_data, cur_user): |
@@ -197,7 +197,7 b' def create_repo(form_data, cur_user):' | |||
|
197 | 197 | return True |
|
198 | 198 | |
|
199 | 199 | |
|
200 |
@task(ignore_result= |
|
|
200 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
|
201 | 201 | @dbsession |
|
202 | 202 | @vcsconnection |
|
203 | 203 | def create_repo_fork(form_data, cur_user): |
General Comments 0
You need to be logged in to leave comments.
Login now