celery: don't allow subtasks within tasks
Author: super-admin
Revision: r4875:92d53da0 (default branch)
@@ -1,84 +1,96 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2010-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import socket
 import logging

 import rhodecode
 from zope.cachedescriptors.property import Lazy as LazyProperty
 from rhodecode.lib.celerylib.loader import (
     celery_app, RequestContextTask, get_logger)
 from rhodecode.lib.statsd_client import StatsdClient

 async_task = celery_app.task


 log = logging.getLogger(__name__)


 class ResultWrapper(object):
     def __init__(self, task):
         self.task = task

     @LazyProperty
     def result(self):
         return self.task


 def run_task(task, *args, **kwargs):
+    import celery
     log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
     if task is None:
         raise ValueError('Got non-existing task for execution')

     exec_mode = 'sync'
+    allow_async = True
+
+    # if we're already inside a celery task, don't allow another async
+    # execution, e.g. a task spawned within a task
+    in_task = celery.current_task
+    if in_task:
+        log.debug('This task is in the context of another task: %s, not allowing another async execution', in_task)
+        allow_async = False
+        if kwargs.pop('allow_subtask', False):
+            log.debug('Forced async by allow_subtask=True flag')
+            allow_async = True

     t = None
-    if rhodecode.CELERY_ENABLED:
+    if rhodecode.CELERY_ENABLED and allow_async:
         try:
             t = task.apply_async(args=args, kwargs=kwargs)
             log.debug('executing task %s:%s in async mode', t.task_id, task)
             exec_mode = 'async'
         except socket.error as e:
             if isinstance(e, IOError) and e.errno == 111:
                 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
             else:
                 log.exception("Exception while connecting to celeryd.")
         except KeyError as e:
             log.error('Unable to connect to celeryd `%s`. Sync execution', e)
         except Exception as e:
             log.exception(
                 "Exception while trying to run task asynchronously. "
                 "Fallback to sync execution.")

     else:
         log.debug('executing task %s:%s in sync mode', 'TASK', task)

     statsd = StatsdClient.statsd
     if statsd:
         task_repr = getattr(task, 'name', task)
         statsd.incr('rhodecode_celery_task_total', tags=[
             'task:{}'.format(task_repr),
             'mode:{}'.format(exec_mode)
         ])

     # we got an async task, return it after the statsd call
     if t:
         return t
     return ResultWrapper(task(*args, **kwargs))
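For illustration, a minimal usage sketch of the new guard (not part of the changeset). It assumes the file above is rhodecode/lib/celerylib/__init__.py, where run_task and async_task are defined, and uses a hypothetical registered task name my_task inside the real rhodecode.lib.celerylib.tasks module:

    from rhodecode.lib.celerylib import run_task
    from rhodecode.lib.celerylib import tasks  # module listed in the celery 'imports' config

    # from web-request code: dispatched with apply_async when
    # rhodecode.CELERY_ENABLED is true
    result = run_task(tasks.my_task, 'some-repo')

    # from inside another running celery task: celery.current_task is set,
    # so the same call now falls back to synchronous execution ...
    result = run_task(tasks.my_task, 'some-repo')

    # ... unless the caller explicitly opts back in with the new flag,
    # which run_task pops from kwargs before dispatching
    result = run_task(tasks.my_task, 'some-repo', allow_subtask=True)

With this change, task-within-task dispatch is synchronous by default and async execution of a subtask has to be requested explicitly via allow_subtask=True.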
@@ -1,327 +1,323 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2010-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 """
 Celery loader, run with::

     celery worker \
         --task-events \
         --beat \
         --autoscale=20,2 \
         --max-tasks-per-child 1 \
         --app rhodecode.lib.celerylib.loader \
         --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
         --loglevel DEBUG --ini=.dev/dev.ini
 """
 import os
 import logging
 import importlib

 from celery import Celery
 from celery import signals
 from celery import Task
 from celery import exceptions  # pragma: no cover
 from kombu.serialization import register
 from pyramid.threadlocal import get_current_request

 import rhodecode

 from rhodecode.lib.auth import AuthUser
 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
 from rhodecode.lib.utils2 import str2bool
 from rhodecode.model import meta


 register('json_ext', json.dumps, json.loads,
          content_type='application/x-json-ext',
          content_encoding='utf-8')

 log = logging.getLogger('celery.rhodecode.loader')


 def add_preload_arguments(parser):
     parser.add_argument(
         '--ini', default=None,
         help='Path to ini configuration file.'
     )
     parser.add_argument(
         '--ini-var', default=None,
         help='Comma separated list of key=value to pass to ini.'
     )


 def get_logger(obj):
     custom_log = logging.getLogger(
         'rhodecode.task.{}'.format(obj.__class__.__name__))

     if rhodecode.CELERY_ENABLED:
         try:
             custom_log = obj.get_logger()
         except Exception:
             pass

     return custom_log


 imports = ['rhodecode.lib.celerylib.tasks']

 try:
     # check if we have EE tasks available
     importlib.import_module('rc_ee')
     imports.append('rc_ee.lib.celerylib.tasks')
 except ImportError:
     pass


 base_celery_config = {
     'result_backend': 'rpc://',
     'result_expires': 60 * 60 * 24,
     'result_persistent': True,
     'imports': imports,
     'worker_max_tasks_per_child': 100,
     'accept_content': ['json_ext'],
     'task_serializer': 'json_ext',
     'result_serializer': 'json_ext',
     'worker_hijack_root_logger': False,
     'database_table_names': {
         'task': 'beat_taskmeta',
         'group': 'beat_groupmeta',
     }
 }
 # init main celery app
 celery_app = Celery()
 celery_app.user_options['preload'].add(add_preload_arguments)
 ini_file_glob = None


 @signals.setup_logging.connect
 def setup_logging_callback(**kwargs):
     setup_logging(ini_file_glob)


 @signals.user_preload_options.connect
 def on_preload_parsed(options, **kwargs):
     from rhodecode.config.middleware import get_celery_config

     ini_location = options['ini']
     ini_vars = options['ini_var']
     celery_app.conf['INI_PYRAMID'] = options['ini']

     if ini_location is None:
         print('You must provide the paste --ini argument')
         exit(-1)

     options = None
     if ini_vars is not None:
         options = parse_ini_vars(ini_vars)

     global ini_file_glob
     ini_file_glob = ini_location

     log.debug('Bootstrapping RhodeCode application...')

     env = {}
     try:
         env = bootstrap(ini_location, options=options)
     except Exception:
         log.exception('Failed to bootstrap RhodeCode APP')

     log.debug('Got Pyramid ENV: %s', env)
     celery_settings = get_celery_config(env['registry'].settings)

     setup_celery_app(
         app=env['app'], root=env['root'], request=env['request'],
         registry=env['registry'], closer=env['closer'],
         celery_settings=celery_settings)

     # fix the global flag even if it's disabled via the .ini file, because
     # this is worker code that doesn't need it to be disabled.
     rhodecode.CELERY_ENABLED = True


 @signals.task_prerun.connect
 def task_prerun_signal(task_id, task, args, **kwargs):
     ping_db()


 @signals.task_success.connect
 def task_success_signal(result, **kwargs):
     meta.Session.commit()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_retry.connect
 def task_retry_signal(
         request, reason, einfo, **kwargs):
     meta.Session.remove()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_failure.connect
 def task_failure_signal(
         task_id, exception, args, kwargs, traceback, einfo, **kargs):
+
+    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
     from rhodecode.lib.exc_tracking import store_exception
     from rhodecode.lib.statsd_client import StatsdClient

     meta.Session.remove()

     # simulate sys.exc_info()
     exc_info = (einfo.type, einfo.exception, einfo.tb)
     store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
     statsd = StatsdClient.statsd
     if statsd:
         exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
         statsd.incr('rhodecode_exception_total',
                     tags=["exc_source:celery", "type:{}".format(exc_type)])

     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_revoked.connect
 def task_revoked_signal(
         request, terminated, signum, expired, **kwargs):
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 def setup_celery_app(app, root, request, registry, closer, celery_settings):
     log.debug('Got custom celery conf: %s', celery_settings)
     celery_config = base_celery_config
     celery_config.update({
         # store the celerybeat scheduler db where the .ini file is
         'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
     })

     celery_config.update(celery_settings)
     celery_app.config_from_object(celery_config)

     celery_app.conf.update({'PYRAMID_APP': app})
     celery_app.conf.update({'PYRAMID_ROOT': root})
     celery_app.conf.update({'PYRAMID_REQUEST': request})
     celery_app.conf.update({'PYRAMID_REGISTRY': registry})
     celery_app.conf.update({'PYRAMID_CLOSER': closer})


 def configure_celery(config, celery_settings):
     """
     Helper that is called from our application creation logic. It gives
     connection info into the running webapp and allows execution of tasks
     from RhodeCode itself
     """
     # store some globals into rhodecode
     rhodecode.CELERY_ENABLED = str2bool(
         config.registry.settings.get('use_celery'))
     if rhodecode.CELERY_ENABLED:
         log.info('Configuring celery based on `%s` settings', celery_settings)
         setup_celery_app(
             app=None, root=None, request=None, registry=config.registry,
             closer=None, celery_settings=celery_settings)


 def maybe_prepare_env(req):
     environ = {}
     try:
         environ.update({
             'PATH_INFO': req.environ['PATH_INFO'],
             'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
             'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
             'SERVER_NAME': req.environ['SERVER_NAME'],
             'SERVER_PORT': req.environ['SERVER_PORT'],
             'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
         })
     except Exception:
         pass

     return environ


 class RequestContextTask(Task):
     """
     This is a celery task which will create a rhodecode app instance context
     for the task, patch pyramid with the original request
     that created the task and also add the user to the context.
     """

     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                     link=None, link_error=None, shadow=None, **options):
         """ queue the job to run (we are in web request context here) """

         req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()

         log.debug('Running Task with class: %s. Request Class: %s',
                   self.__class__, req.__class__)

-        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
-        log.debug('celery proxy data:%r', proxy_data)
-
         user_id = None
         ip_addr = None
-        if proxy_data:
-            user_id = proxy_data['auth_user']['user_id']
-            ip_addr = proxy_data['auth_user']['ip_addr']

         # web case
         if hasattr(req, 'user'):
             ip_addr = req.user.ip_addr
             user_id = req.user.user_id

         # api case
         elif hasattr(req, 'rpc_user'):
             ip_addr = req.rpc_user.ip_addr
             user_id = req.rpc_user.user_id
         else:
             if user_id and ip_addr:
                 log.debug('Using data from celery proxy user')

             else:
                 raise Exception(
                     'Unable to fetch required data from request: {}. \n'
                     'This task is required to be executed from context of '
                     'request in a webapp. Task: {}'.format(
                         repr(req),
                         self.__class__
                     )
                 )

         if req:
             # we hook into kwargs since it is the only way to pass our data to
             # the celery worker
             environ = maybe_prepare_env(req)
             options['headers'] = options.get('headers', {})
             options['headers'].update({
                 'rhodecode_proxy_data': {
                     'environ': environ,
                     'auth_user': {
                         'ip_addr': ip_addr,
                         'user_id': user_id
                     },
                 }
             })

         return super(RequestContextTask, self).apply_async(
             args, kwargs, task_id, producer, link, link_error, shadow, **options)
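As a usage note (not part of the changeset), a minimal sketch of how a task is typically bound to RequestContextTask so that apply_async attaches the rhodecode_proxy_data header shown above; cleanup_task is a hypothetical name and the decorator arguments are assumptions, not confirmed by this diff:

    from rhodecode.lib.celerylib import async_task, get_logger
    from rhodecode.lib.celerylib.loader import RequestContextTask

    @async_task(ignore_result=True, base=RequestContextTask)
    def cleanup_task(repo_name):
        # runs on the worker; the calling user's user_id/ip_addr travel in
        # the 'rhodecode_proxy_data' message header set by apply_async
        log = get_logger(cleanup_task)
        log.debug('running cleanup for %s', repo_name)

Consuming that header on the worker side to restore the user context is handled elsewhere in RhodeCode and is not part of this changeset.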