##// END OF EJS Templates
Aligned expanded sumarry box content better and moved comments as last column
Aligned expanded sumarry box content better and moved comments as last column

File last commit:

r3390:02f7713a default
r3639:7ee2e326 new-ui
Show More
loader.py
306 lines | 9.5 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Celery loader, run with::
celery worker \
--beat \
--app rhodecode.lib.celerylib.loader \
--scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
--loglevel DEBUG --ini=._dev/dev.ini
"""
import os
import logging
import importlib
from celery import Celery
from celery import signals
from celery import Task
from celery import exceptions # pragma: no cover
from kombu.serialization import register
from pyramid.threadlocal import get_current_request
import rhodecode
from rhodecode.lib.auth import AuthUser
from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
from rhodecode.lib.ext_json import json
from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
from rhodecode.lib.utils2 import str2bool
from rhodecode.model import meta
register('json_ext', json.dumps, json.loads,
content_type='application/x-json-ext',
content_encoding='utf-8')
log = logging.getLogger('celery.rhodecode.loader')
def add_preload_arguments(parser):
parser.add_argument(
'--ini', default=None,
help='Path to ini configuration file.'
)
parser.add_argument(
'--ini-var', default=None,
help='Comma separated list of key=value to pass to ini.'
)
def get_logger(obj):
custom_log = logging.getLogger(
'rhodecode.task.{}'.format(obj.__class__.__name__))
if rhodecode.CELERY_ENABLED:
try:
custom_log = obj.get_logger()
except Exception:
pass
return custom_log
imports = ['rhodecode.lib.celerylib.tasks']
try:
# try if we have EE tasks available
importlib.import_module('rc_ee')
imports.append('rc_ee.lib.celerylib.tasks')
except ImportError:
pass
base_celery_config = {
'result_backend': 'rpc://',
'result_expires': 60 * 60 * 24,
'result_persistent': True,
'imports': imports,
'worker_max_tasks_per_child': 100,
'accept_content': ['json_ext'],
'task_serializer': 'json_ext',
'result_serializer': 'json_ext',
'worker_hijack_root_logger': False,
'database_table_names': {
'task': 'beat_taskmeta',
'group': 'beat_groupmeta',
}
}
# init main celery app
celery_app = Celery()
celery_app.user_options['preload'].add(add_preload_arguments)
ini_file_glob = None
@signals.setup_logging.connect
def setup_logging_callback(**kwargs):
setup_logging(ini_file_glob)
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
ini_location = options['ini']
ini_vars = options['ini_var']
celery_app.conf['INI_PYRAMID'] = options['ini']
if ini_location is None:
print('You must provide the paste --ini argument')
exit(-1)
options = None
if ini_vars is not None:
options = parse_ini_vars(ini_vars)
global ini_file_glob
ini_file_glob = ini_location
log.debug('Bootstrapping RhodeCode application...')
env = bootstrap(ini_location, options=options)
setup_celery_app(
app=env['app'], root=env['root'], request=env['request'],
registry=env['registry'], closer=env['closer'],
ini_location=ini_location)
# fix the global flag even if it's disabled via .ini file because this
# is a worker code that doesn't need this to be disabled.
rhodecode.CELERY_ENABLED = True
@signals.task_prerun.connect
def task_prerun_signal(task_id, task, args, **kwargs):
ping_db()
@signals.task_success.connect
def task_success_signal(result, **kwargs):
meta.Session.commit()
closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
@signals.task_retry.connect
def task_retry_signal(
request, reason, einfo, **kwargs):
meta.Session.remove()
closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
@signals.task_failure.connect
def task_failure_signal(
task_id, exception, args, kwargs, traceback, einfo, **kargs):
from rhodecode.lib.exc_tracking import store_exception
meta.Session.remove()
# simulate sys.exc_info()
exc_info = (einfo.type, einfo.exception, einfo.tb)
store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
@signals.task_revoked.connect
def task_revoked_signal(
request, terminated, signum, expired, **kwargs):
closer = celery_app.conf['PYRAMID_CLOSER']
if closer:
closer()
def setup_celery_app(app, root, request, registry, closer, ini_location):
ini_dir = os.path.dirname(os.path.abspath(ini_location))
celery_config = base_celery_config
celery_config.update({
# store celerybeat scheduler db where the .ini file is
'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
})
ini_settings = get_ini_config(ini_location)
log.debug('Got custom celery conf: %s', ini_settings)
celery_config.update(ini_settings)
celery_app.config_from_object(celery_config)
celery_app.conf.update({'PYRAMID_APP': app})
celery_app.conf.update({'PYRAMID_ROOT': root})
celery_app.conf.update({'PYRAMID_REQUEST': request})
celery_app.conf.update({'PYRAMID_REGISTRY': registry})
celery_app.conf.update({'PYRAMID_CLOSER': closer})
def configure_celery(config, ini_location):
"""
Helper that is called from our application creation logic. It gives
connection info into running webapp and allows execution of tasks from
RhodeCode itself
"""
# store some globals into rhodecode
rhodecode.CELERY_ENABLED = str2bool(
config.registry.settings.get('use_celery'))
if rhodecode.CELERY_ENABLED:
log.info('Configuring celery based on `%s` file', ini_location)
setup_celery_app(
app=None, root=None, request=None, registry=config.registry,
closer=None, ini_location=ini_location)
def maybe_prepare_env(req):
environ = {}
try:
environ.update({
'PATH_INFO': req.environ['PATH_INFO'],
'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
'SERVER_NAME': req.environ['SERVER_NAME'],
'SERVER_PORT': req.environ['SERVER_PORT'],
'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
})
except Exception:
pass
return environ
class RequestContextTask(Task):
"""
This is a celery task which will create a rhodecode app instance context
for the task, patch pyramid with the original request
that created the task and also add the user to the context.
"""
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
""" queue the job to run (we are in web request context here) """
req = get_current_request()
# web case
if hasattr(req, 'user'):
ip_addr = req.user.ip_addr
user_id = req.user.user_id
# api case
elif hasattr(req, 'rpc_user'):
ip_addr = req.rpc_user.ip_addr
user_id = req.rpc_user.user_id
else:
raise Exception(
'Unable to fetch required data from request: {}. \n'
'This task is required to be executed from context of '
'request in a webapp'.format(repr(req)))
if req:
# we hook into kwargs since it is the only way to pass our data to
# the celery worker
environ = maybe_prepare_env(req)
options['headers'] = options.get('headers', {})
options['headers'].update({
'rhodecode_proxy_data': {
'environ': environ,
'auth_user': {
'ip_addr': ip_addr,
'user_id': user_id
},
}
})
return super(RequestContextTask, self).apply_async(
args, kwargs, task_id, producer, link, link_error, shadow, **options)
def __call__(self, *args, **kwargs):
""" rebuild the context and then run task on celery worker """
proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
if not proxy_data:
return super(RequestContextTask, self).__call__(*args, **kwargs)
log.debug('using celery proxy data to run task: %r', proxy_data)
# re-inject and register threadlocals for proper routing support
request = prepare_request(proxy_data['environ'])
request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
ip_addr=proxy_data['auth_user']['ip_addr'])
return super(RequestContextTask, self).__call__(*args, **kwargs)