##// END OF EJS Templates
statsd: better task execution reporting on celery
super-admin -
r4892:22facf19 default
parent child Browse files
Show More
@@ -1,95 +1,95 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import socket
21 import socket
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from zope.cachedescriptors.property import Lazy as LazyProperty
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
27 celery_app, RequestContextTask, get_logger)
28 from rhodecode.lib.statsd_client import StatsdClient
28 from rhodecode.lib.statsd_client import StatsdClient
29
29
30 async_task = celery_app.task
30 async_task = celery_app.task
31
31
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
36 class ResultWrapper(object):
36 class ResultWrapper(object):
37 def __init__(self, task):
37 def __init__(self, task):
38 self.task = task
38 self.task = task
39
39
40 @LazyProperty
40 @LazyProperty
41 def result(self):
41 def result(self):
42 return self.task
42 return self.task
43
43
44
44
45 def run_task(task, *args, **kwargs):
45 def run_task(task, *args, **kwargs):
46 import celery
46 import celery
47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
48 if task is None:
48 if task is None:
49 raise ValueError('Got non-existing task for execution')
49 raise ValueError('Got non-existing task for execution')
50
50
51 exec_mode = 'sync'
51 exec_mode = 'sync'
52 allow_async = True
52 allow_async = True
53
53
54 # if we're already in a celery task, don't allow async execution again
54 # if we're already in a celery task, don't allow async execution again
55 # e.g task within task
55 # e.g task within task
56 in_task = celery.current_task
56 in_task = celery.current_task
57 if in_task:
57 if in_task:
58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
59 allow_async = False
59 allow_async = False
60 if kwargs.pop('allow_subtask', False):
60 if kwargs.pop('allow_subtask', False):
61 log.debug('Forced async by allow_async=True flag')
61 log.debug('Forced async by allow_async=True flag')
62 allow_async = True
62 allow_async = True
63
63
64 t = None
64 t = None
65 if rhodecode.CELERY_ENABLED and allow_async:
65 if rhodecode.CELERY_ENABLED and allow_async:
66
66
67 statsd = StatsdClient.statsd
68 if statsd:
69 task_repr = getattr(task, 'name', task)
70 statsd.incr('rhodecode_celery_task_total', tags=[
71 'task:{}'.format(task_repr)
72 ])
73
74 try:
67 try:
75 t = task.apply_async(args=args, kwargs=kwargs)
68 t = task.apply_async(args=args, kwargs=kwargs)
76 log.debug('executing task %s:%s in async mode', t.task_id, task)
69 log.debug('executing task %s:%s in async mode', t.task_id, task)
77 except socket.error as e:
70 except socket.error as e:
78 if isinstance(e, IOError) and e.errno == 111:
71 if isinstance(e, IOError) and e.errno == 111:
79 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
72 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
80 else:
73 else:
81 log.exception("Exception while connecting to celeryd.")
74 log.exception("Exception while connecting to celeryd.")
82 except KeyError as e:
75 except KeyError as e:
83 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
76 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
84 except Exception as e:
77 except Exception as e:
85 log.exception(
78 log.exception(
86 "Exception while trying to run task asynchronous. "
79 "Exception while trying to run task asynchronous. "
87 "Fallback to sync execution.")
80 "Fallback to sync execution.")
88
81
89 else:
82 else:
90 log.debug('executing task %s:%s in sync mode', 'TASK', task)
83 log.debug('executing task %s:%s in sync mode', 'TASK', task)
84 statsd = StatsdClient.statsd
85 if statsd:
86 task_repr = getattr(task, 'name', task)
87 statsd.incr('rhodecode_celery_task_total', tags=[
88 'task:{}'.format(task_repr),
89 'mode:sync'
90 ])
91
91
92 # we got async task, return it after statsd call
92 # we got async task, return it after statsd call
93 if t:
93 if t:
94 return t
94 return t
95 return ResultWrapper(task(*args, **kwargs))
95 return ResultWrapper(task(*args, **kwargs))
@@ -1,344 +1,353 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --task-events \
24 --task-events \
25 --beat \
25 --beat \
26 --autoscale=20,2 \
26 --autoscale=20,2 \
27 --max-tasks-per-child 1 \
27 --max-tasks-per-child 1 \
28 --app rhodecode.lib.celerylib.loader \
28 --app rhodecode.lib.celerylib.loader \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 --loglevel DEBUG --ini=.dev/dev.ini
30 --loglevel DEBUG --ini=.dev/dev.ini
31 """
31 """
32 import os
32 import os
33 import logging
33 import logging
34 import importlib
34 import importlib
35
35
36 from celery import Celery
36 from celery import Celery
37 from celery import signals
37 from celery import signals
38 from celery import Task
38 from celery import Task
39 from celery import exceptions # pragma: no cover
39 from celery import exceptions # pragma: no cover
40 from kombu.serialization import register
40 from kombu.serialization import register
41
41
42 import rhodecode
42 import rhodecode
43
43
44 from rhodecode.lib.statsd_client import StatsdClient
44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
45 from rhodecode.lib.ext_json import json
46 from rhodecode.lib.ext_json import json
46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
47 from rhodecode.lib.utils2 import str2bool
48 from rhodecode.lib.utils2 import str2bool
48 from rhodecode.model import meta
49 from rhodecode.model import meta
49
50
50
51
51 register('json_ext', json.dumps, json.loads,
52 register('json_ext', json.dumps, json.loads,
52 content_type='application/x-json-ext',
53 content_type='application/x-json-ext',
53 content_encoding='utf-8')
54 content_encoding='utf-8')
54
55
55 log = logging.getLogger('celery.rhodecode.loader')
56 log = logging.getLogger('celery.rhodecode.loader')
56
57
57
58
58 imports = ['rhodecode.lib.celerylib.tasks']
59 imports = ['rhodecode.lib.celerylib.tasks']
59
60
60 try:
61 try:
61 # try if we have EE tasks available
62 # try if we have EE tasks available
62 importlib.import_module('rc_ee')
63 importlib.import_module('rc_ee')
63 imports.append('rc_ee.lib.celerylib.tasks')
64 imports.append('rc_ee.lib.celerylib.tasks')
64 except ImportError:
65 except ImportError:
65 pass
66 pass
66
67
67
68
68 base_celery_config = {
69 base_celery_config = {
69 'result_backend': 'rpc://',
70 'result_backend': 'rpc://',
70 'result_expires': 60 * 60 * 24,
71 'result_expires': 60 * 60 * 24,
71 'result_persistent': True,
72 'result_persistent': True,
72 'imports': imports,
73 'imports': imports,
73 'worker_max_tasks_per_child': 20,
74 'worker_max_tasks_per_child': 20,
74 'accept_content': ['json_ext', 'json'],
75 'accept_content': ['json_ext', 'json'],
75 'task_serializer': 'json_ext',
76 'task_serializer': 'json_ext',
76 'result_serializer': 'json_ext',
77 'result_serializer': 'json_ext',
77 'worker_hijack_root_logger': False,
78 'worker_hijack_root_logger': False,
78 'database_table_names': {
79 'database_table_names': {
79 'task': 'beat_taskmeta',
80 'task': 'beat_taskmeta',
80 'group': 'beat_groupmeta',
81 'group': 'beat_groupmeta',
81 }
82 }
82 }
83 }
83
84
84
85
85 def add_preload_arguments(parser):
86 def add_preload_arguments(parser):
86 parser.add_argument(
87 parser.add_argument(
87 '--ini', default=None,
88 '--ini', default=None,
88 help='Path to ini configuration file.'
89 help='Path to ini configuration file.'
89 )
90 )
90 parser.add_argument(
91 parser.add_argument(
91 '--ini-var', default=None,
92 '--ini-var', default=None,
92 help='Comma separated list of key=value to pass to ini.'
93 help='Comma separated list of key=value to pass to ini.'
93 )
94 )
94
95
95
96
96 def get_logger(obj):
97 def get_logger(obj):
97 custom_log = logging.getLogger(
98 custom_log = logging.getLogger(
98 'rhodecode.task.{}'.format(obj.__class__.__name__))
99 'rhodecode.task.{}'.format(obj.__class__.__name__))
99
100
100 if rhodecode.CELERY_ENABLED:
101 if rhodecode.CELERY_ENABLED:
101 try:
102 try:
102 custom_log = obj.get_logger()
103 custom_log = obj.get_logger()
103 except Exception:
104 except Exception:
104 pass
105 pass
105
106
106 return custom_log
107 return custom_log
107
108
108
109
109 # init main celery app
110 # init main celery app
110 celery_app = Celery()
111 celery_app = Celery()
111 celery_app.user_options['preload'].add(add_preload_arguments)
112 celery_app.user_options['preload'].add(add_preload_arguments)
112
113
113
114
114 @signals.setup_logging.connect
115 @signals.setup_logging.connect
115 def setup_logging_callback(**kwargs):
116 def setup_logging_callback(**kwargs):
116 ini_file = celery_app.conf['RC_INI_FILE']
117 ini_file = celery_app.conf['RC_INI_FILE']
117 setup_logging(ini_file)
118 setup_logging(ini_file)
118
119
119
120
120 @signals.user_preload_options.connect
121 @signals.user_preload_options.connect
121 def on_preload_parsed(options, **kwargs):
122 def on_preload_parsed(options, **kwargs):
122
123
123 ini_file = options['ini']
124 ini_file = options['ini']
124 ini_vars = options['ini_var']
125 ini_vars = options['ini_var']
125
126
126 if ini_file is None:
127 if ini_file is None:
127 print('You must provide the --ini argument to start celery')
128 print('You must provide the --ini argument to start celery')
128 exit(-1)
129 exit(-1)
129
130
130 options = None
131 options = None
131 if ini_vars is not None:
132 if ini_vars is not None:
132 options = parse_ini_vars(ini_vars)
133 options = parse_ini_vars(ini_vars)
133
134
134 celery_app.conf['RC_INI_FILE'] = ini_file
135 celery_app.conf['RC_INI_FILE'] = ini_file
135 celery_app.conf['RC_INI_OPTIONS'] = options
136 celery_app.conf['RC_INI_OPTIONS'] = options
136 setup_logging(ini_file)
137 setup_logging(ini_file)
137
138
138
139
139 def _init_celery(app_type=''):
140 def _init_celery(app_type=''):
140 from rhodecode.config.middleware import get_celery_config
141 from rhodecode.config.middleware import get_celery_config
141
142
142 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
143 log.debug('Bootstrapping RhodeCode application for %s...', app_type)
143
144
144 ini_file = celery_app.conf['RC_INI_FILE']
145 ini_file = celery_app.conf['RC_INI_FILE']
145 options = celery_app.conf['RC_INI_OPTIONS']
146 options = celery_app.conf['RC_INI_OPTIONS']
146
147
147 env = None
148 env = None
148 try:
149 try:
149 env = bootstrap(ini_file, options=options)
150 env = bootstrap(ini_file, options=options)
150 except Exception:
151 except Exception:
151 log.exception('Failed to bootstrap RhodeCode APP')
152 log.exception('Failed to bootstrap RhodeCode APP')
152
153
153 if not env:
154 if not env:
154 raise EnvironmentError(
155 raise EnvironmentError(
155 'Failed to load pyramid ENV. '
156 'Failed to load pyramid ENV. '
156 'Probably there is another error present that prevents from running pyramid app')
157 'Probably there is another error present that prevents from running pyramid app')
157
158
158 log.debug('Got Pyramid ENV: %s', env)
159 log.debug('Got Pyramid ENV: %s', env)
159
160
160 celery_settings = get_celery_config(env['registry'].settings)
161 celery_settings = get_celery_config(env['registry'].settings)
161
162
162 setup_celery_app(
163 setup_celery_app(
163 app=env['app'], root=env['root'], request=env['request'],
164 app=env['app'], root=env['root'], request=env['request'],
164 registry=env['registry'], closer=env['closer'],
165 registry=env['registry'], closer=env['closer'],
165 celery_settings=celery_settings)
166 celery_settings=celery_settings)
166
167
167
168
168 @signals.celeryd_init.connect
169 @signals.celeryd_init.connect
169 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 def on_celeryd_init(sender=None, conf=None, **kwargs):
170 _init_celery('celery worker')
171 _init_celery('celery worker')
171
172
172 # fix the global flag even if it's disabled via .ini file because this
173 # fix the global flag even if it's disabled via .ini file because this
173 # is a worker code that doesn't need this to be disabled.
174 # is a worker code that doesn't need this to be disabled.
174 rhodecode.CELERY_ENABLED = True
175 rhodecode.CELERY_ENABLED = True
175
176
176
177
177 @signals.beat_init.connect
178 @signals.beat_init.connect
178 def on_beat_init(sender=None, conf=None, **kwargs):
179 def on_beat_init(sender=None, conf=None, **kwargs):
179 _init_celery('celery beat')
180 _init_celery('celery beat')
180
181
181
182
182 @signals.task_prerun.connect
183 @signals.task_prerun.connect
183 def task_prerun_signal(task_id, task, args, **kwargs):
184 def task_prerun_signal(task_id, task, args, **kwargs):
184 ping_db()
185 ping_db()
186 statsd = StatsdClient.statsd
187 if statsd:
188 task_repr = getattr(task, 'name', task)
189 statsd.incr('rhodecode_celery_task_total', tags=[
190 'task:{}'.format(task_repr),
191 'mode:async'
192 ])
185
193
186
194
187 @signals.task_success.connect
195 @signals.task_success.connect
188 def task_success_signal(result, **kwargs):
196 def task_success_signal(result, **kwargs):
189 meta.Session.commit()
197 meta.Session.commit()
190 closer = celery_app.conf['PYRAMID_CLOSER']
198 closer = celery_app.conf['PYRAMID_CLOSER']
191 if closer:
199 if closer:
192 closer()
200 closer()
193
201
194
202
203
195 @signals.task_retry.connect
204 @signals.task_retry.connect
196 def task_retry_signal(
205 def task_retry_signal(
197 request, reason, einfo, **kwargs):
206 request, reason, einfo, **kwargs):
198 meta.Session.remove()
207 meta.Session.remove()
199 closer = celery_app.conf['PYRAMID_CLOSER']
208 closer = celery_app.conf['PYRAMID_CLOSER']
200 if closer:
209 if closer:
201 closer()
210 closer()
202
211
203
212
204 @signals.task_failure.connect
213 @signals.task_failure.connect
205 def task_failure_signal(
214 def task_failure_signal(
206 task_id, exception, args, kwargs, traceback, einfo, **kargs):
215 task_id, exception, args, kwargs, traceback, einfo, **kargs):
207
216
208 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
217 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
209 from rhodecode.lib.exc_tracking import store_exception
218 from rhodecode.lib.exc_tracking import store_exception
210 from rhodecode.lib.statsd_client import StatsdClient
219 from rhodecode.lib.statsd_client import StatsdClient
211
220
212 meta.Session.remove()
221 meta.Session.remove()
213
222
214 # simulate sys.exc_info()
223 # simulate sys.exc_info()
215 exc_info = (einfo.type, einfo.exception, einfo.tb)
224 exc_info = (einfo.type, einfo.exception, einfo.tb)
216 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
225 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
217 statsd = StatsdClient.statsd
226 statsd = StatsdClient.statsd
218 if statsd:
227 if statsd:
219 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
228 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
220 statsd.incr('rhodecode_exception_total',
229 statsd.incr('rhodecode_exception_total',
221 tags=["exc_source:celery", "type:{}".format(exc_type)])
230 tags=["exc_source:celery", "type:{}".format(exc_type)])
222
231
223 closer = celery_app.conf['PYRAMID_CLOSER']
232 closer = celery_app.conf['PYRAMID_CLOSER']
224 if closer:
233 if closer:
225 closer()
234 closer()
226
235
227
236
228 @signals.task_revoked.connect
237 @signals.task_revoked.connect
229 def task_revoked_signal(
238 def task_revoked_signal(
230 request, terminated, signum, expired, **kwargs):
239 request, terminated, signum, expired, **kwargs):
231 closer = celery_app.conf['PYRAMID_CLOSER']
240 closer = celery_app.conf['PYRAMID_CLOSER']
232 if closer:
241 if closer:
233 closer()
242 closer()
234
243
235
244
236 class UNSET(object):
245 class UNSET(object):
237 pass
246 pass
238
247
239
248
240 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
249 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
241
250
242 if request is not UNSET:
251 if request is not UNSET:
243 celery_app.conf.update({'PYRAMID_REQUEST': request})
252 celery_app.conf.update({'PYRAMID_REQUEST': request})
244
253
245 if registry is not UNSET:
254 if registry is not UNSET:
246 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
255 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
247
256
248
257
249 def setup_celery_app(app, root, request, registry, closer, celery_settings):
258 def setup_celery_app(app, root, request, registry, closer, celery_settings):
250 log.debug('Got custom celery conf: %s', celery_settings)
259 log.debug('Got custom celery conf: %s', celery_settings)
251 celery_config = base_celery_config
260 celery_config = base_celery_config
252 celery_config.update({
261 celery_config.update({
253 # store celerybeat scheduler db where the .ini file is
262 # store celerybeat scheduler db where the .ini file is
254 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
263 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
255 })
264 })
256
265
257 celery_config.update(celery_settings)
266 celery_config.update(celery_settings)
258 celery_app.config_from_object(celery_config)
267 celery_app.config_from_object(celery_config)
259
268
260 celery_app.conf.update({'PYRAMID_APP': app})
269 celery_app.conf.update({'PYRAMID_APP': app})
261 celery_app.conf.update({'PYRAMID_ROOT': root})
270 celery_app.conf.update({'PYRAMID_ROOT': root})
262 celery_app.conf.update({'PYRAMID_REQUEST': request})
271 celery_app.conf.update({'PYRAMID_REQUEST': request})
263 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
272 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
264 celery_app.conf.update({'PYRAMID_CLOSER': closer})
273 celery_app.conf.update({'PYRAMID_CLOSER': closer})
265
274
266
275
267 def configure_celery(config, celery_settings):
276 def configure_celery(config, celery_settings):
268 """
277 """
269 Helper that is called from our application creation logic. It gives
278 Helper that is called from our application creation logic. It gives
270 connection info into running webapp and allows execution of tasks from
279 connection info into running webapp and allows execution of tasks from
271 RhodeCode itself
280 RhodeCode itself
272 """
281 """
273 # store some globals into rhodecode
282 # store some globals into rhodecode
274 rhodecode.CELERY_ENABLED = str2bool(
283 rhodecode.CELERY_ENABLED = str2bool(
275 config.registry.settings.get('use_celery'))
284 config.registry.settings.get('use_celery'))
276 if rhodecode.CELERY_ENABLED:
285 if rhodecode.CELERY_ENABLED:
277 log.info('Configuring celery based on `%s` settings', celery_settings)
286 log.info('Configuring celery based on `%s` settings', celery_settings)
278 setup_celery_app(
287 setup_celery_app(
279 app=None, root=None, request=None, registry=config.registry,
288 app=None, root=None, request=None, registry=config.registry,
280 closer=None, celery_settings=celery_settings)
289 closer=None, celery_settings=celery_settings)
281
290
282
291
283 def maybe_prepare_env(req):
292 def maybe_prepare_env(req):
284 environ = {}
293 environ = {}
285 try:
294 try:
286 environ.update({
295 environ.update({
287 'PATH_INFO': req.environ['PATH_INFO'],
296 'PATH_INFO': req.environ['PATH_INFO'],
288 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
297 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
289 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
298 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
290 'SERVER_NAME': req.environ['SERVER_NAME'],
299 'SERVER_NAME': req.environ['SERVER_NAME'],
291 'SERVER_PORT': req.environ['SERVER_PORT'],
300 'SERVER_PORT': req.environ['SERVER_PORT'],
292 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
301 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
293 })
302 })
294 except Exception:
303 except Exception:
295 pass
304 pass
296
305
297 return environ
306 return environ
298
307
299
308
300 class RequestContextTask(Task):
309 class RequestContextTask(Task):
301 """
310 """
302 This is a celery task which will create a rhodecode app instance context
311 This is a celery task which will create a rhodecode app instance context
303 for the task, patch pyramid with the original request
312 for the task, patch pyramid with the original request
304 that created the task and also add the user to the context.
313 that created the task and also add the user to the context.
305 """
314 """
306
315
307 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
316 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
308 link=None, link_error=None, shadow=None, **options):
317 link=None, link_error=None, shadow=None, **options):
309 """ queue the job to run (we are in web request context here) """
318 """ queue the job to run (we are in web request context here) """
310 from rhodecode.lib.base import get_ip_addr
319 from rhodecode.lib.base import get_ip_addr
311
320
312 req = self.app.conf['PYRAMID_REQUEST']
321 req = self.app.conf['PYRAMID_REQUEST']
313 if not req:
322 if not req:
314 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
323 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
315
324
316 log.debug('Running Task with class: %s. Request Class: %s',
325 log.debug('Running Task with class: %s. Request Class: %s',
317 self.__class__, req.__class__)
326 self.__class__, req.__class__)
318
327
319 user_id = 0
328 user_id = 0
320
329
321 # web case
330 # web case
322 if hasattr(req, 'user'):
331 if hasattr(req, 'user'):
323 user_id = req.user.user_id
332 user_id = req.user.user_id
324
333
325 # api case
334 # api case
326 elif hasattr(req, 'rpc_user'):
335 elif hasattr(req, 'rpc_user'):
327 user_id = req.rpc_user.user_id
336 user_id = req.rpc_user.user_id
328
337
329 # we hook into kwargs since it is the only way to pass our data to
338 # we hook into kwargs since it is the only way to pass our data to
330 # the celery worker
339 # the celery worker
331 environ = maybe_prepare_env(req)
340 environ = maybe_prepare_env(req)
332 options['headers'] = options.get('headers', {})
341 options['headers'] = options.get('headers', {})
333 options['headers'].update({
342 options['headers'].update({
334 'rhodecode_proxy_data': {
343 'rhodecode_proxy_data': {
335 'environ': environ,
344 'environ': environ,
336 'auth_user': {
345 'auth_user': {
337 'ip_addr': get_ip_addr(req.environ),
346 'ip_addr': get_ip_addr(req.environ),
338 'user_id': user_id
347 'user_id': user_id
339 },
348 },
340 }
349 }
341 })
350 })
342
351
343 return super(RequestContextTask, self).apply_async(
352 return super(RequestContextTask, self).apply_async(
344 args, kwargs, task_id, producer, link, link_error, shadow, **options)
353 args, kwargs, task_id, producer, link, link_error, shadow, **options)
General Comments 0
You need to be logged in to leave comments. Login now