tasks.py
705 lines
| 27.1 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import bisect | ||||
import collections | ||||
import math | ||||
from datetime import datetime, timedelta | ||||
import sqlalchemy as sa | ||||
r151 | import elasticsearch.exceptions | |||
import elasticsearch.helpers | ||||
r0 | ||||
from celery.utils.log import get_task_logger | ||||
from zope.sqlalchemy import mark_changed | ||||
from pyramid.threadlocal import get_current_request, get_current_registry | ||||
r135 | from ziggurat_foundations.models.services.resource import ResourceService | |||
r0 | from appenlight.celery import celery | |||
from appenlight.models.report_group import ReportGroup | ||||
from appenlight.models import DBSession, Datastores | ||||
from appenlight.models.report import Report | ||||
from appenlight.models.log import Log | ||||
r49 | from appenlight.models.metric import Metric | |||
r0 | from appenlight.models.event import Event | |||
from appenlight.models.services.application import ApplicationService | ||||
from appenlight.models.services.event import EventService | ||||
from appenlight.models.services.log import LogService | ||||
from appenlight.models.services.report import ReportService | ||||
from appenlight.models.services.report_group import ReportGroupService | ||||
from appenlight.models.services.user import UserService | ||||
from appenlight.models.tag import Tag | ||||
from appenlight.lib import print_traceback | ||||
from appenlight.lib.utils import parse_proto, in_batches | ||||
from appenlight.lib.ext_json import json | ||||
from appenlight.lib.redis_keys import REDIS_KEYS | ||||
from appenlight.lib.enums import ReportType | ||||
# Module-level logger obtained from celery's get_task_logger; used by every
# task defined below.
log = get_task_logger(__name__)
# Occurrence thresholds used to pick the sampling interval:
# every 100 up to 900, every 1000 up to 9000, every 5000 up to 95000.
sample_boundries = [
    *range(100, 1000, 100),
    *range(1000, 10000, 1000),
    *range(10000, 100000, 5000),
]


def pick_sample(total_occurences, report_type=None):
    """
    Decide whether a report should be kept as a detailed sample.

    While ``total_occurences`` is not above the first boundary every report
    qualifies (interval of 1).  Above it, the interval is the closest lower
    boundary divided by 100, or by 10 for ``ReportType.not_found`` reports.

    :param total_occurences: occurrences counted for the group so far
    :param report_type: report type; only ``ReportType.not_found`` changes
        the divisor
    :return: True when ``total_occurences`` is a multiple of the interval
    """
    boundary_idx = bisect.bisect_left(sample_boundries, total_occurences)
    if boundary_idx <= 0:
        interval = 1.0
    else:
        divisor = 10.0 if report_type == ReportType.not_found else 100.0
        interval = sample_boundries[boundary_idx - 1] / divisor
    return total_occurences % interval == 0
@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    """
    Emit an error and a warning log record, then always raise.

    :raises Exception: unconditionally, with a fixed test message
    """
    message = "test celery log"
    log_extra = {"location": "celery"}
    log.error(message, extra=log_extra)
    log.warning(message, extra=log_extra)
    raise Exception("Celery exception test")
r0 | ||||
@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    """
    Sleep briefly, log, raise, and then ask celery to retry the task.

    When ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` is set the exception is
    re-raised instead of scheduling a retry.
    """
    try:
        from time import sleep

        sleep(1.3)
        message = "test retry celery log"
        log_extra = {"location": "celery"}
        log.error(message, extra=log_extra)
        log.warning(message, extra=log_extra)
        raise Exception("Celery exception test")
    except Exception as exc:
        eager_propagation = celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]
        if eager_propagation:
            raise
        test_retry_exception_task.retry(exc=exc)
@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, request_params, dataset, **kwargs):
    """
    Persist a batch of reports for an application and index them in ES.

    For every entry in ``dataset`` the matching report group is looked up
    (or created), the report is optionally stored as a detailed sample,
    group counters are updated and ES documents are collected per partition.
    Afterwards redis counters are bumped and the ES helper functions are
    called with the collected documents.

    :param resource_id: id of the application the reports belong to
    :param request_params: mapping; only ``protocol_version`` is read
    :param dataset: iterable of report payload dicts
    :param kwargs: accepted but not used here
    :return: True on success; on failure the task is retried unless
        ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` is set, in which case the
        exception propagates
    """
    proto_version = parse_proto(request_params.get("protocol_version", ""))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # ES documents are gathered per partition so each index gets a
        # single bulk insert at the end
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)
        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            # find latest group in this months partition
            month_start = datetime.utcnow().date().replace(day=1)
            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id, report.grouping_hash, since_when=month_start
            )
            occurences = report_data.get("occurences", 1)
            if not report_group:
                # total reports will be +1 moment later
                report_group = ReportGroup(
                    grouping_hash=report.grouping_hash,
                    occurences=0,
                    total_reports=0,
                    last_report=0,
                    priority=report.priority,
                    error=report.error,
                    first_timestamp=report.start_time,
                )
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp

            store_details = pick_sample(
                report_group.occurences, report_type=report_group.report_type
            )
            if store_details:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                es_report_docs.setdefault(report.partition_id, []).append(
                    report.es_doc()
                )
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for slow_call in slow_calls:
                    es_slow_calls_docs.setdefault(slow_call.partition_id, []).append(
                        slow_call.es_doc()
                    )
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            # stat rows are generated for every report, sampled or not
            stat_row = ReportService.generate_stat_rows(report, resource, report_group)
            es_reports_stats_rows.setdefault(stat_row.partition_id, []).append(
                stat_row.es_doc()
            )

            # see if this report pushes the group over a 10th / 100th occurence
            seen_before = report_group.occurences
            seen_after = seen_before + report.occurences
            notify_occurences_10 = int(math.floor(seen_before / 10)) != int(
                math.floor(seen_after / 10)
            )
            notify_occurences_100 = int(math.floor(seen_before / 100)) != int(
                math.floor(seen_after / 100)
            )

            # the counters below are assigned column expressions built from
            # the ReportGroup class attributes, not plain python numbers
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(
                notify_10=notify_occurences_10, notify_100=notify_occurences_100
            )
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            es_report_group_docs.setdefault(report_group.partition_id, []).append(
                report_group.es_doc()
            )

            action = "REPORT"
            log_msg = "%s: %s %s, client: %s, proto: %s" % (
                action,
                report_data.get("http_status", "unknown"),
                str(resource),
                report_data.get("client"),
                proto_version,
            )
            log.info(log_msg)

        total_reports = len(dataset)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        current_hour = current_time.replace(minute=0)
        counter_keys = REDIS_KEYS["counters"]
        # (redis key, time-to-live in seconds) for every counter to bump
        counters = (
            (counter_keys["reports_per_minute"].format(current_time), 3600 * 24),
            (
                counter_keys["events_per_minute_per_user"].format(
                    resource.owner_user_id, current_time
                ),
                3600,
            ),
            (
                counter_keys["reports_per_hour_per_app"].format(
                    resource_id, current_hour
                ),
                3600 * 24 * 7,
            ),
        )
        for key, ttl in counters:
            redis_pipeline.incr(key, total_reports)
            redis_pipeline.expire(key, ttl)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(current_hour),
            resource_id,
        )
        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_reports.retry(exc=exc)
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
    """
    Bulk-index report group and report documents into Elasticsearch.

    Both arguments map an index name to a list of document dicts.  Every
    document is updated in place with ``_index`` / ``_type`` before the
    list is handed to ``elasticsearch.helpers.bulk``.  Report groups are
    processed before reports.

    :param report_group_docs: ``{index_name: [doc, ...]}`` for report groups
    :param report_docs: ``{index_name: [doc, ...]}`` for reports
    """
    for docs_by_index in (report_group_docs, report_docs):
        for index_name, docs in docs_by_index.items():
            bulk_meta = {"_index": index_name, "_type": "report"}
            for doc in docs:
                doc.update(bulk_meta)
            elasticsearch.helpers.bulk(Datastores.es, docs)
r0 | ||||
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    """
    Bulk-index slow call documents into Elasticsearch.

    Every document is updated in place with its target ``_index`` and the
    ``log`` ``_type`` before being sent with ``elasticsearch.helpers.bulk``.

    :param es_docs: ``{index_name: [doc, ...]}``
    """
    for index_name, docs in es_docs.items():
        bulk_meta = {"_index": index_name, "_type": "log"}
        for doc in docs:
            doc.update(bulk_meta)
        elasticsearch.helpers.bulk(Datastores.es, docs)
r0 | ||||
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    """
    Bulk-index report stat row documents into Elasticsearch.

    Every document is updated in place with its target ``_index`` and the
    ``report`` ``_type`` before being sent with ``elasticsearch.helpers.bulk``.

    :param es_docs: ``{index_name: [doc, ...]}``
    """
    for index_name, docs in es_docs.items():
        bulk_meta = {"_index": index_name, "_type": "report"}
        for doc in docs:
            doc.update(bulk_meta)
        elasticsearch.helpers.bulk(Datastores.es, docs)
r0 | ||||
@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request_params, dataset, **kwargs):
    """
    Persist a batch of log entries for an application and index them in ES.

    Entries without a primary key are stored and indexed as they are.  For
    entries with a primary key, all rows sharing the same primary key and
    namespace are looked up afterwards; only the newest one (by timestamp)
    is kept in the database and re-indexed, older ones are deleted from
    both the database and ES.

    :param resource_id: id of the application the logs belong to
    :param request_params: mapping; only ``protocol_version`` is read
    :param dataset: iterable of log payload dicts, each providing
        ``primary_key`` and ``namespace`` keys
    :param kwargs: accepted but not used here
    :return: True on success; on failure the task is retried unless
        ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` is set, in which case the
        exception propagates
    """
    proto_version = request_params.get("protocol_version")
    current_time = datetime.utcnow().replace(second=0, microsecond=0)

    try:
        es_docs = collections.defaultdict(list)
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of row later
            if entry["primary_key"] is not None:
                ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
            log_entry = Log()
            log_entry.set_data(entry, resource=resource)
            log_entry._skip_ft_index = True
            resource.logs.append(log_entry)
            DBSession.flush()
            # rows without a primary key are indexed straight away
            if entry["primary_key"] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db for same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            # es_docs must NOT be re-created here: it already holds the
            # documents of the entries without a primary key and resetting
            # it would drop them from the index for mixed batches
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs
            )
            log_dict = collections.defaultdict(list)
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                log_dict[log_key].append(log_entry)

            for entry_list in log_dict.values():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for older_entry in entry_list[:-1]:
                    ids_to_delete.append(older_entry.log_id)
                    es_docs_to_delete[older_entry.partition_id].append(
                        older_entry.delete_hash
                    )
                # the newest row is removed from ES as well because it is
                # indexed again below
                es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)

            # batch this to avoid problems with default ES bulk limits
            for es_index, delete_hashes in es_docs_to_delete.items():
                for batch in in_batches(delete_hashes, 20):
                    es_query = {"query": {"terms": {"delete_hash": batch}}}
                    try:
                        Datastores.es.delete_by_query(
                            index=es_index,
                            doc_type="log",
                            body=es_query,
                            conflicts="proceed",
                        )
                    except elasticsearch.exceptions.NotFoundError:
                        # a missing index only means there is nothing to delete
                        log.info("skipping index {}".format(es_index))

        total_logs = len(dataset)

        log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
            str(resource),
            total_logs,
            proto_version,
        )
        log.info(log_msg)

        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        current_hour = current_time.replace(minute=0)
        key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
            resource_id, current_hour
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(current_hour),
            resource_id,
        )
        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_logs.retry(exc=exc)
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    """
    Bulk-index log documents into Elasticsearch.

    Every document is updated in place with its target ``_index`` and the
    ``log`` ``_type`` before being sent with ``elasticsearch.helpers.bulk``.

    :param es_docs: ``{index_name: [doc, ...]}``
    """
    for index_name, docs in es_docs.items():
        bulk_meta = {"_index": index_name, "_type": "log"}
        for doc in docs:
            doc.update(bulk_meta)
        elasticsearch.helpers.bulk(Datastores.es, docs)
r0 | ||||
@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request_params, dataset, proto_version):
    """
    Persist a batch of metrics for an application and index them in ES.

    Each metric's tags are copied and given a lower-cased ``server_name``
    (``"unknown"`` when empty); the rows are bulk-saved, redis counters are
    bumped and the ES documents are passed to ``add_metrics_es``.

    :param resource_id: id of the application the metrics belong to
    :param request_params: not read by this function
    :param dataset: iterable of metric dicts with ``tags``, ``server_name``,
        ``timestamp`` and ``namespace`` keys
    :param proto_version: only used in the log message
    :return: True on success; on failure the task is retried unless
        ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` is set, in which case the
        exception propagates
    """
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric["tags"])
            server_name = tags.get("server_name", metric["server_name"]).lower()
            if not server_name:
                server_name = "unknown"
            tags["server_name"] = server_name
            metric_row = Metric(
                timestamp=metric["timestamp"],
                resource_id=resource.resource_id,
                namespace=metric["namespace"],
                tags=tags,
            )
            rows.append(metric_row)
            es_docs.append(metric_row.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = "METRICS"
        metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
            action,
            str(resource),
            len(dataset),
            proto_version,
        )
        log.info(metrics_msg)

        mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        current_hour = current_time.replace(minute=0)
        counter_keys = REDIS_KEYS["counters"]
        # (redis key, time-to-live in seconds) for every counter to bump
        counters = (
            (counter_keys["metrics_per_minute"].format(current_time), 3600 * 24),
            (
                counter_keys["events_per_minute_per_user"].format(
                    resource.owner_user_id, current_time
                ),
                3600,
            ),
            (
                counter_keys["metrics_per_hour_per_app"].format(
                    resource_id, current_hour
                ),
                3600 * 24 * 7,
            ),
        )
        for key, ttl in counters:
            redis_pipeline.incr(key, len(rows))
            redis_pipeline.expire(key, ttl)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(current_hour),
            resource_id,
        )
        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_metrics.retry(exc=exc)
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
    """
    Index metric documents one by one into a per-day Elasticsearch index.

    The index name is ``rcae_m_`` followed by the document timestamp
    formatted as ``YYYY_MM_DD``.

    :param es_docs: iterable of document dicts with a ``timestamp`` value
        supporting ``strftime``
    """
    for doc in es_docs:
        day_suffix = doc["timestamp"].strftime("%Y_%m_%d")
        index_name = "rcae_m_%s" % day_suffix
        Datastores.es.index(index_name, "log", doc)
r0 | ||||
@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
    """
    Notify users of an application about report groups that got new reports.

    Group ids and per-group occurrence counters are read from redis and
    removed there, alerts are checked for the groups, every user returned by
    ``ResourceService.users_for_perm(application, "view")`` is notified and
    the groups are flagged as notified.

    :param resource_id: id of the application to process; nothing happens
        when no such application exists
    :raises Exception: any error is logged with a traceback and re-raised
    """
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in slow_group_ids]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            # a missing counter is treated as a single occurence
            occurence_dict[g_id] = int(val) if val else 1
        report_groups = ReportGroupService.by_ids(group_ids)
        # Query.options() returns a new query instead of modifying the
        # existing one, so the result has to be kept for the eager load of
        # last_report_ref to take effect
        report_groups = report_groups.options(
            sa.orm.joinedload(ReportGroup.last_report_ref)
        )

        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
        )
        users = {p.user for p in ResourceService.users_for_perm(application, "view")}
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(
                user,
                request,
                application,
                report_groups=report_groups,
                occurence_dict=occurence_dict,
            )
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception:
        print_traceback(log)
        raise
@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
    """
    Check whether alerts should be opened for an application.

    Group ids and per-group occurrence counters are read from the
    ``*_alerting`` redis keys and removed there, then passed on to
    ``ApplicationService.check_for_groups_alert``.

    :param resource_id: id of the application to process; nothing happens
        when no such application exists
    :raises Exception: any error is logged with a traceback and re-raised
    """
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in slow_group_ids]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
                g_id
            )
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            # a missing counter is treated as a single occurence
            occurence_dict[g_id] = int(val) if val else 1
        report_groups = ReportGroupService.by_ids(group_ids)
        # Query.options() returns a new query instead of modifying the
        # existing one, so the result has to be kept for the eager load of
        # last_report_ref to take effect
        report_groups = report_groups.options(
            sa.orm.joinedload(ReportGroup.last_report_ref)
        )

        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
            since_when=since_when,
        )
    except Exception:
        print_traceback(log)
        raise
@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
    """
    Ask active report alert events older than 5 minutes to validate or close.

    Each matching event gets ``validate_or_close`` called with a
    ``since_when`` of one minute before the task started.

    :raises Exception: any error is logged with a traceback and re-raised
    """
    log.warning("Checking alerts")
    since_when = datetime.utcnow()
    try:
        alert_types = [
            Event.types["error_report_alert"],
            Event.types["slow_report_alert"],
        ]
        active_statuses = [Event.statuses["active"]]
        # get events older than 5 min
        older_than = since_when - timedelta(minutes=5)
        events = EventService.by_type_and_status(
            alert_types, active_statuses, older_than=older_than
        )
        validate_since = since_when - timedelta(minutes=1)
        for event in events:
            # see if we can close them
            event.validate_or_close(since_when=validate_since)
    except Exception as exc:
        print_traceback(log)
        raise
@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
    """
    Increase ``times_seen`` and refresh ``last_timestamp`` of matching tags.

    Tags are matched on name and on the JSON-serialized value, both sides
    compared as TEXT.

    :param tag_name: name of the tag
    :param tag_value: tag value; serialized with ``json.dumps`` for matching
    :param count: amount added to ``times_seen``
    :return: True on success; on failure the task is retried unless
        ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` is set, in which case the
        exception propagates
    """
    try:
        stored_value = sa.cast(Tag.value, sa.types.TEXT)
        wanted_value = sa.cast(json.dumps(tag_value), sa.types.TEXT)
        matching_tags = (
            DBSession.query(Tag)
            .filter(Tag.name == tag_name)
            .filter(stored_value == wanted_value)
        )
        new_values = {
            "times_seen": Tag.times_seen + count,
            "last_timestamp": datetime.utcnow(),
        }
        matching_tags.update(new_values, synchronize_session=False)
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        update_tag_counter.retry(exc=exc)
@celery.task(queue="default")
def update_tag_counters():
    """
    Schedule counter updates for tags seen since the last run.

    Reads the whole ``seen_tag_list`` redis list, deletes it, counts
    identical entries and queues one ``update_tag_counter`` task per
    distinct entry.
    """
    seen_list_key = REDIS_KEYS["seen_tag_list"]
    seen_tags = Datastores.redis.lrange(seen_list_key, 0, -1)
    Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
    seen_counts = collections.Counter(seen_tags)
    for tag_json, times_seen in seen_counts.items():
        tag_info = json.loads(tag_json)
        tag_name = tag_info[0]
        tag_value = tag_info[1]
        update_tag_counter.delay(tag_name, tag_value, times_seen)
@celery.task(queue="default")
def daily_digest():
    """
    Send a digest of up to 50 trending error reports per application.

    Applications are taken from the ``apps_that_had_reports`` redis set,
    which is deleted afterwards.  Reports are looked up for the window
    starting 8 hours before the task began; the digest goes to every user
    returned by ``ResourceService.users_for_perm(application, "view")``.
    """
    request = get_current_request()
    apps_key = REDIS_KEYS["apps_that_had_reports"]
    apps = Datastores.redis.smembers(apps_key)
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning("Generating daily digests")
    for raw_resource_id in apps:
        resource_id = raw_resource_id.decode("utf8")
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        error_tag = {"name": "type", "value": ["error"], "op": None}
        filter_settings = {
            "resource": [resource_id],
            "tags": [error_tag],
            "type": "error",
            "start_date": since_when,
            "end_date": end_date,
        }

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50
        )

        application = ApplicationService.by_id(resource_id)
        if not application:
            continue
        permissions = ResourceService.users_for_perm(application, "view")
        recipients = {perm.user for perm in permissions}
        for user in recipients:
            user.send_digest(
                request, application, reports=reports, since_when=since_when
            )
r0 | ||||
@celery.task(queue="default")
def notifications_reports():
    """
    Queue a ``check_user_report_notifications`` task for every application
    found in the ``apps_that_had_reports`` redis set, then forget the set.
    """
    apps_key = REDIS_KEYS["apps_that_had_reports"]
    pending_apps = Datastores.redis.smembers(apps_key)
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
    for app in pending_apps:
        log.warning("Notify for app: %s" % app)
        resource_id = app.decode("utf8")
        check_user_report_notifications.delay(resource_id)
r21 | ||||
@celery.task(queue="default")
def alerting_reports():
    """
    Queue a ``check_alerts`` task for every application found in the
    ``apps_that_had_reports_alerting`` redis set, then forget the set.
    """
    apps_key = REDIS_KEYS["apps_that_had_reports_alerting"]
    pending_apps = Datastores.redis.smembers(apps_key)
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
    for app in pending_apps:
        log.warning("Notify for app: %s" % app)
        resource_id = app.decode("utf8")
        check_alerts.delay(resource_id)
r0 | ||||
# NOTE(review): ``hard_time_limit`` does not look like a documented celery
# task option (the documented names are ``time_limit`` / ``soft_time_limit``)
# - confirm whether a hard limit is actually enforced for this task.
@celery.task(
    queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
)
def logs_cleanup(resource_id, filter_settings):
    """
    Delete logs of an application from the database and from Elasticsearch.

    The database delete runs inside the request's transaction manager and
    is committed before the ES ``delete_by_query`` on ``rcae_l_*`` is issued.

    :param resource_id: id of the application whose logs are removed
    :param filter_settings: mapping with a ``namespace`` entry; when it is
        non-empty only logs of its first namespace are removed
    """
    request = get_current_request()
    request.tm.begin()

    es_filters = [{"term": {"resource_id": resource_id}}]
    db_query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings["namespace"]:
        namespace = filter_settings["namespace"][0]
        db_query = db_query.filter(Log.namespace == namespace)
        es_filters.append({"term": {"namespace": filter_settings["namespace"][0]}})
    db_query.delete(synchronize_session=False)
    request.tm.commit()

    es_query = {"query": {"bool": {"filter": es_filters}}}
    Datastores.es.delete_by_query(
        index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
    )