Show More
@@ -22,6 +22,7 b' celery libs for RhodeCode' | |||||
22 | """ |
|
22 | """ | |
23 |
|
23 | |||
24 |
|
24 | |||
|
25 | import pylons | |||
25 | import socket |
|
26 | import socket | |
26 | import logging |
|
27 | import logging | |
27 |
|
28 | |||
@@ -29,16 +30,23 b' import rhodecode' | |||||
29 |
|
30 | |||
30 | from os.path import join as jn |
|
31 | from os.path import join as jn | |
31 | from pylons import config |
|
32 | from pylons import config | |
|
33 | from celery.task import Task | |||
|
34 | from pyramid.request import Request | |||
|
35 | from pyramid.scripting import prepare | |||
|
36 | from pyramid.threadlocal import get_current_request | |||
32 |
|
37 | |||
33 | from decorator import decorator |
|
38 | from decorator import decorator | |
34 |
|
39 | |||
35 | from zope.cachedescriptors.property import Lazy as LazyProperty |
|
40 | from zope.cachedescriptors.property import Lazy as LazyProperty | |
36 |
|
41 | |||
37 | from rhodecode.config import utils |
|
42 | from rhodecode.config import utils | |
38 |
from rhodecode.lib.utils2 import |
|
43 | from rhodecode.lib.utils2 import ( | |
|
44 | safe_str, md5_safe, aslist, get_routes_generator_for_server_url, | |||
|
45 | get_server_url) | |||
39 | from rhodecode.lib.pidlock import DaemonLock, LockHeld |
|
46 | from rhodecode.lib.pidlock import DaemonLock, LockHeld | |
40 | from rhodecode.lib.vcs import connect_vcs |
|
47 | from rhodecode.lib.vcs import connect_vcs | |
41 | from rhodecode.model import meta |
|
48 | from rhodecode.model import meta | |
|
49 | from rhodecode.lib.auth import AuthUser | |||
42 |
|
50 | |||
43 | log = logging.getLogger(__name__) |
|
51 | log = logging.getLogger(__name__) | |
44 |
|
52 | |||
@@ -52,6 +60,79 b' class ResultWrapper(object):' | |||||
52 | return self.task |
|
60 | return self.task | |
53 |
|
61 | |||
54 |
|
62 | |||
|
63 | class RhodecodeCeleryTask(Task): | |||
|
64 | """ | |||
|
65 | This is a celery task which will create a rhodecode app instance context | |||
|
66 | for the task, patch pyramid + pylons threadlocals with the original request | |||
|
67 | that created the task and also add the user to the context. | |||
|
68 | ||||
|
69 | This class as a whole should be removed once the pylons port is complete | |||
|
70 | and a pyramid only solution for celery is implemented as per issue #4139 | |||
|
71 | """ | |||
|
72 | ||||
|
73 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |||
|
74 | link=None, link_error=None, **options): | |||
|
75 | """ queue the job to run (we are in web request context here) """ | |||
|
76 | ||||
|
77 | request = get_current_request() | |||
|
78 | ||||
|
79 | # we hook into kwargs since it is the only way to pass our data to the | |||
|
80 | # celery worker in celery 2.2 | |||
|
81 | kwargs.update({ | |||
|
82 | '_rhodecode_proxy_data': { | |||
|
83 | 'environ': { | |||
|
84 | 'PATH_INFO': request.environ['PATH_INFO'], | |||
|
85 | 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], | |||
|
86 | 'HTTP_HOST': request.environ.get('HTTP_HOST', | |||
|
87 | request.environ['SERVER_NAME']), | |||
|
88 | 'SERVER_NAME': request.environ['SERVER_NAME'], | |||
|
89 | 'SERVER_PORT': request.environ['SERVER_PORT'], | |||
|
90 | 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], | |||
|
91 | }, | |||
|
92 | 'auth_user': { | |||
|
93 | 'ip_addr': request.user.ip_addr, | |||
|
94 | 'user_id': request.user.user_id | |||
|
95 | }, | |||
|
96 | } | |||
|
97 | }) | |||
|
98 | return super(RhodecodeCeleryTask, self).apply_async( | |||
|
99 | args, kwargs, task_id, producer, link, link_error, **options) | |||
|
100 | ||||
|
101 | def __call__(self, *args, **kwargs): | |||
|
102 | """ rebuild the context and then run task on celery worker """ | |||
|
103 | proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) | |||
|
104 | ||||
|
105 | if not proxy_data: | |||
|
106 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |||
|
107 | ||||
|
108 | log.debug('using celery proxy data to run task: %r', proxy_data) | |||
|
109 | ||||
|
110 | from rhodecode.config.routing import make_map | |||
|
111 | from rhodecode.config.middleware import make_pyramid_app | |||
|
112 | ||||
|
113 | # TODO: this can be done once per worker versus per task | |||
|
114 | pyramid_app = make_pyramid_app(config, **config['app_conf']) | |||
|
115 | ||||
|
116 | request = Request.blank('/', environ=proxy_data['environ']) | |||
|
117 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], | |||
|
118 | ip_addr=proxy_data['auth_user']['ip_addr']) | |||
|
119 | ||||
|
120 | pyramid_request = prepare(request) # set pyramid threadlocal request | |||
|
121 | ||||
|
122 | # pylons routing | |||
|
123 | if not rhodecode.CONFIG.get('routes.map'): | |||
|
124 | rhodecode.CONFIG['routes.map'] = make_map(config) | |||
|
125 | pylons.url._push_object(get_routes_generator_for_server_url( | |||
|
126 | get_server_url(request.environ) | |||
|
127 | )) | |||
|
128 | ||||
|
129 | try: | |||
|
130 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |||
|
131 | finally: | |||
|
132 | pyramid_request['closer']() | |||
|
133 | pylons.url._pop_object() | |||
|
134 | ||||
|
135 | ||||
55 | def run_task(task, *args, **kwargs): |
|
136 | def run_task(task, *args, **kwargs): | |
56 | if rhodecode.CELERY_ENABLED: |
|
137 | if rhodecode.CELERY_ENABLED: | |
57 | celery_is_up = False |
|
138 | celery_is_up = False |
@@ -33,7 +33,7 b' from pylons import config' | |||||
33 | import rhodecode |
|
33 | import rhodecode | |
34 | from rhodecode.lib.celerylib import ( |
|
34 | from rhodecode.lib.celerylib import ( | |
35 | run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, |
|
35 | run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, | |
36 | get_session, vcsconnection) |
|
36 | get_session, vcsconnection, RhodecodeCeleryTask) | |
37 | from rhodecode.lib.hooks_base import log_create_repository |
|
37 | from rhodecode.lib.hooks_base import log_create_repository | |
38 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer |
|
38 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer | |
39 | from rhodecode.lib.utils import add_cache, action_logger |
|
39 | from rhodecode.lib.utils import add_cache, action_logger | |
@@ -56,7 +56,7 b' def get_logger(cls):' | |||||
56 | return log |
|
56 | return log | |
57 |
|
57 | |||
58 |
|
58 | |||
59 | @task(ignore_result=True) |
|
59 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
60 | @dbsession |
|
60 | @dbsession | |
61 | def send_email(recipients, subject, body='', html_body='', email_config=None): |
|
61 | def send_email(recipients, subject, body='', html_body='', email_config=None): | |
62 | """ |
|
62 | """ | |
@@ -104,7 +104,7 b' def send_email(recipients, subject, body' | |||||
104 | return True |
|
104 | return True | |
105 |
|
105 | |||
106 |
|
106 | |||
107 |
@task(ignore_result= |
|
107 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
108 | @dbsession |
|
108 | @dbsession | |
109 | @vcsconnection |
|
109 | @vcsconnection | |
110 | def create_repo(form_data, cur_user): |
|
110 | def create_repo(form_data, cur_user): | |
@@ -197,7 +197,7 b' def create_repo(form_data, cur_user):' | |||||
197 | return True |
|
197 | return True | |
198 |
|
198 | |||
199 |
|
199 | |||
200 |
@task(ignore_result= |
|
200 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
201 | @dbsession |
|
201 | @dbsession | |
202 | @vcsconnection |
|
202 | @vcsconnection | |
203 | def create_repo_fork(form_data, cur_user): |
|
203 | def create_repo_fork(form_data, cur_user): |
General Comments 0
You need to be logged in to leave comments.
Login now