__init__.py
171 lines
| 6.1 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import logging | ||||
from datetime import timedelta | ||||
from celery import Celery | ||||
from celery.bin import Option | ||||
from celery.schedules import crontab | ||||
from celery.signals import worker_init, task_revoked, user_preload_options | ||||
from celery.signals import task_prerun, task_retry, task_failure, task_success | ||||
from kombu.serialization import register | ||||
from pyramid.paster import bootstrap | ||||
from pyramid.request import Request | ||||
from pyramid.scripting import prepare | ||||
from pyramid.settings import asbool | ||||
from pyramid.threadlocal import get_current_request | ||||
from appenlight.celery.encoders import json_dumps, json_loads | ||||
from appenlight_client.ext.celery import register_signals | ||||
log = logging.getLogger(__name__) | ||||
register('date_json', json_dumps, json_loads, | ||||
content_type='application/x-date_json', | ||||
content_encoding='utf-8') | ||||
celery = Celery() | ||||
celery.user_options['preload'].add( | ||||
Option('--ini', dest='ini', default=None, | ||||
help='Specifies pyramid configuration file location.') | ||||
) | ||||
r21 | ||||
r0 | @user_preload_options.connect | |||
def on_preload_parsed(options, **kwargs): | ||||
""" | ||||
This actually configures celery from pyramid config file | ||||
""" | ||||
celery.conf['INI_PYRAMID'] = options['ini'] | ||||
import appenlight_client.client as e_client | ||||
ini_location = options['ini'] | ||||
if not ini_location: | ||||
raise Exception('You need to pass pyramid ini location using ' | ||||
'--ini=filename.ini argument to the worker') | ||||
r152 | env = bootstrap(ini_location[0]) | |||
r0 | api_key = env['request'].registry.settings['appenlight.api_key'] | |||
tr_config = env['request'].registry.settings.get( | ||||
'appenlight.transport_config') | ||||
CONFIG = e_client.get_config({'appenlight.api_key': api_key}) | ||||
if tr_config: | ||||
CONFIG['appenlight.transport_config'] = tr_config | ||||
APPENLIGHT_CLIENT = e_client.Client(CONFIG) | ||||
# log.addHandler(APPENLIGHT_CLIENT.log_handler) | ||||
register_signals(APPENLIGHT_CLIENT) | ||||
celery.pyramid = env | ||||
celery_config = { | ||||
r21 | 'CELERY_IMPORTS': ["appenlight.celery.tasks", ], | |||
r0 | 'CELERYD_TASK_TIME_LIMIT': 60, | |||
'CELERYD_MAX_TASKS_PER_CHILD': 1000, | ||||
'CELERY_IGNORE_RESULT': True, | ||||
'CELERY_ACCEPT_CONTENT': ['date_json'], | ||||
'CELERY_TASK_SERIALIZER': 'date_json', | ||||
'CELERY_RESULT_SERIALIZER': 'date_json', | ||||
'BROKER_URL': None, | ||||
'CELERYD_CONCURRENCY': None, | ||||
'CELERY_TIMEZONE': None, | ||||
'CELERYBEAT_SCHEDULE': { | ||||
r21 | 'alerting_reports': { | |||
'task': 'appenlight.celery.tasks.alerting_reports', | ||||
r0 | 'schedule': timedelta(seconds=60) | |||
}, | ||||
r21 | 'close_alerts': { | |||
'task': 'appenlight.celery.tasks.close_alerts', | ||||
'schedule': timedelta(seconds=60) | ||||
} | ||||
r0 | } | |||
} | ||||
celery.config_from_object(celery_config) | ||||
r21 | ||||
r0 | def configure_celery(pyramid_registry): | |||
settings = pyramid_registry.settings | ||||
celery_config['BROKER_URL'] = settings['celery.broker_url'] | ||||
celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency'] | ||||
celery_config['CELERY_TIMEZONE'] = settings['celery.timezone'] | ||||
r21 | ||||
notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60)) | ||||
celery_config['CELERYBEAT_SCHEDULE']['notifications'] = { | ||||
'task': 'appenlight.celery.tasks.notifications_reports', | ||||
'schedule': timedelta(seconds=notifications_seconds) | ||||
} | ||||
celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = { | ||||
'task': 'appenlight.celery.tasks.daily_digest', | ||||
'schedule': crontab(minute=1, hour='4,12,20') | ||||
} | ||||
r0 | if asbool(settings.get('celery.always_eager')): | |||
celery_config['CELERY_ALWAYS_EAGER'] = True | ||||
celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True | ||||
for plugin in pyramid_registry.appenlight_plugins.values(): | ||||
if plugin.get('celery_tasks'): | ||||
celery_config['CELERY_IMPORTS'].extend(plugin['celery_tasks']) | ||||
if plugin.get('celery_beats'): | ||||
for name, config in plugin['celery_beats']: | ||||
celery_config['CELERYBEAT_SCHEDULE'][name] = config | ||||
celery.config_from_object(celery_config) | ||||
@task_prerun.connect | ||||
def task_prerun_signal(task_id, task, args, kwargs, **kwaargs): | ||||
if hasattr(celery, 'pyramid'): | ||||
env = celery.pyramid | ||||
env = prepare(registry=env['request'].registry) | ||||
proper_base_url = env['request'].registry.settings['mailing.app_url'] | ||||
r21 | tmp_req = Request.blank('/', base_url=proper_base_url) | |||
r0 | # ensure tasks generate url for right domain from config | |||
r21 | env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST'] | |||
env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT'] | ||||
env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME'] | ||||
env['request'].environ['wsgi.url_scheme'] = \ | ||||
tmp_req.environ['wsgi.url_scheme'] | ||||
r0 | get_current_request().tm.begin() | |||
@task_success.connect | ||||
def task_success_signal(result, **kwargs): | ||||
get_current_request().tm.commit() | ||||
if hasattr(celery, 'pyramid'): | ||||
celery.pyramid["closer"]() | ||||
@task_retry.connect | ||||
def task_retry_signal(request, reason, einfo, **kwargs): | ||||
get_current_request().tm.abort() | ||||
if hasattr(celery, 'pyramid'): | ||||
celery.pyramid["closer"]() | ||||
@task_failure.connect | ||||
def task_failure_signal(task_id, exception, args, kwargs, traceback, einfo, | ||||
**kwaargs): | ||||
get_current_request().tm.abort() | ||||
if hasattr(celery, 'pyramid'): | ||||
celery.pyramid["closer"]() | ||||
@task_revoked.connect | ||||
def task_revoked_signal(request, terminated, signum, expired, **kwaargs): | ||||
get_current_request().tm.abort() | ||||
if hasattr(celery, 'pyramid'): | ||||
celery.pyramid["closer"]() | ||||