# reindex_elasticsearch.py — rebuilds the Elasticsearch indexes and
# index templates from the AppEnlight PostgreSQL partition tables.
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import argparse | ||||
import datetime | ||||
import logging | ||||
r168 | import copy | |||
r0 | ||||
import sqlalchemy as sa | ||||
r151 | import elasticsearch.exceptions | |||
import elasticsearch.helpers | ||||
r0 | from collections import defaultdict | |||
from pyramid.paster import setup_logging | ||||
from pyramid.paster import bootstrap | ||||
r153 | from appenlight.models import DBSession, Datastores, metadata | |||
r0 | from appenlight.lib import get_callable | |||
from appenlight.models.report_group import ReportGroup | ||||
from appenlight.models.report import Report | ||||
from appenlight.models.report_stat import ReportStat | ||||
from appenlight.models.log import Log | ||||
from appenlight.models.slow_call import SlowCall | ||||
r49 | from appenlight.models.metric import Metric | |||
r0 | ||||
log = logging.getLogger(__name__) | ||||
tables = { | ||||
r153 | "slow_calls_p_": [], | |||
"reports_stats_p_": [], | ||||
"reports_p_": [], | ||||
"reports_groups_p_": [], | ||||
"logs_p_": [], | ||||
"metrics_p_": [], | ||||
r0 | } | |||
r153 | ||||
def detect_tables(table_prefix):
    """Return SQLAlchemy ``Table`` objects for every user table whose
    name starts with *table_prefix* (AppEnlight's date-partitioned tables)."""
    db_tables_query = """
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""

    matched = []
    for row in DBSession.execute(db_tables_query).fetchall():
        if not row.tablename.startswith(table_prefix):
            continue
        # reflect the table definition straight from the live database
        matched.append(
            sa.Table(
                row.tablename,
                metadata,
                autoload=True,
                autoload_with=DBSession.bind.engine,
            )
        )
    return matched
def main():
    """
    Recreates Elasticsearch indexes
    Performs reindex of whole db to Elasticsearch
    """
    # Parsing happens twice: a minimal parser first, so the ini file can be
    # loaded and pyramid bootstrapped — only then are plugin-provided
    # indexers known, which the real parser needs for its --types choices.
    pre_parser = argparse.ArgumentParser(
        description="Reindex AppEnlight data", add_help=False
    )
    pre_parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
    pre_parser.add_argument(
        "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
    )
    pre_args = pre_parser.parse_args()
    setup_logging(pre_args.config)
    log.setLevel(logging.INFO)
    env = bootstrap(pre_args.config)

    # built-in reindexers, keyed by CLI type name
    choices = {
        "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
        "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
        "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
        "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
        "template": "appenlight.scripts.reindex_elasticsearch:update_template",
    }
    # plugins may contribute their own fulltext indexers
    for plugin_name, plugin_conf in env["registry"].appenlight_plugins.items():
        if plugin_conf.get("fulltext_indexer"):
            choices[plugin_name] = plugin_conf["fulltext_indexer"]

    parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
    parser.add_argument(
        "-t",
        "--types",
        nargs="*",
        choices=["all"] + list(choices.keys()),
        default=[],
        help="Which parts of database should get reindexed",
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    args = parser.parse_args()

    if "all" in args.types:
        args.types = list(choices.keys())

    print("Selected types to reindex: {}".format(args.types))
    log.info("settings {}".format(args.types))

    # the template must be (re)registered before any documents are indexed
    if "template" in args.types:
        get_callable(choices["template"])()
        args.types.remove("template")
    for selected in args.types:
        get_callable(choices[selected])()
def update_template():
    """
    Recreate the Elasticsearch index templates used by AppEnlight.

    Deletes the existing templates first (a missing template is only
    logged, not raised) and then registers fresh templates for reports,
    logs, slow calls, metrics and uptime metrics.

    Fixes over the previous revision:
    - the uptime-metrics template now actually uses the mapping with
      ``pg_id`` renamed to ``uptime_id``; previously that mapping was
      built but never used and the raw shared mapping was sent instead
    - ``rcae_uptime_metrics`` is now also deleted before being re-put,
      consistent with the other four templates
    """

    def delete_template_quietly(template_name):
        # put_template overwrites anyway, but deleting first ensures no
        # settings from an older template shape linger
        try:
            Datastores.es.indices.delete_template(template_name)
        except elasticsearch.exceptions.NotFoundError as e:
            log.error(e)

    for template_name in (
        "rcae_reports",
        "rcae_logs",
        "rcae_slow_calls",
        "rcae_metrics",
        "rcae_uptime_metrics",
    ):
        delete_template_quietly(template_name)

    log.info("updating elasticsearch template")

    # dynamic template applied to every key under "tags.*": string values are
    # analyzed with the tag_value analyzer (plus a raw keyword sub-field),
    # numeric values are stored as floats
    tag_templates = [
        {
            "values": {
                "path_match": "tags.*",
                "mapping": {
                    "type": "object",
                    "properties": {
                        "values": {
                            "type": "text",
                            "analyzer": "tag_value",
                            "fields": {
                                "keyword": {"type": "keyword", "ignore_above": 256}
                            },
                        },
                        "numeric_values": {"type": "float"},
                    },
                },
            }
        }
    ]

    # analyzers shared by all templates
    shared_analysis = {
        "analyzer": {
            "url_path": {
                "type": "custom",
                "char_filter": [],
                "tokenizer": "path_hierarchy",
                "filter": [],
            },
            "tag_value": {
                "type": "custom",
                "char_filter": [],
                "tokenizer": "keyword",
                "filter": ["lowercase"],
            },
        }
    }

    # base mapping for all log-like documents; each concrete template renames
    # "pg_id" to its own id field via mapping_with_id()
    shared_log_mapping = {
        "_all": {"enabled": False},
        "dynamic_templates": tag_templates,
        "properties": {
            "pg_id": {"type": "keyword", "index": True},
            "delete_hash": {"type": "keyword", "index": True},
            "resource_id": {"type": "integer"},
            "timestamp": {"type": "date"},
            "permanent": {"type": "boolean"},
            "request_id": {"type": "keyword", "index": True},
            "log_level": {"type": "text", "analyzer": "simple"},
            "message": {"type": "text", "analyzer": "simple"},
            "namespace": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
            },
            "tags": {"type": "object"},
            "tag_list": {
                "type": "text",
                "analyzer": "tag_value",
                "fields": {
                    "keyword": {"type": "keyword", "ignore_above": 256}
                },
            },
        },
    }

    def mapping_with_id(id_field):
        """Deep copy of the shared log mapping with ``pg_id`` renamed."""
        mapping = copy.deepcopy(shared_log_mapping)
        mapping["properties"][id_field] = mapping["properties"].pop("pg_id")
        return mapping

    def log_like_template(index_pattern, mapping):
        """Common settings/mappings scaffold for the log-style templates."""
        return {
            "template": index_pattern,
            "settings": {
                "index": {
                    "refresh_interval": "5s",
                    "translog": {"sync_interval": "5s", "durability": "async"},
                },
                "number_of_shards": 5,
                "analysis": shared_analysis,
            },
            "mappings": {"log": mapping},
        }

    # reports use a single "report" type joining groups/reports/stats
    report_schema = {
        "template": "rcae_r_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s", "durability": "async"},
            },
            "number_of_shards": 5,
            "analysis": shared_analysis,
        },
        "mappings": {
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "type": {"type": "keyword", "index": True},
                    # report group
                    "group_id": {"type": "keyword", "index": True},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "text", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"},
                    # report
                    "report_id": {"type": "keyword", "index": True},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "keyword", "index": True},
                    "url_domain": {"type": "text", "analyzer": "simple"},
                    "url_path": {"type": "text", "analyzer": "url_path"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "keyword", "index": True},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {"type": "object"},
                    "tag_list": {
                        "type": "text",
                        "analyzer": "tag_value",
                        "fields": {
                            "keyword": {"type": "keyword", "ignore_above": 256}
                        },
                    },
                    "extra": {"type": "object"},
                    # report stats
                    "report_stat_id": {"type": "keyword", "index": True},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "log_level": {"type": "text", "analyzer": "simple"},
                    "message": {"type": "text", "analyzer": "simple"},
                    "namespace": {
                        "type": "text",
                        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                    },
                    "join_field": {
                        "type": "join",
                        "relations": {"report_group": ["report", "report_stat"]},
                    },
                },
            }
        },
    }
    Datastores.es.indices.put_template("rcae_reports", body=report_schema)

    Datastores.es.indices.put_template(
        "rcae_logs", body=log_like_template("rcae_l_*", mapping_with_id("log_id"))
    )
    Datastores.es.indices.put_template(
        "rcae_slow_calls",
        body=log_like_template("rcae_sc_*", mapping_with_id("slow_call_id")),
    )
    Datastores.es.indices.put_template(
        "rcae_metrics", body=log_like_template("rcae_m_*", mapping_with_id("metric_id"))
    )
    # FIX: previously this template was built with the unmodified shared
    # mapping even though an uptime-specific mapping had been prepared
    Datastores.es.indices.put_template(
        "rcae_uptime_metrics",
        body=log_like_template("rcae_uptime_ce_*", mapping_with_id("uptime_id")),
    )
r0 | ||||
def reindex_reports():
    """
    Drop the rcae_r_* indices and rebuild them from the report-group,
    report, and report-stat partition tables.

    Fixes over the previous revision:
    - the index delete pattern was "`rcae_r_*" (stray backtick), so the
      old report indices were never actually matched/deleted
    - the per-doc metadata update used a side-effect list comprehension
      whose loop variable shadowed the document counter ``i``
    - the three near-identical streaming loops are folded into one helper
    """
    try:
        Datastores.es.indices.delete("rcae_r_*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    def _index_partitions(partition_tables, label, make_item):
        """Stream rows from *partition_tables* in 2000-row chunks, convert
        each row dict via *make_item*, and bulk-index the ES documents
        grouped by partition."""
        log.info(label)
        doc_count = 0
        task_start = datetime.datetime.now()
        for partition_table in partition_tables:
            conn = DBSession.connection().execution_options(stream_results=True)
            result = conn.execute(partition_table.select())
            while True:
                chunk = result.fetchmany(2000)
                if not chunk:
                    break
                es_docs = defaultdict(list)
                for row in chunk:
                    doc_count += 1
                    item = make_item(dict(list(row.items())))
                    es_docs[item.partition_id].append(item.es_doc())
                if es_docs:
                    log.info("round {}, {}".format(doc_count, partition_table.name))
                    for index_name, docs in es_docs.items():
                        for doc in docs:
                            doc.update({"_index": index_name, "_type": "report"})
                        elasticsearch.helpers.bulk(Datastores.es, docs)
        log.info(
            "total docs {} {}".format(doc_count, datetime.datetime.now() - task_start)
        )

    def _make_report_stat(row_dict):
        # remove legacy columns
        # TODO: remove the column later
        row_dict.pop("size", None)
        return ReportStat(**row_dict)

    _index_partitions(
        detect_tables("reports_groups_p_"),
        "reindexing report groups",
        lambda rd: ReportGroup(**rd),
    )
    _index_partitions(
        detect_tables("reports_p_"),
        "reindexing reports",
        lambda rd: Report(**rd),
    )
    _index_partitions(
        detect_tables("reports_stats_p_"),
        "reindexing reports stats",
        _make_report_stat,
    )
r0 | ||||
def reindex_logs():
    """Drop the rcae_l_* indices and rebuild them from the logs
    partition tables."""
    try:
        Datastores.es.indices.delete("rcae_l_*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    # logs
    log.info("reindexing logs")
    doc_count = 0
    task_start = datetime.datetime.now()
    for partition_table in detect_tables("logs_p_"):
        # stream results so huge partitions are not loaded into memory at once
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        chunk = result.fetchmany(2000)
        while chunk:
            es_docs = defaultdict(list)
            for row in chunk:
                doc_count += 1
                entry = Log(**dict(list(row.items())))
                es_docs[entry.partition_id].append(entry.es_doc())
            if es_docs:
                log.info("round {}, {}".format(doc_count, partition_table.name))
                for index_name, docs in es_docs.items():
                    for doc in docs:
                        doc.update({"_index": index_name, "_type": "log"})
                    elasticsearch.helpers.bulk(Datastores.es, docs)
            chunk = result.fetchmany(2000)

    log.info("total docs {} {}".format(doc_count, datetime.datetime.now() - task_start))
r0 | ||||
def reindex_metrics():
    """Drop the rcae_m_* indices and rebuild them from the metrics
    partition tables."""
    try:
        Datastores.es.indices.delete("rcae_m_*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing applications metrics")
    total = 0
    started_at = datetime.datetime.now()
    for table in detect_tables("metrics_p_"):
        # stream rows to keep memory bounded on large partitions
        conn = DBSession.connection().execution_options(stream_results=True)
        cursor = conn.execute(table.select())
        while True:
            rows = cursor.fetchmany(2000)
            if not rows:
                break
            buckets = defaultdict(list)
            for row in rows:
                total += 1
                metric = Metric(**dict(list(row.items())))
                buckets[metric.partition_id].append(metric.es_doc())
            if buckets:
                log.info("round {}, {}".format(total, table.name))
                for es_index, docs in buckets.items():
                    meta = {"_index": es_index, "_type": "log"}
                    for doc in docs:
                        doc.update(meta)
                    elasticsearch.helpers.bulk(Datastores.es, docs)

    log.info("total docs {} {}".format(total, datetime.datetime.now() - started_at))
r0 | ||||
def reindex_slow_calls():
    """Drop the rcae_sc_* indices and rebuild them from the slow-call
    partition tables."""
    try:
        Datastores.es.indices.delete("rcae_sc_*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing slow calls")
    seen = 0
    began = datetime.datetime.now()
    for partition in detect_tables("slow_calls_p_"):
        connection = DBSession.connection().execution_options(stream_results=True)
        proxy = connection.execute(partition.select())
        # consume the result set 2000 rows at a time
        batch = proxy.fetchmany(2000)
        while batch:
            grouped = defaultdict(list)
            for record in batch:
                seen += 1
                call = SlowCall(**dict(list(record.items())))
                grouped[call.partition_id].append(call.es_doc())
            if grouped:
                log.info("round {}, {}".format(seen, partition.name))
                for target_index, payload in grouped.items():
                    for doc in payload:
                        doc.update({"_index": target_index, "_type": "log"})
                    elasticsearch.helpers.bulk(Datastores.es, payload)
            batch = proxy.fetchmany(2000)

    log.info("total docs {} {}".format(seen, datetime.datetime.now() - began))
r0 | ||||
# Script entry point: run the reindexer CLI when executed directly.
if __name__ == "__main__":
    main()