# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import logging

import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers

from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import (
    DBSession,
    Datastores,
    metadata
)
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric

log = logging.getLogger(__name__)

# prefixes of the partitioned tables this script knows how to reindex
tables = {
    'slow_calls_p_': [],
    'reports_stats_p_': [],
    'reports_p_': [],
    'reports_groups_p_': [],
    'logs_p_': [],
    'metrics_p_': [],
}


def detect_tables(table_prefix):
    """Return reflected SQLAlchemy Table objects for every partition table
    whose name starts with ``table_prefix``."""
    found_tables = []
    db_tables_query = '''
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;'''

    for table in DBSession.execute(db_tables_query).fetchall():
        tablename = table.tablename
        if tablename.startswith(table_prefix):
            t = sa.Table(tablename, metadata, autoload=True,
                         autoload_with=DBSession.bind.engine)
            found_tables.append(t)
    return found_tables


def main():
    """
    Recreates Elasticsearch indexes and performs a full reindex
    of the database into Elasticsearch.
    """
    # the parser is needed twice: the ini file has to be parsed and pyramid
    # bootstrapped first, so that plugin-provided indexers can be added to
    # the list of available choices
    pre_parser = argparse.ArgumentParser(
        description='Reindex AppEnlight data',
        add_help=False)
    pre_parser.add_argument('-c', '--config', required=True,
                            help='Configuration ini file of application')
    pre_parser.add_argument('-h', '--help', help='Show help', nargs='?')
    pre_parser.add_argument('-t', '--types', nargs='+',
                            help='Which parts of the database should get '
                                 'reindexed')
    args = pre_parser.parse_args()

    config_uri = args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)

    parser = argparse.ArgumentParser(description='Reindex AppEnlight data')
    choices = {
        'reports': 'appenlight.scripts.reindex_elasticsearch:reindex_reports',
        'logs': 'appenlight.scripts.reindex_elasticsearch:reindex_logs',
        'metrics': 'appenlight.scripts.reindex_elasticsearch:reindex_metrics',
        'slow_calls':
            'appenlight.scripts.reindex_elasticsearch:reindex_slow_calls',
        'template':
            'appenlight.scripts.reindex_elasticsearch:update_template'
    }
    for k, v in env['registry'].appenlight_plugins.items():
        if v.get('fulltext_indexer'):
            choices[k] = v['fulltext_indexer']
    parser.add_argument('-t', '--types', nargs='*',
                        choices=['all'] + list(choices.keys()),
                        default=[],
                        help='Which parts of the database should get '
                             'reindexed')
    parser.add_argument('-c', '--config', required=True,
                        help='Configuration ini file of application')
    args = parser.parse_args()

    if 'all' in args.types:
        args.types = list(choices.keys())

    print("Selected types to reindex: {}".format(args.types))
    log.info('settings {}'.format(args.types))

    # the index template is applied first so that newly created indexes
    # pick it up
    if 'template' in args.types:
        get_callable(choices['template'])()
        args.types.remove('template')
    for selected in args.types:
        get_callable(choices[selected])()
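# A minimal invocation sketch (paths below are placeholders; this assumes the
# appenlight package is importable and an application ini file is at hand):
#
#   python -m appenlight.scripts.reindex_elasticsearch -c production.ini -t all
#   python -m appenlight.scripts.reindex_elasticsearch -c production.ini \
#       -t reports logs
#
# '-t template' only re-puts the 'rcae' index template via update_template();
# the other choices trigger a full reindex of the matching partition tables.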
def update_template():
    try:
        Datastores.es.indices.delete_template('rcae')
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    log.info('updating elasticsearch template')
    tag_templates = [
        {"values": {
            "path_match": "tags.*",
            "mapping": {
                "type": "object",
                "properties": {
                    "values": {"type": "string", "analyzer": "tag_value"},
                    "numeric_values": {"type": "float"}
                }
            }
        }}
    ]

    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s",
                             "durability": "async"}
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": []
                    },
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"]
                    },
                }
            },
        },
        "mappings": {
            "report_group": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"}
                }
            },
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "group_id": {"type": "string"},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "string", "index": "not_analyzed"},
                    "url_domain": {"type": "string", "analyzer": "simple"},
                    "url_path": {"type": "string", "analyzer": "url_path"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                    "extra": {
                        "type": "object"
                    },
                },
                "_parent": {"type": "report_group"}
            },
            "log": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "delete_hash": {"type": "string",
                                    "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "log_level": {"type": "string", "analyzer": "simple"},
                    "message": {"type": "string", "analyzer": "simple"},
                    "namespace": {"type": "string", "index": "not_analyzed"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"}
                }
            }
        }
    }

    Datastores.es.indices.put_template('rcae', body=template_schema)
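# Illustrative verification only (not part of the original flow): after
# update_template() has run, the stored template can be read back with the
# elasticsearch-py client, e.g.
#
#   Datastores.es.indices.get_template('rcae')
#
# which should return the "rcae_*" schema defined above.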
def reindex_reports():
    reports_groups_tables = detect_tables('reports_groups_p_')
    try:
        Datastores.es.indices.delete('rcae_r*')
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info('reindexing report groups')
    i = 0
    task_start = datetime.datetime.now()
    for partition_table in reports_groups_tables:
        # stream results so whole partition tables are not loaded into memory
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            # group documents by partition id, which doubles as the target
            # elasticsearch index name
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = ReportGroup(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'report_group'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info('reindexing reports')
    task_start = datetime.datetime.now()
    reports_tables = detect_tables('reports_p_')
    for partition_table in reports_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Report(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'report'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    log.info('reindexing reports stats')
    i = 0
    task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables('reports_stats_p_')
    for partition_table in reports_stats_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                rd = dict(list(row.items()))
                # remove legacy columns
                # TODO: remove the column later
                rd.pop('size', None)
                item = ReportStat(**rd)
                i += 1
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'log'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


def reindex_logs():
    try:
        Datastores.es.indices.delete('rcae_l*')
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    # logs
    log.info('reindexing logs')
    i = 0
    task_start = datetime.datetime.now()
    log_tables = detect_tables('logs_p_')
    for partition_table in log_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Log(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'log'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
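# reindex_metrics() and reindex_slow_calls() below follow the same pattern as
# reindex_reports() and reindex_logs(): drop the matching 'rcae_*' indexes,
# stream each partition table in 2000-row chunks, group the generated
# es_doc() payloads by partition id (the target index name), and push them
# with elasticsearch.helpers.bulk().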
def reindex_metrics():
    try:
        Datastores.es.indices.delete('rcae_m*')
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info('reindexing applications metrics')
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables('metrics_p_')
    for partition_table in metric_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'log'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


def reindex_slow_calls():
    try:
        Datastores.es.indices.delete('rcae_sc*')
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info('reindexing slow calls')
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables('slow_calls_p_')
    for partition_table in slow_calls_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    to_update = {'_index': k, '_type': 'log'}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


if __name__ == '__main__':
    main()
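# Note: the reindex_* callables above assume that DBSession and Datastores
# have already been configured, which main() arranges via
# bootstrap(config_uri). Calling them from other code (for example a
# maintenance shell) would require an equivalent bootstrap first; that usage
# is hypothetical and not part of this script.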