# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bisect
import collections
import math
from datetime import datetime, timedelta

import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers

from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from ziggurat_foundations.models.services.resource import ResourceService

from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event
from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = (
    list(range(100, 1000, 100))
    + list(range(1000, 10000, 1000))
    + list(range(10000, 100000, 5000))
)


def pick_sample(total_occurences, report_type=None):
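    """
    Decide whether this occurence of a report should be stored with full
    details. Below the first boundary every occurence is kept; past it only
    every Nth occurence is sampled, with N derived from the previous boundary
    (boundary / 10 for "not found" reports, boundary / 100 for other types).
    """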
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    log.error("test celery log", extra={"location": "celery"})
    log.warning("test celery log", extra={"location": "celery"})
    raise Exception("Celery exception test")


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error("test retry celery log", extra={"location": "celery"})
        log.warning("test retry celery log", extra={"location": "celery"})
        raise Exception("Celery exception test")
    except Exception as exc:
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        test_retry_exception_task.retry(exc=exc)


@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, request_params, dataset, **kwargs):
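    """
    Store a batch of incoming reports for an application: assign each report
    to a report group (creating the group when needed), sample detailed rows,
    update group counters and notification thresholds, bump per-minute and
    per-hour redis counters and pass the collected documents to the
    elasticsearch indexing tasks.
    """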
    proto_version = parse_proto(request_params.get("protocol_version", ""))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store ES docs here for a single bulk insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)
        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            # find latest group in this month's partition
            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash,
                since_when=datetime.utcnow().date().replace(day=1),
            )
            occurences = report_data.get("occurences", 1)
            if not report_group:
                # total reports will be +1 moment later
                report_group = ReportGroup(
                    grouping_hash=report.grouping_hash,
                    occurences=0,
                    total_reports=0,
                    last_report=0,
                    priority=report.priority,
                    error=report.error,
                    first_timestamp=report.start_time,
                )
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(
                report_group.occurences, report_type=report_group.report_type
            )
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())

            # see if we should mark 10th occurence of report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(
                math.floor((report_group.occurences + report.occurences) / 10)
            )
            last_occurences_100 = int(math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(
                math.floor((report_group.occurences + report.occurences) / 100)
            )
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(
                notify_10=notify_occurences_10, notify_100=notify_occurences_100
            )
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc()
            )

            action = "REPORT"
            log_msg = "%s: %s %s, client: %s, proto: %s" % (
                action,
                report_data.get("http_status", "unknown"),
                str(resource),
                report_data.get("client"),
                proto_version,
            )
            log.info(log_msg)
        total_reports = len(dataset)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_reports.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
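    """
    Bulk index report group and report documents into their per-partition
    elasticsearch indices.
    """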
    for k, v in report_group_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)
    for k, v in report_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "log"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request_params, dataset, **kwargs):
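    """
    Store a batch of log entries for an application. Entries that carry a
    primary key replace older rows with the same pk/namespace pair in both
    postgres and elasticsearch; redis counters are bumped and the resulting
    documents are passed to the elasticsearch indexing task.
    """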
    proto_version = request_params.get("protocol_version")
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        es_docs = collections.defaultdict(list)
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of row later
            if entry["primary_key"] is not None:
                ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
            log_entry = Log()
            log_entry.set_data(entry, resource=resource)
            log_entry._skip_ft_index = True
            resource.logs.append(log_entry)
            DBSession.flush()
            # insert non pk rows first
            if entry["primary_key"] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db for same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs
            )
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)
            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)
                es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)

                es_docs[log_entry.partition_id].append(log_entry.es_doc())
            if ids_to_delete:
                query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {"query": {"terms": {"delete_hash": batch}}}

                        try:
                            Datastores.es.delete_by_query(
                                index=es_index,
                                doc_type="log",
                                body=query,
                                conflicts="proceed",
                            )
                        except elasticsearch.exceptions.NotFoundError as exc:
                            msg = "skipping index {}".format(es_index)
                            log.info(msg)

        total_logs = len(dataset)
        log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
            str(resource),
            total_logs,
            proto_version,
        )
        log.info(log_msg)
        # mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_logs.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "log"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request_params, dataset, proto_version):
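    """
    Store a batch of metrics for an application: normalize the server_name
    tag, bulk save the rows in postgres, bump redis counters and index the
    documents in elasticsearch.
    """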
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric["tags"])
            server_n = tags.get("server_name", metric["server_name"]).lower()
            tags["server_name"] = server_n or "unknown"
            new_metric = Metric(
                timestamp=metric["timestamp"],
                resource_id=resource.resource_id,
                namespace=metric["namespace"],
                tags=tags,
            )
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()
        action = "METRICS"
        metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
            action,
            str(resource),
            len(dataset),
            proto_version,
        )
        log.info(metrics_msg)
        mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_metrics.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
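    """
    Index metric documents one by one into the daily ``rcae_m_*``
    elasticsearch partition derived from each document's timestamp.
    """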
    for doc in es_docs:
        partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
        Datastores.es.index(partition, "log", doc)


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
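    """
    Read the pending report group ids for this application from redis,
    check whether alert conditions are met and send report notifications
    to every user with the "view" permission, marking the groups as notified.
    """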
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
        )
        users = set(
            [p.user for p in ResourceService.users_for_perm(application, "view")]
        )
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(
                user,
                request,
                application,
                report_groups=report_groups,
                occurence_dict=occurence_dict,
            )
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
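    """
    Read the pending report group ids gathered for alerting from redis and
    check whether new alerts should be opened for this application.
    """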
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
                g_id
            )
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
            since_when=since_when,
        )
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
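    """
    Find error and slow report alert events that are still active and older
    than 5 minutes, and validate or close each of them.
    """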
    log.warning("Checking alerts")
    since_when = datetime.utcnow()
    try:
        event_types = [
            Event.types["error_report_alert"],
            Event.types["slow_report_alert"],
        ]
        statuses = [Event.statuses["active"]]
        # get events older than 5 min
        events = EventService.by_type_and_status(
            event_types, statuses, older_than=(since_when - timedelta(minutes=5))
        )
        for event in events:
            # see if we can close them
            event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
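    """
    Increment the times_seen counter and refresh the last_timestamp of a
    single application tag identified by name and value.
    """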
    try:
        query = (
            DBSession.query(Tag)
            .filter(Tag.name == tag_name)
            .filter(
                sa.cast(Tag.value, sa.types.TEXT)
                == sa.cast(json.dumps(tag_value), sa.types.TEXT)
            )
        )
        query.update(
            {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
            synchronize_session=False,
        )
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
    """
    Schedules tasks that update the counters for application tags seen since
    the last run
    """
    tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
    Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
    c = collections.Counter(tags)
    for t_json, count in c.items():
        tag_info = json.loads(t_json)
        update_tag_counter.delay(tag_info[0], tag_info[1], count)


@celery.task(queue="default")
def daily_digest():
    """
    Sends daily digest with top 50 error reports
    """
    request = get_current_request()
    apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning("Generating daily digests")
    for resource_id in apps:
        resource_id = resource_id.decode("utf8")
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        filter_settings = {
            "resource": [resource_id],
            "tags": [{"name": "type", "value": ["error"], "op": None}],
            "type": "error",
            "start_date": since_when,
            "end_date": end_date,
        }

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50
        )

        application = ApplicationService.by_id(resource_id)
        if application:
            users = set(
                [p.user for p in ResourceService.users_for_perm(application, "view")]
            )
            for user in users:
                user.send_digest(
                    request, application, reports=reports, since_when=since_when
                )


@celery.task(queue="default")
def notifications_reports():
    """
    Checks redis for applications that received new reports and schedules a
    notification task for each of them
    """
    apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
    for app in apps:
        log.warning("Notify for app: %s" % app)
        check_user_report_notifications.delay(app.decode("utf8"))


@celery.task(queue="default")
def alerting_reports():
    """
    Checks redis for applications that received new reports and schedules an
    alerting task for each of them to decide which applications should have
    new alerts opened
    """
    apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
    Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
    for app in apps:
        log.warning("Notify for app: %s" % app)
        check_alerts.delay(app.decode("utf8"))


@celery.task(
    queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
)
def logs_cleanup(resource_id, filter_settings):
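    """
    Delete an application's log entries (optionally narrowed to a namespace)
    from both postgres and the ``rcae_l_*`` elasticsearch indices.
    """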
    request = get_current_request()
    request.tm.begin()
    es_query = {"query": {"bool": {"filter": [{"term": {"resource_id": resource_id}}]}}}

    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings["namespace"]:
        query = query.filter(Log.namespace == filter_settings["namespace"][0])
        es_query["query"]["bool"]["filter"].append(
            {"term": {"namespace": filter_settings["namespace"][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    Datastores.es.delete_by_query(
        index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
    )