# Copyright (C) 2010-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::

    celery worker \
        --task-events \
        --beat \
        --autoscale=20,2 \
        --max-tasks-per-child 1 \
        --app rhodecode.lib.celerylib.loader \
        --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
        --loglevel DEBUG --ini=.dev/dev.ini
"""
import os
import logging
import importlib

from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions  # pragma: no cover
from kombu.serialization import register

import rhodecode

from rhodecode.lib.statsd_client import StatsdClient
from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta
from rhodecode.model import meta

# register RhodeCode's extended JSON codec ('json_ext') with kombu; it is the
# serializer configured for tasks and results in base_celery_config below
register('json_ext', json.dumps, json.loads,
         content_type='application/x-json-ext',
         content_encoding='utf-8')

log = logging.getLogger('celery.rhodecode.loader')

imports = ['rhodecode.lib.celerylib.tasks']

try:
    # try if we have EE tasks available
    importlib.import_module('rc_ee')
    imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
    pass


base_celery_config = {
    'result_backend': 'rpc://',
    'result_expires': 60 * 60 * 24,
    'result_persistent': True,
    'imports': imports,
    'worker_max_tasks_per_child': 20,
    'accept_content': ['json_ext', 'json'],
    'task_serializer': 'json_ext',
    'result_serializer': 'json_ext',
    'worker_hijack_root_logger': False,
    'database_table_names': {
        'task': 'beat_taskmeta',
        'group': 'beat_groupmeta',
    }
}
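
# The values above are defaults only; deployment-specific settings returned by
# get_celery_config() from the .ini are merged on top of them in
# setup_celery_app() below (e.g. a 'broker_url' entry would normally come from
# the .ini rather than from this dict -- illustrative example, not defined here).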


def add_preload_arguments(parser):
    parser.add_argument(
        '--ini', default=None,
        help='Path to ini configuration file.'
    )
    parser.add_argument(
        '--ini-var', default=None,
        help='Comma separated list of key=value to pass to ini.'
    )


def get_logger(obj):
    custom_log = logging.getLogger(
        'rhodecode.task.{}'.format(obj.__class__.__name__))

    if rhodecode.CELERY_ENABLED:
        try:
            custom_log = obj.get_logger()
        except Exception:
            pass

    return custom_log


# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)


@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
    ini_file = celery_app.conf['RC_INI_FILE']
    setup_logging(ini_file)


@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):

    ini_file = options['ini']
    ini_vars = options['ini_var']

    if ini_file is None:
        print('You must provide the --ini argument to start celery')
        exit(-1)

    options = None
    if ini_vars is not None:
        options = parse_ini_vars(ini_vars)

    celery_app.conf['RC_INI_FILE'] = ini_file
    celery_app.conf['RC_INI_OPTIONS'] = options
    setup_logging(ini_file)


def _init_celery(app_type=''):
    from rhodecode.config.middleware import get_celery_config

    log.debug('Bootstrapping RhodeCode application for %s...', app_type)

    ini_file = celery_app.conf['RC_INI_FILE']
    options = celery_app.conf['RC_INI_OPTIONS']

    env = None
    try:
        env = bootstrap(ini_file, options=options)
    except Exception:
        log.exception('Failed to bootstrap RhodeCode APP')

    if not env:
        raise EnvironmentError(
            'Failed to load pyramid ENV. '
            'Probably there is another error present that prevents from running pyramid app')

    log.debug('Got Pyramid ENV: %s', env)

    celery_settings = get_celery_config(env['registry'].settings)

    setup_celery_app(
        app=env['app'], root=env['root'], request=env['request'],
        registry=env['registry'], closer=env['closer'],
        celery_settings=celery_settings)


@signals.celeryd_init.connect
def on_celeryd_init(sender=None, conf=None, **kwargs):
    _init_celery('celery worker')

    # fix the global flag even if it's disabled via the .ini file, because this
    # is worker code that doesn't need it to be disabled.
    rhodecode.CELERY_ENABLED = True


@signals.beat_init.connect
def on_beat_init(sender=None, conf=None, **kwargs):
    _init_celery('celery beat')


@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
    ping_db()

    statsd = StatsdClient.statsd
    if statsd:
        task_repr = getattr(task, 'name', task)
        statsd.incr('rhodecode_celery_task_total', tags=[
            'task:{}'.format(task_repr),
            'mode:async'
        ])


@signals.task_success.connect
def task_success_signal(result, **kwargs):
    meta.Session.commit()

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_retry.connect
def task_retry_signal(
        request, reason, einfo, **kwargs):
    meta.Session.remove()

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_failure.connect
def task_failure_signal(
        task_id, exception, args, kwargs, traceback, einfo, **kargs):

    log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
    from rhodecode.lib.exc_tracking import store_exception
    from rhodecode.lib.statsd_client import StatsdClient

    meta.Session.remove()

    # simulate sys.exc_info()
    exc_info = (einfo.type, einfo.exception, einfo.tb)
    store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')

    statsd = StatsdClient.statsd
    if statsd:
        exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
        statsd.incr('rhodecode_exception_total',
                    tags=["exc_source:celery", "type:{}".format(exc_type)])

    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


@signals.task_revoked.connect
def task_revoked_signal(
        request, terminated, signum, expired, **kwargs):
    closer = celery_app.conf['PYRAMID_CLOSER']
    if closer:
        closer()


class UNSET(object):
    pass


_unset = UNSET()


def set_celery_conf(app=_unset, root=_unset, request=_unset, registry=_unset, closer=_unset):
    # only update the keys that were explicitly passed in; the `_unset`
    # sentinel instance marks arguments the caller did not provide
    if request is not _unset:
        celery_app.conf.update({'PYRAMID_REQUEST': request})
    if registry is not _unset:
        celery_app.conf.update({'PYRAMID_REGISTRY': registry})
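
# Usage sketch (hypothetical, not part of the original module): update only the
# request-scoped conf entries of an already configured celery_app, leaving the
# rest untouched; `pyramid_request` is an assumed pyramid request object.
#
#   set_celery_conf(request=pyramid_request, registry=pyramid_request.registry)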


def setup_celery_app(app, root, request, registry, closer, celery_settings):
    log.debug('Got custom celery conf: %s', celery_settings)

    celery_config = base_celery_config
    celery_config.update({
        # store celerybeat scheduler db where the .ini file is
        'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
    })
    celery_config.update(celery_settings)
    celery_app.config_from_object(celery_config)

    celery_app.conf.update({'PYRAMID_APP': app})
    celery_app.conf.update({'PYRAMID_ROOT': root})
    celery_app.conf.update({'PYRAMID_REQUEST': request})
    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
    celery_app.conf.update({'PYRAMID_CLOSER': closer})


def configure_celery(config, celery_settings):
    """
    Helper that is called from our application creation logic. It passes
    connection info to the running webapp and allows execution of tasks from
    RhodeCode itself.
    """
    # store some globals into rhodecode
    rhodecode.CELERY_ENABLED = str2bool(
        config.registry.settings.get('use_celery'))
    if rhodecode.CELERY_ENABLED:
        log.info('Configuring celery based on `%s` settings', celery_settings)
        setup_celery_app(
            app=None, root=None, request=None, registry=config.registry,
            closer=None, celery_settings=celery_settings)
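
# Usage sketch (hypothetical, not part of the original module): during Pyramid
# application creation this helper would be wired up roughly as follows, with
# `config` being the pyramid Configurator and `celery_settings` coming from
# rhodecode.config.middleware.get_celery_config:
#
#   from rhodecode.config.middleware import get_celery_config
#   celery_settings = get_celery_config(config.registry.settings)
#   configure_celery(config, celery_settings)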


def maybe_prepare_env(req):
    environ = {}
    try:
        environ.update({
            'PATH_INFO': req.environ['PATH_INFO'],
            'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
            'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
            'SERVER_NAME': req.environ['SERVER_NAME'],
            'SERVER_PORT': req.environ['SERVER_PORT'],
            'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
        })
    except Exception:
        pass

    return environ


class RequestContextTask(Task):
    """
    This is a celery task which will create a rhodecode app instance context
    for the task, patch pyramid with the original request that created the
    task, and also add the user to the context.
    """

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """ queue the job to run (we are in web request context here) """
        from rhodecode.lib.base import get_ip_addr

        req = self.app.conf['PYRAMID_REQUEST']
        if not req:
            raise ValueError('celery_app.conf has an empty PYRAMID_REQUEST key')

        log.debug('Running Task with class: %s. Request Class: %s',
                  self.__class__, req.__class__)

        user_id = 0

        # web case
        if hasattr(req, 'user'):
            user_id = req.user.user_id

        # api case
        elif hasattr(req, 'rpc_user'):
            user_id = req.rpc_user.user_id

        # we hook into the message headers since it is the only way to pass
        # our data to the celery worker
        environ = maybe_prepare_env(req)
        options['headers'] = options.get('headers', {})
        options['headers'].update({
            'rhodecode_proxy_data': {
                'environ': environ,
                'auth_user': {
                    'ip_addr': get_ip_addr(req.environ),
                    'user_id': user_id
                },
            }
        })

        return super(RequestContextTask, self).apply_async(
            args, kwargs, task_id, producer, link, link_error, shadow, **options)
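
# Usage sketch (hypothetical, not part of the original module): a task using
# RequestContextTask as its base class, so apply_async() forwards the
# originating request's environ and auth user to the worker via the
# 'rhodecode_proxy_data' header. The task name and body below are made up.
#
# @celery_app.task(base=RequestContextTask, bind=True)
# def example_noop_task(self):
#     custom_log = get_logger(self)
#     custom_log.debug('running example task inside the request context')
#     return 'ok'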