##// END OF EJS Templates
Aligned expanded sumarry box content better and moved comments as last column
Aligned expanded sumarry box content better and moved comments as last column

File last commit:

r3390:02f7713a default
r3639:7ee2e326 new-ui
Show More
loader.py
306 lines | 9.5 KiB | text/x-python | PythonLexer
celery: celery 4.X support. Fixes #4169...
r2359 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2010-2019 RhodeCode GmbH
celery: celery 4.X support. Fixes #4169...
r2359 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::
scheduler: added DB models and db parsers for the RhodeCode scheduler....
r2406 celery worker \
--beat \
--app rhodecode.lib.celerylib.loader \
--scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
--loglevel DEBUG --ini=._dev/dev.ini
celery: celery 4.X support. Fixes #4169...
r2359 """
import os
import logging
celery: register EE tasks if we have EE enabled
r2465 import importlib
celery: celery 4.X support. Fixes #4169...
r2359
from celery import Celery
from celery import signals
from celery import Task
code: unified coverage notes to # pragma: no cover
r3282 from celery import exceptions # pragma: no cover
celery: celery 4.X support. Fixes #4169...
r2359 from kombu.serialization import register
from pyramid.threadlocal import get_current_request
import rhodecode
from rhodecode.lib.auth import AuthUser
celery: ping db connection before task execution to recycle db connections.
r3390 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
celery: celery 4.X support. Fixes #4169...
r2359 from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta
register('json_ext', json.dumps, json.loads,
content_type='application/x-json-ext',
content_encoding='utf-8')
log = logging.getLogger('celery.rhodecode.loader')
def add_preload_arguments(parser):
parser.add_argument(
'--ini', default=None,
help='Path to ini configuration file.'
)
parser.add_argument(
'--ini-var', default=None,
help='Comma separated list of key=value to pass to ini.'
)
def get_logger(obj):
custom_log = logging.getLogger(
'rhodecode.task.{}'.format(obj.__class__.__name__))
if rhodecode.CELERY_ENABLED:
try:
custom_log = obj.get_logger()
except Exception:
pass
return custom_log
celery: register EE tasks if we have EE enabled
r2465 imports = ['rhodecode.lib.celerylib.tasks']
try:
# try if we have EE tasks available
importlib.import_module('rc_ee')
imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
pass
celery: celery 4.X support. Fixes #4169...
r2359 base_celery_config = {
'result_backend': 'rpc://',
'result_expires': 60 * 60 * 24,
'result_persistent': True,
celery: register EE tasks if we have EE enabled
r2465 'imports': imports,
celery: celery 4.X support. Fixes #4169...
r2359 'worker_max_tasks_per_child': 100,
'accept_content': ['json_ext'],
'task_serializer': 'json_ext',
'result_serializer': 'json_ext',
'worker_hijack_root_logger': False,
scheduler: added DB models and db parsers for the RhodeCode scheduler....
r2406 'database_table_names': {
'task': 'beat_taskmeta',
'group': 'beat_groupmeta',
}
celery: celery 4.X support. Fixes #4169...
r2359 }
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
ini_file_glob = None
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
setup_logging(ini_file_glob)
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
ini_location = options['ini']
ini_vars = options['ini_var']
celery_app.conf['INI_PYRAMID'] = options['ini']
if ini_location is None:
print('You must provide the paste --ini argument')
exit(-1)
options = None
if ini_vars is not None:
options = parse_ini_vars(ini_vars)
global ini_file_glob
ini_file_glob = ini_location
log.debug('Bootstrapping RhodeCode application...')
env = bootstrap(ini_location, options=options)
setup_celery_app(
app=env['app'], root=env['root'], request=env['request'],
registry=env['registry'], closer=env['closer'],
ini_location=ini_location)
# fix the global flag even if it's disabled via .ini file because this
# is a worker code that doesn't need this to be disabled.
rhodecode.CELERY_ENABLED = True
celery: ping db connection before task execution to recycle db connections.
r3390 @signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
ping_db()
celery: celery 4.X support. Fixes #4169...
r2359 @signals.task_success.connect
def task_success_signal(result, **kwargs):
meta.Session.commit()
celery: use safer events for execution of tasks
r2464 closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
celery: celery 4.X support. Fixes #4169...
r2359
@signals.task_retry.connect
def task_retry_signal(
request, reason, einfo, **kwargs):
meta.Session.remove()
celery: use safer events for execution of tasks
r2464 closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
celery: celery 4.X support. Fixes #4169...
r2359
@signals.task_failure.connect
def task_failure_signal(
task_id, exception, args, kwargs, traceback, einfo, **kargs):
celery: use exc_tracker to store tasks exceptions for easier debugging.
r3020 from rhodecode.lib.exc_tracking import store_exception
celery: celery 4.X support. Fixes #4169...
r2359 meta.Session.remove()
celery: use exc_tracker to store tasks exceptions for easier debugging.
r3020
# simulate sys.exc_info()
exc_info = (einfo.type, einfo.exception, einfo.tb)
exc-tracker: store API based exceptions and fix prefixes to no use _
r3335 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
celery: use exc_tracker to store tasks exceptions for easier debugging.
r3020
celery: use safer events for execution of tasks
r2464 closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
celery: celery 4.X support. Fixes #4169...
r2359
@signals.task_revoked.connect
def task_revoked_signal(
request, terminated, signum, expired, **kwargs):
celery: use safer events for execution of tasks
r2464 closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
celery: celery 4.X support. Fixes #4169...
r2359
def setup_celery_app(app, root, request, registry, closer, ini_location):
ini_dir = os.path.dirname(os.path.abspath(ini_location))
celery_config = base_celery_config
celery_config.update({
# store celerybeat scheduler db where the .ini file is
'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
})
ini_settings = get_ini_config(ini_location)
log.debug('Got custom celery conf: %s', ini_settings)
celery_config.update(ini_settings)
celery_app.config_from_object(celery_config)
celery_app.conf.update({'PYRAMID_APP': app})
celery_app.conf.update({'PYRAMID_ROOT': root})
celery_app.conf.update({'PYRAMID_REQUEST': request})
celery_app.conf.update({'PYRAMID_REGISTRY': registry})
celery_app.conf.update({'PYRAMID_CLOSER': closer})
def configure_celery(config, ini_location):
"""
Helper that is called from our application creation logic. It gives
connection info into running webapp and allows execution of tasks from
RhodeCode itself
"""
# store some globals into rhodecode
rhodecode.CELERY_ENABLED = str2bool(
config.registry.settings.get('use_celery'))
if rhodecode.CELERY_ENABLED:
log.info('Configuring celery based on `%s` file', ini_location)
setup_celery_app(
app=None, root=None, request=None, registry=config.registry,
closer=None, ini_location=ini_location)
celery: use safe environ extraction. In few cases of executing tasks at non-pyramid level....
r2417 def maybe_prepare_env(req):
environ = {}
try:
environ.update({
'PATH_INFO': req.environ['PATH_INFO'],
'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
celery: ping db connection before task execution to recycle db connections.
r3390 'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
celery: use safe environ extraction. In few cases of executing tasks at non-pyramid level....
r2417 'SERVER_NAME': req.environ['SERVER_NAME'],
'SERVER_PORT': req.environ['SERVER_PORT'],
'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
})
except Exception:
pass
return environ
celery: celery 4.X support. Fixes #4169...
r2359 class RequestContextTask(Task):
"""
This is a celery task which will create a rhodecode app instance context
for the task, patch pyramid with the original request
that created the task and also add the user to the context.
"""
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
""" queue the job to run (we are in web request context here) """
req = get_current_request()
# web case
if hasattr(req, 'user'):
ip_addr = req.user.ip_addr
user_id = req.user.user_id
# api case
elif hasattr(req, 'rpc_user'):
ip_addr = req.rpc_user.ip_addr
user_id = req.rpc_user.user_id
else:
raise Exception(
'Unable to fetch required data from request: {}. \n'
'This task is required to be executed from context of '
'request in a webapp'.format(repr(req)))
if req:
# we hook into kwargs since it is the only way to pass our data to
# the celery worker
celery: use safe environ extraction. In few cases of executing tasks at non-pyramid level....
r2417 environ = maybe_prepare_env(req)
celery: celery 4.X support. Fixes #4169...
r2359 options['headers'] = options.get('headers', {})
options['headers'].update({
'rhodecode_proxy_data': {
celery: use safe environ extraction. In few cases of executing tasks at non-pyramid level....
r2417 'environ': environ,
celery: celery 4.X support. Fixes #4169...
r2359 'auth_user': {
'ip_addr': ip_addr,
'user_id': user_id
},
}
})
return super(RequestContextTask, self).apply_async(
args, kwargs, task_id, producer, link, link_error, shadow, **options)
def __call__(self, *args, **kwargs):
""" rebuild the context and then run task on celery worker """
proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
if not proxy_data:
return super(RequestContextTask, self).__call__(*args, **kwargs)
log.debug('using celery proxy data to run task: %r', proxy_data)
# re-inject and register threadlocals for proper routing support
request = prepare_request(proxy_data['environ'])
request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
ip_addr=proxy_data['auth_user']['ip_addr'])
return super(RequestContextTask, self).__call__(*args, **kwargs)