From b242e5b8918e6ecac3921faf5b15e80b900d04ad 2019-04-07 10:55:13
From: Marcin Lulek
Date: 2019-04-07 10:55:13
Subject: [PATCH] elasticsearch: move to single doctype indices

---

diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py
index 8041de8..416f8f3 100644
--- a/backend/src/appenlight/celery/tasks.py
+++ b/backend/src/appenlight/celery/tasks.py
@@ -239,7 +239,7 @@ def add_reports(resource_id, request_params, dataset, **kwargs):
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_es(report_group_docs, report_docs):
     for k, v in report_group_docs.items():
-        to_update = {"_index": k, "_type": "report_group"}
+        to_update = {"_index": k, "_type": "report"}
         [i.update(to_update) for i in v]
         elasticsearch.helpers.bulk(Datastores.es, v)
     for k, v in report_docs.items():
@@ -259,7 +259,7 @@ def add_reports_slow_calls_es(es_docs):
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_stats_rows_es(es_docs):
     for k, v in es_docs.items():
-        to_update = {"_index": k, "_type": "log"}
+        to_update = {"_index": k, "_type": "report"}
         [i.update(to_update) for i in v]
         elasticsearch.helpers.bulk(Datastores.es, v)
 
@@ -287,7 +287,7 @@ def add_logs(resource_id, request_params, dataset, **kwargs):
             if entry["primary_key"] is None:
                 es_docs[log_entry.partition_id].append(log_entry.es_doc())
 
-    # 2nd pass to delete all log entries from db foe same pk/ns pair
+    # 2nd pass to delete all log entries from db for same pk/ns pair
     if ns_pairs:
         ids_to_delete = []
         es_docs = collections.defaultdict(list)
diff --git a/backend/src/appenlight/models/log.py b/backend/src/appenlight/models/log.py
index 22fc394..adb9d64 100644
--- a/backend/src/appenlight/models/log.py
+++ b/backend/src/appenlight/models/log.py
@@ -112,7 +112,7 @@ class Log(Base, BaseModel):
                 else None,
             }
         return {
-            "pg_id": str(self.log_id),
+            "log_id": str(self.log_id),
             "delete_hash": self.delete_hash,
             "resource_id": self.resource_id,
             "request_id": self.request_id,
diff --git a/backend/src/appenlight/models/metric.py b/backend/src/appenlight/models/metric.py
index 08d610a..0e2b0fe 100644
--- a/backend/src/appenlight/models/metric.py
+++ b/backend/src/appenlight/models/metric.py
@@ -60,6 +60,7 @@ class Metric(Base, BaseModel):
             }
 
         return {
+            "metric_id": self.pkey,
             "resource_id": self.resource_id,
             "timestamp": self.timestamp,
             "namespace": self.namespace,
diff --git a/backend/src/appenlight/models/report.py b/backend/src/appenlight/models/report.py
index 8ea4d6c..4767574 100644
--- a/backend/src/appenlight/models/report.py
+++ b/backend/src/appenlight/models/report.py
@@ -314,7 +314,7 @@ class Report(Base, BaseModel):
                 "bool": {
                     "filter": [
                         {"term": {"group_id": self.group_id}},
-                        {"range": {"pg_id": {"lt": self.id}}},
+                        {"range": {"report_id": {"lt": self.id}}},
                     ]
                 }
             },
@@ -324,7 +324,7 @@
             body=query, index=self.partition_id, doc_type="report"
         )
         if result["hits"]["total"]:
-            return result["hits"]["hits"][0]["_source"]["pg_id"]
+            return result["hits"]["hits"][0]["_source"]["report_id"]
 
     def get_next_in_group(self, request):
         query = {
@@ -333,7 +333,7 @@
                 "bool": {
                     "filter": [
                         {"term": {"group_id": self.group_id}},
-                        {"range": {"pg_id": {"gt": self.id}}},
+                        {"range": {"report_id": {"gt": self.id}}},
                     ]
                 }
             },
@@ -343,7 +343,7 @@
             body=query, index=self.partition_id, doc_type="report"
         )
         if result["hits"]["total"]:
-            return result["hits"]["hits"][0]["_source"]["pg_id"]
+            return result["hits"]["hits"][0]["_source"]["report_id"]
 
     def get_public_url(self, request=None, report_group=None, _app_url=None):
         """
@@ -469,7 +469,7 @@ class Report(Base, BaseModel):
             tags["user_name"] = {"value": [self.username], "numeric_value": None}
         return {
             "_id": str(self.id),
-            "pg_id": str(self.id),
+            "report_id": str(self.id),
             "resource_id": self.resource_id,
             "http_status": self.http_status or "",
             "start_time": self.start_time,
@@ -482,9 +482,14 @@ class Report(Base, BaseModel):
             "request_id": self.request_id,
             "ip": self.ip,
             "group_id": str(self.group_id),
-            "_parent": str(self.group_id),
+            "type": "report",
+            "join_field": {
+                "name": "report",
+                "parent": str(self.group_id)
+            },
             "tags": tags,
             "tag_list": tag_list,
+            "_routing": str(self.group_id)
         }
 
     @property
@@ -518,7 +523,7 @@ def after_update(mapper, connection, target):
 
 def after_delete(mapper, connection, target):
     if not hasattr(target, "_skip_ft_index"):
-        query = {"query": {"term": {"pg_id": target.id}}}
+        query = {"query": {"term": {"report_id": target.id}}}
         Datastores.es.delete_by_query(
             index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
         )
diff --git a/backend/src/appenlight/models/report_group.py b/backend/src/appenlight/models/report_group.py
index f7e91c5..9ea8b94 100644
--- a/backend/src/appenlight/models/report_group.py
+++ b/backend/src/appenlight/models/report_group.py
@@ -178,7 +178,7 @@ class ReportGroup(Base, BaseModel):
     def es_doc(self):
         return {
             "_id": str(self.id),
-            "pg_id": str(self.id),
+            "group_id": str(self.id),
             "resource_id": self.resource_id,
             "error": self.error,
             "fixed": self.fixed,
@@ -190,6 +190,10 @@ class ReportGroup(Base, BaseModel):
             "summed_duration": self.summed_duration,
             "first_timestamp": self.first_timestamp,
             "last_timestamp": self.last_timestamp,
+            "type": "report_group",
+            "join_field": {
+                "name": "report_group"
+            },
         }
 
     def set_notification_info(self, notify_10=False, notify_100=False):
@@ -258,14 +262,14 @@ def after_insert(mapper, connection, target):
     if not hasattr(target, "_skip_ft_index"):
         data = target.es_doc()
         data.pop("_id", None)
-        Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
+        Datastores.es.index(target.partition_id, "report", data, id=target.id)
 
 
 def after_update(mapper, connection, target):
     if not hasattr(target, "_skip_ft_index"):
         data = target.es_doc()
         data.pop("_id", None)
-        Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
+        Datastores.es.index(target.partition_id, "report", data, id=target.id)
 
 
 def after_delete(mapper, connection, target):
@@ -274,10 +278,6 @@ def after_delete(mapper, connection, target):
         Datastores.es.delete_by_query(
             index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
         )
-        query = {"query": {"term": {"pg_id": target.id}}}
-        Datastores.es.delete_by_query(
-            index=target.partition_id, doc_type="report_group", body=query, conflicts="proceed"
-        )
 
 
 sa.event.listen(ReportGroup, "after_insert", after_insert)
diff --git a/backend/src/appenlight/models/report_stat.py b/backend/src/appenlight/models/report_stat.py
index f175766..ff95463 100644
--- a/backend/src/appenlight/models/report_stat.py
+++ b/backend/src/appenlight/models/report_stat.py
@@ -48,12 +48,13 @@ class ReportStat(Base, BaseModel):
         return {
             "resource_id": self.resource_id,
             "timestamp": self.start_interval,
-            "pg_id": str(self.id),
+            "report_stat_id": str(self.id),
             "permanent": True,
             "request_id": None,
             "log_level": "ERROR",
             "message": None,
             "namespace": "appenlight.error",
+            "group_id": str(self.group_id),
             "tags": {
                 "duration": {"values": self.duration, "numeric_values": self.duration},
                 "occurences": {
@@ -76,4 +77,5 @@ class ReportStat(Base, BaseModel):
                 "server_name",
                 "view_name",
             ],
+            "type": "report_stat",
         }
diff --git a/backend/src/appenlight/models/services/log.py b/backend/src/appenlight/models/services/log.py
index 620c9a2..3a93c4a 100644
--- a/backend/src/appenlight/models/services/log.py
+++ b/backend/src/appenlight/models/services/log.py
@@ -190,7 +190,7 @@ class LogService(BaseService):
             [], item_count=item_count, items_per_page=items_per_page, **filter_settings
         )
         ordered_ids = tuple(
-            item["_source"]["pg_id"] for item in results.get("hits", [])
+            item["_source"]["log_id"] for item in results.get("hits", [])
         )
 
         sorted_instance_list = []
diff --git a/backend/src/appenlight/models/services/report_group.py b/backend/src/appenlight/models/services/report_group.py
index 193fd50..d1e8ba7 100644
--- a/backend/src/appenlight/models/services/report_group.py
+++ b/backend/src/appenlight/models/services/report_group.py
@@ -97,7 +97,7 @@ class ReportGroupService(BaseService):
             es_query["query"]["bool"]["filter"].extend(tags)
 
         result = Datastores.es.search(
-            body=es_query, index=index_names, doc_type="log", size=0
+            body=es_query, index=index_names, doc_type="report", size=0
         )
         series = []
         for bucket in result["aggregations"]["parent_agg"]["buckets"]:
@@ -143,7 +143,7 @@ class ReportGroupService(BaseService):
             "top_groups": {
                 "terms": {
                     "size": 5000,
-                    "field": "_parent#report_group",
+                    "field": "join_field#report_group",
                     "order": {"newest": "desc"},
                 },
                 "aggs": {
@@ -315,7 +315,7 @@ class ReportGroupService(BaseService):
         ordered_ids = []
         if results:
             for item in results["top_groups"]["buckets"]:
-                pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
+                pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["report_id"]
                 ordered_ids.append(pg_id)
         log.info(filter_settings)
         paginator = paginate.Page(
diff --git a/backend/src/appenlight/models/services/request_metric.py b/backend/src/appenlight/models/services/request_metric.py
index 02b0992..c80a00c 100644
--- a/backend/src/appenlight/models/services/request_metric.py
+++ b/backend/src/appenlight/models/services/request_metric.py
@@ -340,7 +340,7 @@ class RequestMetricService(BaseService):
             for hit in bucket["top_calls_hits"]["hits"]["hits"]:
                 details[bucket["key"]].append(
                     {
-                        "report_id": hit["_source"]["pg_id"],
+                        "report_id": hit["_source"]["request_metric_id"],
                         "group_id": hit["_source"]["group_id"],
                     }
                 )
diff --git a/backend/src/appenlight/models/slow_call.py b/backend/src/appenlight/models/slow_call.py
index d4acbfe..24e00fd 100644
--- a/backend/src/appenlight/models/slow_call.py
+++ b/backend/src/appenlight/models/slow_call.py
@@ -88,7 +88,7 @@ class SlowCall(Base, BaseModel):
         doc = {
             "resource_id": self.resource_id,
             "timestamp": self.timestamp,
-            "pg_id": str(self.id),
+            "slow_call_id": str(self.id),
             "permanent": False,
             "request_id": None,
             "log_level": "UNKNOWN",
diff --git a/backend/src/appenlight/scripts/reindex_elasticsearch.py b/backend/src/appenlight/scripts/reindex_elasticsearch.py
index 9ba2ec9..06eba8b 100644
--- a/backend/src/appenlight/scripts/reindex_elasticsearch.py
+++ b/backend/src/appenlight/scripts/reindex_elasticsearch.py
@@ -17,6 +17,7 @@
 import argparse
 import datetime
 import logging
+import copy
 
 import sqlalchemy as sa
 import elasticsearch.exceptions
@@ -34,7 +35,6 @@ from appenlight.models.log import Log
 from appenlight.models.slow_call import SlowCall
 from appenlight.models.metric import Metric
 
-
 log = logging.getLogger(__name__)
 
 tables = {
@@ -128,7 +128,20 @@ def main():
 
 def update_template():
     try:
-        Datastores.es.indices.delete_template("rcae")
+        Datastores.es.indices.delete_template("rcae_reports")
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)
+
+    try:
+        Datastores.es.indices.delete_template("rcae_logs")
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)
+    try:
+        Datastores.es.indices.delete_template("rcae_slow_calls")
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)
+    try:
+        Datastores.es.indices.delete_template("rcae_metrics")
     except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)
     log.info("updating elasticsearch template")
@@ -153,37 +166,69 @@
         }
     ]
 
-    template_schema = {
-        "template": "rcae_*",
+    shared_analysis = {
+        "analyzer": {
+            "url_path": {
+                "type": "custom",
+                "char_filter": [],
+                "tokenizer": "path_hierarchy",
+                "filter": [],
+            },
+            "tag_value": {
+                "type": "custom",
+                "char_filter": [],
+                "tokenizer": "keyword",
+                "filter": ["lowercase"],
+            },
+        }
+    }
+
+    shared_log_mapping = {
+        "_all": {"enabled": False},
+        "dynamic_templates": tag_templates,
+        "properties": {
+            "pg_id": {"type": "keyword", "index": True},
+            "delete_hash": {"type": "keyword", "index": True},
+            "resource_id": {"type": "integer"},
+            "timestamp": {"type": "date"},
+            "permanent": {"type": "boolean"},
+            "request_id": {"type": "keyword", "index": True},
+            "log_level": {"type": "text", "analyzer": "simple"},
+            "message": {"type": "text", "analyzer": "simple"},
+            "namespace": {
+                "type": "text",
+                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
+            },
+            "tags": {"type": "object"},
+            "tag_list": {"type": "text", "analyzer": "tag_value",
+                         "fields": {
+                             "keyword": {
+                                 "type": "keyword",
+                                 "ignore_above": 256
+                             }
+                         }},
+        },
+    }
+
+    report_schema = {
+        "template": "rcae_r_*",
         "settings": {
             "index": {
                 "refresh_interval": "5s",
                 "translog": {"sync_interval": "5s", "durability": "async"},
+                "mapping": {"single_type": True}
             },
             "number_of_shards": 5,
-            "analysis": {
-                "analyzer": {
-                    "url_path": {
-                        "type": "custom",
-                        "char_filter": [],
-                        "tokenizer": "path_hierarchy",
-                        "filter": [],
-                    },
-                    "tag_value": {
-                        "type": "custom",
-                        "char_filter": [],
-                        "tokenizer": "keyword",
-                        "filter": ["lowercase"],
-                    },
-                }
-            },
+            "analysis": shared_analysis,
         },
         "mappings": {
-            "report_group": {
+            "report": {
                 "_all": {"enabled": False},
                 "dynamic_templates": tag_templates,
                 "properties": {
-                    "pg_id": {"type": "keyword", "index": True},
+                    "type": {"type": "keyword", "index": True},
+                    # report group
+                    "group_id": {"type": "keyword", "index": True},
                     "resource_id": {"type": "integer"},
                     "priority": {"type": "integer"},
                     "error": {"type": "text", "analyzer": "simple"},
@@ -195,20 +240,13 @@
                     "average_duration": {"type": "float"},
                     "summed_duration": {"type": "float"},
                     "public": {"type": "boolean"},
-                },
-            },
-            "report": {
-                "_all": {"enabled": False},
-                "dynamic_templates": tag_templates,
-                "properties": {
-                    "pg_id": {"type": "keyword", "index": True},
-                    "resource_id": {"type": "integer"},
-                    "group_id": {"type": "keyword"},
+                    # report
+
+                    "report_id": {"type": "keyword", "index": True},
                     "http_status": {"type": "integer"},
                     "ip": {"type": "keyword", "index": True},
                     "url_domain": {"type": "text", "analyzer": "simple"},
                     "url_path": {"type": "text", "analyzer": "url_path"},
-                    "error": {"type": "text", "analyzer": "simple"},
                     "report_type": {"type": "integer"},
                     "start_time": {"type": "date"},
                     "request_id": {"type": "keyword", "index": True},
@@ -223,45 +261,126 @@
                                     }
                                 }},
                     "extra": {"type": "object"},
-                },
-                "_parent": {"type": "report_group"},
-            },
-            "log": {
-                "_all": {"enabled": False},
-                "dynamic_templates": tag_templates,
-                "properties": {
-                    "pg_id": {"type": "keyword", "index": True},
-                    "delete_hash": {"type": "keyword", "index": True},
-                    "resource_id": {"type": "integer"},
+
+                    # report stats
+
+                    "report_stat_id": {"type": "keyword", "index": True},
                     "timestamp": {"type": "date"},
                     "permanent": {"type": "boolean"},
-                    "request_id": {"type": "keyword", "index": True},
                     "log_level": {"type": "text", "analyzer": "simple"},
                     "message": {"type": "text", "analyzer": "simple"},
                     "namespace": {
                         "type": "text",
                         "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                     },
-                    "tags": {"type": "object"},
-                    "tag_list": {"type": "text", "analyzer": "tag_value",
-                                 "fields": {
-                                     "keyword": {
-                                         "type": "keyword",
-                                         "ignore_above": 256
-                                     }
-                                 }},
+
+                    "join_field": {
+                        "type": "join",
+                        "relations": {
+                            "report_group": ["report", "report_stat"]
+                        }
+                    }
                 },
+            }
+        }
+    }
+
+    Datastores.es.indices.put_template("rcae_reports", body=report_schema)
+
+    logs_mapping = copy.deepcopy(shared_log_mapping)
+    logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
+    del logs_mapping["properties"]["pg_id"]
+
+    log_template = {
+        "template": "rcae_l_*",
+        "settings": {
+            "index": {
+                "refresh_interval": "5s",
+                "translog": {"sync_interval": "5s", "durability": "async"},
+                "mapping": {"single_type": True}
             },
+            "number_of_shards": 5,
+            "analysis": shared_analysis,
+        },
+        "mappings": {
+            "log": logs_mapping,
         },
     }
 
-    Datastores.es.indices.put_template("rcae", body=template_schema)
+    Datastores.es.indices.put_template("rcae_logs", body=log_template)
+
+    slow_call_mapping = copy.deepcopy(shared_log_mapping)
+    slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"]["pg_id"]
+    del slow_call_mapping["properties"]["pg_id"]
+
+    slow_call_template = {
+        "template": "rcae_sc_*",
+        "settings": {
+            "index": {
+                "refresh_interval": "5s",
+                "translog": {"sync_interval": "5s", "durability": "async"},
+                "mapping": {"single_type": True}
+            },
+            "number_of_shards": 5,
+            "analysis": shared_analysis,
+        },
+        "mappings": {
+            "log": slow_call_mapping,
+        },
+    }
+
+    Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
+
+    metric_mapping = copy.deepcopy(shared_log_mapping)
+    metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
+    del metric_mapping["properties"]["pg_id"]
+
+    metrics_template = {
+        "template": "rcae_m_*",
+        "settings": {
+            "index": {
+                "refresh_interval": "5s",
+                "translog": {"sync_interval": "5s", "durability": "async"},
+                "mapping": {"single_type": True}
+            },
+            "number_of_shards": 5,
+            "analysis": shared_analysis,
+        },
+        "mappings": {
+            "log": metric_mapping,
        },
    }
+
+    Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
+
+    uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
+    uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping["properties"]["pg_id"]
+    del uptime_metric_mapping["properties"]["pg_id"]
+
+    uptime_metrics_template = {
+        "template": "rcae_uptime_ce_*",
+        "settings": {
+            "index": {
+                "refresh_interval": "5s",
+                "translog": {"sync_interval": "5s", "durability": "async"},
+                "mapping": {"single_type": True}
+            },
+            "number_of_shards": 5,
+            "analysis": shared_analysis,
+        },
+        "mappings": {
+            "log": shared_log_mapping,
+        },
+    }
+
+    Datastores.es.indices.put_template("rcae_uptime_metrics", body=uptime_metrics_template)
 
 
 def reindex_reports():
     reports_groups_tables = detect_tables("reports_groups_p_")
     try:
-        Datastores.es.indices.delete("rcae_r*")
+        Datastores.es.indices.delete("rcae_r_*")
     except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)
 
@@ -285,7 +404,7 @@
         name = partition_table.name
         log.info("round {}, {}".format(i, name))
         for k, v in es_docs.items():
-            to_update = {"_index": k, "_type": "report_group"}
+            to_update = {"_index": k, "_type": "report"}
             [i.update(to_update) for i in v]
             elasticsearch.helpers.bulk(Datastores.es, v)
 
@@ -343,7 +462,7 @@
         name = partition_table.name
         log.info("round {}, {}".format(i, name))
         for k, v in es_docs.items():
-            to_update = {"_index": k, "_type": "log"}
+            to_update = {"_index": k, "_type": "report"}
             [i.update(to_update) for i in v]
             elasticsearch.helpers.bulk(Datastores.es, v)
 
@@ -352,7 +471,7 @@
 
 def reindex_logs():
     try:
-        Datastores.es.indices.delete("rcae_l*")
+        Datastores.es.indices.delete("rcae_l_*")
     except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)
 
@@ -388,7 +507,7 @@
 
 def reindex_metrics():
     try:
-        Datastores.es.indices.delete("rcae_m*")
+        Datastores.es.indices.delete("rcae_m_*")
     except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)
 
@@ -422,7 +541,7 @@
 
 def reindex_slow_calls():
     try:
-        Datastores.es.indices.delete("rcae_sc*")
+        Datastores.es.indices.delete("rcae_sc_*")
     except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)
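The celery tasks above hand elasticsearch.helpers.bulk() plain dicts after stamping "_index" and "_type" into them. A minimal sketch of that pattern, assuming the 6.x Python client in which underscore-prefixed keys such as "_index", "_type", "_id" and "_routing" in an action dict are treated as bulk action metadata rather than document fields; the index name, ids and client URL below are made up for illustration:

    import elasticsearch
    import elasticsearch.helpers

    # hypothetical client and partition index name
    es = elasticsearch.Elasticsearch(["http://127.0.0.1:9200"])
    index_name = "rcae_r_2019_04"

    report_docs = [
        {
            "_id": "1001",
            "_routing": "500",  # child docs are routed to their parent group id
            "report_id": "1001",
            "group_id": "500",
            "join_field": {"name": "report", "parent": "500"},
            "http_status": 500,
        }
    ]
    # same pattern as add_reports_es(): stamp index/type, then send one bulk call
    to_update = {"_index": index_name, "_type": "report"}
    [doc.update(to_update) for doc in report_docs]
    elasticsearch.helpers.bulk(es, report_docs)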
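With the removed _parent mapping, report groups and reports become a single "report" doc type tied together by the "join_field" defined in the rcae_reports template. A sketch of how such parent/child documents can be written and queried against an Elasticsearch 6.x index, assuming a hypothetical partition index "rcae_r_2019_04":

    import elasticsearch

    es = elasticsearch.Elasticsearch(["http://127.0.0.1:9200"])
    index_name = "rcae_r_2019_04"  # placeholder partition name

    group_doc = {
        "group_id": "500",
        "error": "IndexError",
        "type": "report_group",
        "join_field": {"name": "report_group"},
    }
    report_doc = {
        "report_id": "1001",
        "group_id": "500",
        "http_status": 500,
        "type": "report",
        "join_field": {"name": "report", "parent": "500"},
    }
    es.index(index_name, "report", group_doc, id="500")
    # a child must be indexed with routing pointing at its parent id
    es.index(index_name, "report", report_doc, id="1001", routing="500")

    # find report groups that have at least one child report with HTTP 500
    query = {
        "query": {
            "has_child": {
                "type": "report",
                "query": {"term": {"http_status": 500}},
            }
        }
    }
    result = es.search(body=query, index=index_name, doc_type="report")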
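The "_parent#report_group" terms aggregation in ReportGroupService is switched to "join_field#report_group": with a join field, every child document exposes its parent id in a doc-values field named "<join field>#<parent relation>". A rough sketch of that grouping query, where the "newest" sub-aggregation, the resource filter and the sort field are assumptions for illustration rather than copies of the service code:

    import elasticsearch

    es = elasticsearch.Elasticsearch(["http://127.0.0.1:9200"])

    es_query = {
        "aggs": {
            "top_groups": {
                "terms": {
                    "size": 5000,
                    "field": "join_field#report_group",  # parent id of each report
                    "order": {"newest": "desc"},
                },
                "aggs": {
                    "top_reports_hits": {
                        "top_hits": {"size": 1, "sort": [{"start_time": "desc"}]}
                    },
                    "newest": {"max": {"field": "start_time"}},
                },
            }
        },
        "query": {"bool": {"filter": [{"term": {"resource_id": 42}}]}},
    }
    result = es.search(body=es_query, index="rcae_r_*", doc_type="report", size=0)
    for bucket in result["aggregations"]["top_groups"]["buckets"]:
        newest_report = bucket["top_reports_hits"]["hits"]["hits"][0]["_source"]
        print(bucket["key"], newest_report["report_id"])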