# -*- coding: utf-8 -*-
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import bisect
import collections
import math
from datetime import datetime, timedelta
import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers
from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from ziggurat_foundations.models.services.resource import ResourceService
from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event
from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = (
    list(range(100, 1000, 100))
    + list(range(1000, 10000, 1000))
    + list(range(10000, 100000, 5000))
)


def pick_sample(total_occurences, report_type=None):
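    """
    Decide whether this occurrence should be stored as a full report sample.

    Below the first boundary every report is kept; past it only every N-th
    occurrence is stored, with N derived from the previous boundary and
    not_found (404) reports sampled more sparsely than other report types.
    """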
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
log.error("test celery log", extra={"location": "celery"})
log.warning("test celery log", extra={"location": "celery"})
raise Exception("Celery exception test")
@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error("test retry celery log", extra={"location": "celery"})
        log.warning("test retry celery log", extra={"location": "celery"})
        raise Exception("Celery exception test")
    except Exception as exc:
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        test_retry_exception_task.retry(exc=exc)


@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, request_params, dataset, **kwargs):
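    """
    Persist a batch of reports for an application: groups them by grouping
    hash, updates occurrence counters and notification flags, bumps the redis
    rate counters and hands the built documents to the Elasticsearch
    indexing tasks.
    """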
    proto_version = parse_proto(request_params.get("protocol_version", ""))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store ES docs here for a single bulk insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)
        tags = []

        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            # find the latest group in this month's partition
            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash,
                since_when=datetime.utcnow().date().replace(day=1),
            )
            occurences = report_data.get("occurences", 1)
            if not report_group:
                # total reports will be +1 a moment later
                report_group = ReportGroup(
                    grouping_hash=report.grouping_hash,
                    occurences=0,
                    total_reports=0,
                    last_report=0,
                    priority=report.priority,
                    error=report.error,
                    first_timestamp=report.start_time,
                )
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(
                report_group.occurences, report_type=report_group.report_type
            )
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())

            # see if we should mark the 10th occurrence of the report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(
                math.floor((report_group.occurences + report.occurences) / 10)
            )
            last_occurences_100 = int(math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(
                math.floor((report_group.occurences + report.occurences) / 100)
            )
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100

            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(
                notify_10=notify_occurences_10, notify_100=notify_occurences_100
            )
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc()
            )

            action = "REPORT"
            log_msg = "%s: %s %s, client: %s, proto: %s" % (
                action,
                report_data.get("http_status", "unknown"),
                str(resource),
                report_data.get("client"),
                proto_version,
            )
            log.info(log_msg)
        total_reports = len(dataset)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_reports.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
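    """Bulk-index report group and report documents into their ES partitions."""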
    for k, v in report_group_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)
    for k, v in report_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
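    """Bulk-index slow call documents into their ES partitions."""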
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "log"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
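    """Bulk-index report stat row documents into their ES partitions."""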
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "report"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request_params, dataset, **kwargs):
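    """
    Persist a batch of log entries for an application, replacing older rows
    that share a primary_key/namespace pair, then bump the redis rate counters
    and index the new documents in Elasticsearch.
    """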
    proto_version = request_params.get("protocol_version")
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        es_docs = collections.defaultdict(list)
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of rows later
            if entry["primary_key"] is not None:
                ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
            log_entry = Log()
            log_entry.set_data(entry, resource=resource)
            log_entry._skip_ft_index = True
            resource.logs.append(log_entry)
            DBSession.flush()
            # insert non-pk rows first
            if entry["primary_key"] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db for the same pk/ns pairs
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs
            )
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)

            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in ES
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)
                es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {"query": {"terms": {"delete_hash": batch}}}
                        try:
                            Datastores.es.delete_by_query(
                                index=es_index,
                                doc_type="log",
                                body=query,
                                conflicts="proceed",
                            )
                        except elasticsearch.exceptions.NotFoundError as exc:
                            msg = "skipping index {}".format(es_index)
                            log.info(msg)

        total_logs = len(dataset)
        log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
            str(resource),
            total_logs,
            proto_version,
        )
        log.info(log_msg)
        # mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_logs.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
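    """Bulk-index log documents into their ES partitions."""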
    for k, v in es_docs.items():
        to_update = {"_index": k, "_type": "log"}
        [i.update(to_update) for i in v]
        elasticsearch.helpers.bulk(Datastores.es, v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request_params, dataset, proto_version):
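    """
    Persist a batch of metrics for an application, bump the redis rate
    counters and index the metric documents in Elasticsearch.
    """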
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric["tags"])
            server_n = tags.get("server_name", metric["server_name"]).lower()
            tags["server_name"] = server_n or "unknown"
            new_metric = Metric(
                timestamp=metric["timestamp"],
                resource_id=resource.resource_id,
                namespace=metric["namespace"],
                tags=tags,
            )
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = "METRICS"
        metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
            action,
            str(resource),
            len(dataset),
            proto_version,
        )
        log.info(metrics_msg)

        mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
            resource.owner_user_id, current_time
        )
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
            resource_id, current_time.replace(minute=0)
        )
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
                current_time.replace(minute=0)
            ),
            resource_id,
        )
        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        add_metrics.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
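    """Index metric documents into their daily ES partitions."""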
    for doc in es_docs:
        partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
        Datastores.es.index(partition, "log", doc)


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
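    """
    Collect the report groups queued in redis for this application, check
    whether group alerts should fire and send per-user report notifications.
    """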
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
        )
        users = set(
            [p.user for p in ResourceService.users_for_perm(application, "view")]
        )
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(
                user,
                request,
                application,
                report_groups=report_groups,
                occurence_dict=occurence_dict,
            )
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
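    """
    Collect the report groups queued in redis for alerting for this
    application and check whether alert events should be opened.
    """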
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.error, resource_id
        )
        slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
            ReportType.slow, resource_id
        )
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
                g_id
            )
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application,
            "alert",
            report_groups=report_groups,
            occurence_dict=occurence_dict,
            since_when=since_when,
        )
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
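    """
    Find active report alert events older than 5 minutes and try to close
    the ones that no longer validate.
    """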
log.warning("Checking alerts")
since_when = datetime.utcnow()
try:
event_types = [
Event.types["error_report_alert"],
Event.types["slow_report_alert"],
]
statuses = [Event.statuses["active"]]
# get events older than 5 min
events = EventService.by_type_and_status(
event_types, statuses, older_than=(since_when - timedelta(minutes=5))
)
for event in events:
# see if we can close them
event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
except Exception as exc:
print_traceback(log)
raise
@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
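    """Increment times_seen and refresh last_timestamp for a single tag value."""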
    try:
        query = (
            DBSession.query(Tag)
            .filter(Tag.name == tag_name)
            .filter(
                sa.cast(Tag.value, sa.types.TEXT)
                == sa.cast(json.dumps(tag_value), sa.types.TEXT)
            )
        )
        query.update(
            {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
            synchronize_session=False,
        )
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
"""
Sets task to update counters for application tags
"""
tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
c = collections.Counter(tags)
for t_json, count in c.items():
tag_info = json.loads(t_json)
update_tag_counter.delay(tag_info[0], tag_info[1], count)
@celery.task(queue="default")
def daily_digest():
"""
Sends daily digest with top 50 error reports
"""
request = get_current_request()
apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
since_when = datetime.utcnow() - timedelta(hours=8)
log.warning("Generating daily digests")
for resource_id in apps:
resource_id = resource_id.decode("utf8")
end_date = datetime.utcnow().replace(microsecond=0, second=0)
filter_settings = {
"resource": [resource_id],
"tags": [{"name": "type", "value": ["error"], "op": None}],
"type": "error",
"start_date": since_when,
"end_date": end_date,
}
reports = ReportGroupService.get_trending(
request, filter_settings=filter_settings, limit=50
)
application = ApplicationService.by_id(resource_id)
if application:
users = set(
[p.user for p in ResourceService.users_for_perm(application, "view")]
)
for user in users:
user.send_digest(
request, application, reports=reports, since_when=since_when
)
@celery.task(queue="default")
def notifications_reports():
"""
Loop that checks redis for info and then issues new tasks to celery to
issue notifications
"""
apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
for app in apps:
log.warning("Notify for app: %s" % app)
check_user_report_notifications.delay(app.decode("utf8"))
@celery.task(queue="default")
def alerting_reports():
"""
Loop that checks redis for info and then issues new tasks to celery to
perform the following:
- which applications should have new alerts opened
"""
apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
for app in apps:
log.warning("Notify for app: %s" % app)
check_alerts.delay(app.decode("utf8"))
@celery.task(
queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
)
def logs_cleanup(resource_id, filter_settings):
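    """
    Delete an application's log entries (optionally limited to a namespace)
    from both pg and the rcae_l_* Elasticsearch indices.
    """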
    request = get_current_request()
    request.tm.begin()
    es_query = {"query": {"bool": {"filter": [{"term": {"resource_id": resource_id}}]}}}
    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings["namespace"]:
        query = query.filter(Log.namespace == filter_settings["namespace"][0])
        es_query["query"]["bool"]["filter"].append(
            {"term": {"namespace": filter_settings["namespace"][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    Datastores.es.delete_by_query(
        index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
    )