debug: logging fix with reduced redundant info
super-admin - r4871:f1ab2b3b default
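The full file is reproduced in the diff below, but the substance of the commit is a single two-line change in `RequestContextTask.apply_async`: the debug message no longer interpolates the task instance and the whole request object (the redundant info the commit message refers to), only their classes:

    # before
    log.debug('Running Task: %s with class: %s. Request: %s Class: %s',
              self, self.__class__, req, req.__class__)

    # after
    log.debug('Running Task with class: %s. Request Class: %s',
              self.__class__, req.__class__)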
@@ -1,323 +1,323 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2010-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 """
 Celery loader, run with::

     celery worker \
         --task-events \
         --beat \
         --app rhodecode.lib.celerylib.loader \
         --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
         --loglevel DEBUG --ini=.dev/dev.ini
 """
 import os
 import logging
 import importlib

 from celery import Celery
 from celery import signals
 from celery import Task
 from celery import exceptions  # pragma: no cover
 from kombu.serialization import register
 from pyramid.threadlocal import get_current_request

 import rhodecode

 from rhodecode.lib.auth import AuthUser
 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
 from rhodecode.lib.utils2 import str2bool
 from rhodecode.model import meta


 register('json_ext', json.dumps, json.loads,
          content_type='application/x-json-ext',
          content_encoding='utf-8')

 log = logging.getLogger('celery.rhodecode.loader')


 def add_preload_arguments(parser):
     parser.add_argument(
         '--ini', default=None,
         help='Path to ini configuration file.'
     )
     parser.add_argument(
         '--ini-var', default=None,
         help='Comma separated list of key=value to pass to ini.'
     )


 def get_logger(obj):
     custom_log = logging.getLogger(
         'rhodecode.task.{}'.format(obj.__class__.__name__))

     if rhodecode.CELERY_ENABLED:
         try:
             custom_log = obj.get_logger()
         except Exception:
             pass

     return custom_log


 imports = ['rhodecode.lib.celerylib.tasks']

 try:
     # try if we have EE tasks available
     importlib.import_module('rc_ee')
     imports.append('rc_ee.lib.celerylib.tasks')
 except ImportError:
     pass


 base_celery_config = {
     'result_backend': 'rpc://',
     'result_expires': 60 * 60 * 24,
     'result_persistent': True,
     'imports': imports,
     'worker_max_tasks_per_child': 100,
     'accept_content': ['json_ext'],
     'task_serializer': 'json_ext',
     'result_serializer': 'json_ext',
     'worker_hijack_root_logger': False,
     'database_table_names': {
         'task': 'beat_taskmeta',
         'group': 'beat_groupmeta',
     }
 }
 # init main celery app
 celery_app = Celery()
 celery_app.user_options['preload'].add(add_preload_arguments)
 ini_file_glob = None


 @signals.setup_logging.connect
 def setup_logging_callback(**kwargs):
     setup_logging(ini_file_glob)


 @signals.user_preload_options.connect
 def on_preload_parsed(options, **kwargs):
     from rhodecode.config.middleware import get_celery_config

     ini_location = options['ini']
     ini_vars = options['ini_var']
     celery_app.conf['INI_PYRAMID'] = options['ini']

     if ini_location is None:
         print('You must provide the paste --ini argument')
         exit(-1)

     options = None
     if ini_vars is not None:
         options = parse_ini_vars(ini_vars)

     global ini_file_glob
     ini_file_glob = ini_location

     log.debug('Bootstrapping RhodeCode application...')
     try:
         env = bootstrap(ini_location, options=options)
     except Exception:
         log.exception('Failed to bootstrap RhodeCode APP')

     celery_settings = get_celery_config(env['registry'].settings)
     setup_celery_app(
         app=env['app'], root=env['root'], request=env['request'],
         registry=env['registry'], closer=env['closer'],
         celery_settings=celery_settings)

     # fix the global flag even if it's disabled via .ini file because this
     # is a worker code that doesn't need this to be disabled.
     rhodecode.CELERY_ENABLED = True


 @signals.task_prerun.connect
 def task_prerun_signal(task_id, task, args, **kwargs):
     ping_db()


 @signals.task_success.connect
 def task_success_signal(result, **kwargs):
     meta.Session.commit()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_retry.connect
 def task_retry_signal(
         request, reason, einfo, **kwargs):
     meta.Session.remove()
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_failure.connect
 def task_failure_signal(
         task_id, exception, args, kwargs, traceback, einfo, **kargs):
     from rhodecode.lib.exc_tracking import store_exception
     from rhodecode.lib.statsd_client import StatsdClient

     meta.Session.remove()

     # simulate sys.exc_info()
     exc_info = (einfo.type, einfo.exception, einfo.tb)
     store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
     statsd = StatsdClient.statsd
     if statsd:
         exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
         statsd.incr('rhodecode_exception_total',
                     tags=["exc_source:celery", "type:{}".format(exc_type)])

     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 @signals.task_revoked.connect
 def task_revoked_signal(
         request, terminated, signum, expired, **kwargs):
     closer = celery_app.conf['PYRAMID_CLOSER']
     if closer:
         closer()


 def setup_celery_app(app, root, request, registry, closer, celery_settings):
     log.debug('Got custom celery conf: %s', celery_settings)
     celery_config = base_celery_config
     celery_config.update({
         # store celerybeat scheduler db where the .ini file is
         'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
     })

     celery_config.update(celery_settings)
     celery_app.config_from_object(celery_config)

     celery_app.conf.update({'PYRAMID_APP': app})
     celery_app.conf.update({'PYRAMID_ROOT': root})
     celery_app.conf.update({'PYRAMID_REQUEST': request})
     celery_app.conf.update({'PYRAMID_REGISTRY': registry})
     celery_app.conf.update({'PYRAMID_CLOSER': closer})


 def configure_celery(config, celery_settings):
     """
     Helper that is called from our application creation logic. It gives
     connection info into running webapp and allows execution of tasks from
     RhodeCode itself
     """
     # store some globals into rhodecode
     rhodecode.CELERY_ENABLED = str2bool(
         config.registry.settings.get('use_celery'))
     if rhodecode.CELERY_ENABLED:
         log.info('Configuring celery based on `%s` settings', celery_settings)
         setup_celery_app(
             app=None, root=None, request=None, registry=config.registry,
             closer=None, celery_settings=celery_settings)


 def maybe_prepare_env(req):
     environ = {}
     try:
         environ.update({
             'PATH_INFO': req.environ['PATH_INFO'],
             'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
             'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
             'SERVER_NAME': req.environ['SERVER_NAME'],
             'SERVER_PORT': req.environ['SERVER_PORT'],
             'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
         })
     except Exception:
         pass

     return environ


 class RequestContextTask(Task):
     """
     This is a celery task which will create a rhodecode app instance context
     for the task, patch pyramid with the original request
     that created the task and also add the user to the context.
     """

     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                     link=None, link_error=None, shadow=None, **options):
         """ queue the job to run (we are in web request context here) """

         req = get_current_request()
-        log.debug('Running Task: %s with class: %s. Request: %s Class: %s',
-                  self, self.__class__, req, req.__class__)
+        log.debug('Running Task with class: %s. Request Class: %s',
+                  self.__class__, req.__class__)

         # web case
         if hasattr(req, 'user'):
             ip_addr = req.user.ip_addr
             user_id = req.user.user_id

         # api case
         elif hasattr(req, 'rpc_user'):
             ip_addr = req.rpc_user.ip_addr
             user_id = req.rpc_user.user_id
         else:
             raise Exception(
                 'Unable to fetch required data from request: {}. \n'
                 'This task is required to be executed from context of '
                 'request in a webapp. Task: {}'.format(
                     repr(req),
                     self
                 )
             )

         if req:
             # we hook into kwargs since it is the only way to pass our data to
             # the celery worker
             environ = maybe_prepare_env(req)
             options['headers'] = options.get('headers', {})
             options['headers'].update({
                 'rhodecode_proxy_data': {
                     'environ': environ,
                     'auth_user': {
                         'ip_addr': ip_addr,
                         'user_id': user_id
                     },
                 }
             })

         return super(RequestContextTask, self).apply_async(
             args, kwargs, task_id, producer, link, link_error, shadow, **options)

     def __call__(self, *args, **kwargs):
         """ rebuild the context and then run task on celery worker """

         proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
         if not proxy_data:
             return super(RequestContextTask, self).__call__(*args, **kwargs)

         log.debug('using celery proxy data to run task: %r', proxy_data)
         # re-inject and register threadlocals for proper routing support
         request = prepare_request(proxy_data['environ'])
         request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                 ip_addr=proxy_data['auth_user']['ip_addr'])

         return super(RequestContextTask, self).__call__(*args, **kwargs)

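For context on how `RequestContextTask` is consumed, below is a minimal, hypothetical sketch using Celery's standard `base=` task option; the task and view names are illustrative and not part of this commit:

# Hypothetical usage sketch -- illustrative names, not from this changeset.
from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask


@celery_app.task(base=RequestContextTask)
def send_notification(user_id, message):
    # By the time this body runs on a worker, RequestContextTask.__call__ has
    # rebuilt a pyramid request from the proxied WSGI environ and attached an
    # AuthUser, so threadlocal-based code behaves as it does in the web app.
    return '{}:{}'.format(user_id, message)


def my_view(request):
    # Called from a web (or API) request. apply_async() reads request.user
    # (or request.rpc_user) and ships the environ plus user_id/ip_addr to the
    # worker inside the 'rhodecode_proxy_data' message header.
    send_notification.apply_async(args=(request.user.user_id, 'hello'))
    return {'queued': True}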