# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import logging

import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers

from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import DBSession, Datastores, metadata
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric

log = logging.getLogger(__name__)

tables = {
    "slow_calls_p_": [],
    "reports_stats_p_": [],
    "reports_p_": [],
    "reports_groups_p_": [],
    "logs_p_": [],
    "metrics_p_": [],
}


def detect_tables(table_prefix):
    # reflect every partition table whose name starts with the given prefix
    found_tables = []
    db_tables_query = """
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""

    for table in DBSession.execute(db_tables_query).fetchall():
        tablename = table.tablename
        if tablename.startswith(table_prefix):
            t = sa.Table(
                tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
            )
            found_tables.append(t)
    return found_tables


def main():
    """
    Recreates Elasticsearch indexes
    Performs reindex of whole db to Elasticsearch
    """
    # need parser twice because we first need to load ini file to
    # bootstrap pyramid and then load plugins
    pre_parser = argparse.ArgumentParser(
        description="Reindex AppEnlight data", add_help=False
    )
    pre_parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
    pre_parser.add_argument(
        "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
    )
    args = pre_parser.parse_args()

    config_uri = args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)
    parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
    choices = {
        "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
        "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
        "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
        "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
        "template": "appenlight.scripts.reindex_elasticsearch:update_template",
    }
    for k, v in env["registry"].appenlight_plugins.items():
        if v.get("fulltext_indexer"):
            choices[k] = v["fulltext_indexer"]
    parser.add_argument(
        "-t",
        "--types",
        nargs="*",
        choices=["all"] + list(choices.keys()),
        default=[],
        help="Which parts of database should get reindexed",
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    args = parser.parse_args()

    if "all" in args.types:
        args.types = list(choices.keys())

    print("Selected types to reindex: {}".format(args.types))
    log.info("settings {}".format(args.types))

    if "template" in args.types:
        get_callable(choices["template"])()
        args.types.remove("template")
    for selected in args.types:
        get_callable(choices[selected])()


def update_template():
    try:
        Datastores.es.indices.delete_template("rcae")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    log.info("updating elasticsearch template")
    tag_templates = [
        {
            "values": {
                "path_match": "tags.*",
                "mapping": {
                    "type": "object",
                    "properties": {
                        "values": {"type": "string", "analyzer": "tag_value"},
                        "numeric_values": {"type": "float"},
                    },
                },
            }
        }
    ]

    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s", "durability": "async"},
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": [],
                    },
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"],
                    },
                }
            },
        },
        "mappings": {
            "report_group": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"},
                },
            },
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "group_id": {"type": "string"},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "string", "index": "not_analyzed"},
                    "url_domain": {"type": "string", "analyzer": "simple"},
                    "url_path": {"type": "string", "analyzer": "url_path"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {"type": "object"},
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                    "extra": {"type": "object"},
                },
                "_parent": {"type": "report_group"},
            },
            "log": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "delete_hash": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "log_level": {"type": "string", "analyzer": "simple"},
                    "message": {"type": "string", "analyzer": "simple"},
                    "namespace": {"type": "string", "index": "not_analyzed"},
                    "tags": {"type": "object"},
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                },
            },
        },
    }

    Datastores.es.indices.put_template("rcae", body=template_schema)


def reindex_reports():
    reports_groups_tables = detect_tables("reports_groups_p_")
    try:
        Datastores.es.indices.delete("rcae_r*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing report groups")
    i = 0
    task_start = datetime.datetime.now()
    for partition_table in reports_groups_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
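            # an empty chunk means the server-side cursor is exhausted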
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = ReportGroup(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report_group"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info("reindexing reports")
    task_start = datetime.datetime.now()
    reports_tables = detect_tables("reports_p_")
    for partition_table in reports_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Report(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    log.info("reindexing reports stats")
    i = 0
    task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables("reports_stats_p_")
    for partition_table in reports_stats_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                rd = dict(list(row.items()))
                # remove legacy columns
                # TODO: remove the column later
                rd.pop("size", None)
                item = ReportStat(**rd)
                i += 1
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_logs():
    try:
        Datastores.es.indices.delete("rcae_l*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    # logs
    log.info("reindexing logs")
    i = 0
    task_start = datetime.datetime.now()
    log_tables = detect_tables("logs_p_")
    for partition_table in log_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Log(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_metrics():
    try:
        Datastores.es.indices.delete("rcae_m*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing applications metrics")
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables("metrics_p_")
    for partition_table in metric_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
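        # stream_results keeps a server-side cursor open so each partition is
        # read in 2000-row chunks rather than loaded into memory at once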
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_slow_calls():
    try:
        Datastores.es.indices.delete("rcae_sc*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing slow calls")
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables("slow_calls_p_")
    for partition_table in slow_calls_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


if __name__ == "__main__":
    main()