debug: logging fix with reduced redundant info
super-admin - r4871:f1ab2b3b default
@@ -1,323 +1,323 b''
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --task-events \
        --beat \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=.dev/dev.ini
"""
import os
import logging
import importlib

from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # pragma: no cover
from kombu.serialization import register
from pyramid.threadlocal import get_current_request

import rhodecode

from rhodecode.lib.auth import AuthUser
from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta


register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')


def add_preload_arguments(parser):
    parser.add_argument(
        '--ini', default=None,
        help='Path to ini configuration file.'
    )
    parser.add_argument(
        '--ini-var', default=None,
        help='Comma separated list of key=value to pass to ini.'
    )

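# Example invocation (an illustrative sketch, extending the command from the
# module docstring): both preload options above are consumed by
# on_preload_parsed() below; the --ini-var overrides shown are hypothetical.
#
#   celery worker --app rhodecode.lib.celerylib.loader \
#       --ini=.dev/dev.ini --ini-var="use_celery=true,some.setting=value"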

def get_logger(obj):
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            pass

    return custom_log

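# Illustrative use of get_logger() inside a task body (a sketch only;
# `example_task` is a hypothetical task, not part of RhodeCode):
#
#   @celery_app.task(bind=True)
#   def example_task(self):
#       log = get_logger(self)
#       log.info('running task %s', self.request.id)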

imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass


base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 100,
    'accept_content': ['json_ext'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
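
# The defaults above are later merged with ini-derived settings inside
# setup_celery_app(); such an override might look like this (a sketch with
# example values only, not RhodeCode defaults):
#
#   celery_settings = {'broker_url': 'redis://localhost:6379/8',
#                      'result_backend': 'redis://localhost:6379/9'}
#   base_celery_config.update(celery_settings)
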
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
ini_file_glob = None


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    setup_logging(ini_file_glob)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    from rhodecode.config.middleware import get_celery_config

    ini_location = options['ini']
    ini_vars = options['ini_var']
    celery_app.conf['INI_PYRAMID'] = options['ini']

    if ini_location is None:
        print('You must provide the paste --ini argument')
        exit(-1)

    options = None
    if ini_vars is not None:
        options = parse_ini_vars(ini_vars)

    global ini_file_glob
    ini_file_glob = ini_location

    log.debug('Bootstrapping RhodeCode application...')
    try:
        env = bootstrap(ini_location, options=options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')
        raise  # re-raise, env is required below

    celery_settings = get_celery_config(env['registry'].settings)
    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)

    # force the global flag on even if it's disabled via the .ini file,
    # because this is worker code and does not need it to be disabled.
    rhodecode.CELERY_ENABLED = True


@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    ping_db()


@signals.task_success.connect
def task_success_signal(result, **kwargs):
    meta.Session.commit()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    meta.Session.remove()
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
    statsd = StatsdClient.statsd
    if statsd:
        exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


def setup_celery_app(app, root, request, registry, closer, celery_settings):
    log.debug('Got custom celery conf: %s', celery_settings)
    celery_config = base_celery_config
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })

    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, celery_settings):
    """
    Helper called from our application creation logic. It passes the running
    webapp's connection info to the celery app and allows execution of tasks
    from RhodeCode itself.
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` settings', celery_settings)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, celery_settings=celery_settings)

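# Celery is switched on from the application .ini; a minimal sketch of the
# relevant keys (values are examples only):
#
#   use_celery = true
#   celerybeat-schedule.path = %(here)s/celerybeat-schedule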

def maybe_prepare_env(req):
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        pass

    return environ

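# For a typical web request maybe_prepare_env() resolves to something like
# (illustrative values only):
#
#   {'PATH_INFO': '/_admin/my_account', 'SCRIPT_NAME': '',
#    'HTTP_HOST': 'rhodecode.local:8080', 'SERVER_NAME': 'rhodecode.local',
#    'SERVER_PORT': '8080', 'wsgi.url_scheme': 'http'}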

class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request that created the
    task and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """

        req = get_current_request()
-        log.debug('Running Task: %s with class: %s. Request: %s Class: %s',
-                  self, self.__class__, req, req.__class__)
+        log.debug('Running Task with class: %s. Request Class: %s',
+                  self.__class__, req.__class__)

        # web case
        if hasattr(req, 'user'):
            ip_addr = req.user.ip_addr
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            ip_addr = req.rpc_user.ip_addr
            user_id = req.rpc_user.user_id
        else:
            raise Exception(
                'Unable to fetch required data from request: {}. \n'
                'This task is required to be executed from context of '
                'request in a webapp. Task: {}'.format(
                    repr(req),
                    self
                )
            )

        if req:
            # we hook into kwargs since it is the only way to pass our data to
            # the celery worker
            environ = maybe_prepare_env(req)
            options['headers'] = options.get('headers', {})
            options['headers'].update({
                'rhodecode_proxy_data': {
                    'environ': environ,
                    'auth_user': {
                        'ip_addr': ip_addr,
                        'user_id': user_id
                    },
                }
            })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)

    def __call__(self, *args, **kwargs):
        """ rebuild the context and then run task on celery worker """

        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
        if not proxy_data:
            return super(RequestContextTask, self).__call__(*args, **kwargs)

        log.debug('using celery proxy data to run task: %r', proxy_data)
        # re-inject and register threadlocals for proper routing support
        request = prepare_request(proxy_data['environ'])
        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
                                ip_addr=proxy_data['auth_user']['ip_addr'])

        return super(RequestContextTask, self).__call__(*args, **kwargs)

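# Illustrative usage (a sketch): tasks opt in to this behaviour by declaring
# RequestContextTask as their base class; `notify_user` below is a
# hypothetical task, not a RhodeCode API.
#
#   @celery_app.task(bind=True, base=RequestContextTask)
#   def notify_user(self, user_id, message):
#       log = get_logger(self)
#       log.debug('notifying user %s', user_id)
#
# Called from a pyramid view, apply_async() above attaches the original
# request's environ plus ip_addr/user_id to the message headers, and
# __call__() rebuilds that context on the worker:
#
#   notify_user.apply_async(args=(user_id, 'merge finished'))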