# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
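
# Command-line script that rebuilds the Elasticsearch index template and
# reindexes AppEnlight data (reports, logs, metrics, slow calls) from the
# partitioned Postgres tables into Elasticsearch.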

import argparse
import datetime
import logging

import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers

from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import DBSession, Datastores, metadata
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric


log = logging.getLogger(__name__)
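

# prefixes of the partitioned Postgres tables that hold the data to reindex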
tables = {
    "slow_calls_p_": [],
    "reports_stats_p_": [],
    "reports_p_": [],
    "reports_groups_p_": [],
    "logs_p_": [],
    "metrics_p_": [],
}


def detect_tables(table_prefix):
    """Reflect all partition tables whose name starts with ``table_prefix``."""
    found_tables = []
    db_tables_query = """
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""

    for table in DBSession.execute(db_tables_query).fetchall():
        tablename = table.tablename
        if tablename.startswith(table_prefix):
            t = sa.Table(
                tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
            )
            found_tables.append(t)
    return found_tables
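
# For example (partition names here are only illustrative),
# detect_tables("logs_p_") returns reflected sa.Table objects for
# partitions such as "logs_p_2017_01".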


def main():
    """
    Recreates the Elasticsearch indexes and reindexes
    the whole database into Elasticsearch.
    """

    # the parser is built twice: the pre-parser only reads the ini file so
    # pyramid can bootstrap and plugins can register their indexers, then
    # the full parser knows every available --types choice
    pre_parser = argparse.ArgumentParser(
        description="Reindex AppEnlight data", add_help=False
    )
    pre_parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
    pre_parser.add_argument(
        "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
    )
    args = pre_parser.parse_args()

    config_uri = args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)
    parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
    choices = {
        "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
        "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
        "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
        "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
        "template": "appenlight.scripts.reindex_elasticsearch:update_template",
    }
    for k, v in env["registry"].appenlight_plugins.items():
        if v.get("fulltext_indexer"):
            choices[k] = v["fulltext_indexer"]
    parser.add_argument(
        "-t",
        "--types",
        nargs="*",
        choices=["all"] + list(choices.keys()),
        default=[],
        help="Which parts of database should get reindexed",
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    args = parser.parse_args()

    if "all" in args.types:
        args.types = list(choices.keys())

    print("Selected types to reindex: {}".format(args.types))

    log.info("settings {}".format(args.types))

    # the index template needs updating before any documents are indexed
    if "template" in args.types:
        get_callable(choices["template"])()
        args.types.remove("template")
    for selected in args.types:
        get_callable(choices[selected])()
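

# Example invocation (the ini filename is only illustrative):
#
#     python -m appenlight.scripts.reindex_elasticsearch -c production.ini -t all
#
# or reindex only selected parts:
#
#     python -m appenlight.scripts.reindex_elasticsearch -c production.ini \
#         -t reports logs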


def update_template():
    try:
        Datastores.es.indices.delete_template("rcae")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    log.info("updating elasticsearch template")
    tag_templates = [
        {
            "values": {
                "path_match": "tags.*",
                "mapping": {
                    "type": "object",
                    "properties": {
                        "values": {
                            "type": "text",
                            "analyzer": "tag_value",
                            "fields": {
                                "keyword": {"type": "keyword", "ignore_above": 256}
                            },
                        },
                        "numeric_values": {"type": "float"},
                    },
                },
            }
        }
    ]

    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s", "durability": "async"},
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": [],
                    },
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"],
                    },
                }
            },
        },
        "mappings": {
            "report_group": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "keyword", "index": True},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "text", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"},
                },
            },
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "keyword", "index": True},
                    "resource_id": {"type": "integer"},
                    "group_id": {"type": "keyword"},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "keyword", "index": True},
                    "url_domain": {"type": "text", "analyzer": "simple"},
                    "url_path": {"type": "text", "analyzer": "url_path"},
                    "error": {"type": "text", "analyzer": "simple"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "keyword", "index": True},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {"type": "object"},
                    "tag_list": {
                        "type": "text",
                        "analyzer": "tag_value",
                        "fields": {
                            "keyword": {"type": "keyword", "ignore_above": 256}
                        },
                    },
                    "extra": {"type": "object"},
                },
                "_parent": {"type": "report_group"},
            },
            "log": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "keyword", "index": True},
                    "delete_hash": {"type": "keyword", "index": True},
                    "resource_id": {"type": "integer"},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "request_id": {"type": "keyword", "index": True},
                    "log_level": {"type": "text", "analyzer": "simple"},
                    "message": {"type": "text", "analyzer": "simple"},
                    "namespace": {
                        "type": "text",
                        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                    },
                    "tags": {"type": "object"},
                    "tag_list": {
                        "type": "text",
                        "analyzer": "tag_value",
                        "fields": {
                            "keyword": {"type": "keyword", "ignore_above": 256}
                        },
                    },
                },
            },
        },
    }

    Datastores.es.indices.put_template("rcae", body=template_schema)
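

# After update_template() runs, the stored template can be inspected with,
# for example (host and port are illustrative):
#
#     curl http://localhost:9200/_template/rcae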


def reindex_reports():
    reports_groups_tables = detect_tables("reports_groups_p_")
    try:
        Datastores.es.indices.delete("rcae_r*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing report groups")
    i = 0
    task_start = datetime.datetime.now()
    for partition_table in reports_groups_tables:
        # stream the partition instead of loading it into memory at once,
        # and feed Elasticsearch in chunks of 2000 rows, grouped by the
        # target index (the partition id); the same pattern is used by all
        # reindex_* functions below
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = ReportGroup(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report_group"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info("reindexing reports")
    task_start = datetime.datetime.now()
    reports_tables = detect_tables("reports_p_")
    for partition_table in reports_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Report(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    log.info("reindexing reports stats")
    i = 0
    task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables("reports_stats_p_")
    for partition_table in reports_stats_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                rd = dict(list(row.items()))
                # remove the legacy "size" column
                # TODO: drop the column from the table later
                rd.pop("size", None)
                item = ReportStat(**rd)
                i += 1
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_logs():
    try:
        Datastores.es.indices.delete("rcae_l*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    # logs
    log.info("reindexing logs")
    i = 0
    task_start = datetime.datetime.now()
    log_tables = detect_tables("logs_p_")
    for partition_table in log_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)

            for row in chunk:
                i += 1
                item = Log(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_metrics():
    try:
        Datastores.es.indices.delete("rcae_m*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing applications metrics")
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables("metrics_p_")
    for partition_table in metric_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_slow_calls():
    try:
        Datastores.es.indices.delete("rcae_sc*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing slow calls")
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables("slow_calls_p_")
    for partition_table in slow_calls_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


if __name__ == "__main__":
    main()