__init__.py
181 lines
| 6.1 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import logging | ||||
from datetime import timedelta | ||||
from celery import Celery | ||||
from celery.bin import Option | ||||
from celery.schedules import crontab | ||||
from celery.signals import worker_init, task_revoked, user_preload_options | ||||
from celery.signals import task_prerun, task_retry, task_failure, task_success | ||||
from kombu.serialization import register | ||||
from pyramid.paster import bootstrap | ||||
from pyramid.request import Request | ||||
from pyramid.scripting import prepare | ||||
from pyramid.settings import asbool | ||||
from pyramid.threadlocal import get_current_request | ||||
from appenlight.celery.encoders import json_dumps, json_loads | ||||
from appenlight_client.ext.celery import register_signals | ||||
log = logging.getLogger(__name__) | ||||
r153 | register( | |||
"date_json", | ||||
json_dumps, | ||||
json_loads, | ||||
content_type="application/x-date_json", | ||||
content_encoding="utf-8", | ||||
) | ||||
r0 | ||||
celery = Celery() | ||||
r153 | celery.user_options["preload"].add( | |||
Option( | ||||
"--ini", | ||||
dest="ini", | ||||
default=None, | ||||
help="Specifies pyramid configuration file location.", | ||||
) | ||||
r0 | ) | |||
r21 | ||||
r0 | @user_preload_options.connect | |||
def on_preload_parsed(options, **kwargs): | ||||
""" | ||||
This actually configures celery from pyramid config file | ||||
""" | ||||
r153 | celery.conf["INI_PYRAMID"] = options["ini"] | |||
r0 | import appenlight_client.client as e_client | |||
r153 | ||||
ini_location = options["ini"] | ||||
r0 | if not ini_location: | |||
r153 | raise Exception( | |||
"You need to pass pyramid ini location using " | ||||
"--ini=filename.ini argument to the worker" | ||||
) | ||||
r152 | env = bootstrap(ini_location[0]) | |||
r153 | api_key = env["request"].registry.settings["appenlight.api_key"] | |||
tr_config = env["request"].registry.settings.get("appenlight.transport_config") | ||||
CONFIG = e_client.get_config({"appenlight.api_key": api_key}) | ||||
r0 | if tr_config: | |||
r153 | CONFIG["appenlight.transport_config"] = tr_config | |||
r0 | APPENLIGHT_CLIENT = e_client.Client(CONFIG) | |||
# log.addHandler(APPENLIGHT_CLIENT.log_handler) | ||||
register_signals(APPENLIGHT_CLIENT) | ||||
celery.pyramid = env | ||||
celery_config = { | ||||
r153 | "CELERY_IMPORTS": ["appenlight.celery.tasks"], | |||
"CELERYD_TASK_TIME_LIMIT": 60, | ||||
"CELERYD_MAX_TASKS_PER_CHILD": 1000, | ||||
"CELERY_IGNORE_RESULT": True, | ||||
"CELERY_ACCEPT_CONTENT": ["date_json"], | ||||
"CELERY_TASK_SERIALIZER": "date_json", | ||||
"CELERY_RESULT_SERIALIZER": "date_json", | ||||
"BROKER_URL": None, | ||||
"CELERYD_CONCURRENCY": None, | ||||
"CELERY_TIMEZONE": None, | ||||
"CELERYBEAT_SCHEDULE": { | ||||
"alerting_reports": { | ||||
"task": "appenlight.celery.tasks.alerting_reports", | ||||
"schedule": timedelta(seconds=60), | ||||
r0 | }, | |||
r153 | "close_alerts": { | |||
"task": "appenlight.celery.tasks.close_alerts", | ||||
"schedule": timedelta(seconds=60), | ||||
}, | ||||
}, | ||||
r0 | } | |||
celery.config_from_object(celery_config) | ||||
r21 | ||||
r0 | def configure_celery(pyramid_registry): | |||
settings = pyramid_registry.settings | ||||
r153 | celery_config["BROKER_URL"] = settings["celery.broker_url"] | |||
celery_config["CELERYD_CONCURRENCY"] = settings["celery.concurrency"] | ||||
celery_config["CELERY_TIMEZONE"] = settings["celery.timezone"] | ||||
r21 | ||||
r153 | notifications_seconds = int( | |||
settings.get("tasks.notifications_reports.interval", 60) | ||||
) | ||||
r21 | ||||
r153 | celery_config["CELERYBEAT_SCHEDULE"]["notifications"] = { | |||
"task": "appenlight.celery.tasks.notifications_reports", | ||||
"schedule": timedelta(seconds=notifications_seconds), | ||||
r21 | } | |||
r153 | celery_config["CELERYBEAT_SCHEDULE"]["daily_digest"] = { | |||
"task": "appenlight.celery.tasks.daily_digest", | ||||
"schedule": crontab(minute=1, hour="4,12,20"), | ||||
r21 | } | |||
r153 | if asbool(settings.get("celery.always_eager")): | |||
celery_config["CELERY_ALWAYS_EAGER"] = True | ||||
celery_config["CELERY_EAGER_PROPAGATES_EXCEPTIONS"] = True | ||||
r0 | ||||
for plugin in pyramid_registry.appenlight_plugins.values(): | ||||
r153 | if plugin.get("celery_tasks"): | |||
celery_config["CELERY_IMPORTS"].extend(plugin["celery_tasks"]) | ||||
if plugin.get("celery_beats"): | ||||
for name, config in plugin["celery_beats"]: | ||||
celery_config["CELERYBEAT_SCHEDULE"][name] = config | ||||
r0 | celery.config_from_object(celery_config) | |||
@task_prerun.connect | ||||
def task_prerun_signal(task_id, task, args, kwargs, **kwaargs): | ||||
r153 | if hasattr(celery, "pyramid"): | |||
r0 | env = celery.pyramid | |||
r153 | env = prepare(registry=env["request"].registry) | |||
proper_base_url = env["request"].registry.settings["mailing.app_url"] | ||||
tmp_req = Request.blank("/", base_url=proper_base_url) | ||||
r0 | # ensure tasks generate url for right domain from config | |||
r153 | env["request"].environ["HTTP_HOST"] = tmp_req.environ["HTTP_HOST"] | |||
env["request"].environ["SERVER_PORT"] = tmp_req.environ["SERVER_PORT"] | ||||
env["request"].environ["SERVER_NAME"] = tmp_req.environ["SERVER_NAME"] | ||||
env["request"].environ["wsgi.url_scheme"] = tmp_req.environ["wsgi.url_scheme"] | ||||
r0 | get_current_request().tm.begin() | |||
@task_success.connect | ||||
def task_success_signal(result, **kwargs): | ||||
get_current_request().tm.commit() | ||||
r153 | if hasattr(celery, "pyramid"): | |||
r0 | celery.pyramid["closer"]() | |||
@task_retry.connect | ||||
def task_retry_signal(request, reason, einfo, **kwargs): | ||||
get_current_request().tm.abort() | ||||
r153 | if hasattr(celery, "pyramid"): | |||
r0 | celery.pyramid["closer"]() | |||
@task_failure.connect | ||||
r153 | def task_failure_signal(task_id, exception, args, kwargs, traceback, einfo, **kwaargs): | |||
r0 | get_current_request().tm.abort() | |||
r153 | if hasattr(celery, "pyramid"): | |||
r0 | celery.pyramid["closer"]() | |||
@task_revoked.connect | ||||
def task_revoked_signal(request, terminated, signum, expired, **kwaargs): | ||||
get_current_request().tm.abort() | ||||
r153 | if hasattr(celery, "pyramid"): | |||
r0 | celery.pyramid["closer"]() | |||