# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import logging
import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers
from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import DBSession, Datastores, metadata
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric

log = logging.getLogger(__name__)
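
# prefixes of the date-partitioned PostgreSQL tables this script knows
# about; detect_tables() expands a prefix into the concrete partitions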
tables = {
    "slow_calls_p_": [],
"reports_stats_p_": [],
"reports_p_": [],
"reports_groups_p_": [],
"logs_p_": [],
"metrics_p_": [],
}


def detect_tables(table_prefix):
found_tables = []
    db_tables_query = """
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""

    for table in DBSession.execute(db_tables_query).fetchall():
tablename = table.tablename
if tablename.startswith(table_prefix):
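            # reflect the partition from the live schema so it can be
            # queried without a dedicated SQLAlchemy model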
            t = sa.Table(
                tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
            )
            found_tables.append(t)
return found_tables


def main():
"""
    Recreates the Elasticsearch indexes and performs a full reindex
    of the database into Elasticsearch
"""
    # the parser is built twice: the pre-parser only reads the ini file so
    # pyramid can be bootstrapped and plugins registered before real parsing
pre_parser = argparse.ArgumentParser(
        description="Reindex AppEnlight data", add_help=False
)
pre_parser.add_argument(
"-c", "--config", required=True, help="Configuration ini file of application"
)
pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
pre_parser.add_argument(
"-t", "--types", nargs="+", help="Which parts of database should get reindexed"
)
    args = pre_parser.parse_args()
config_uri = args.config
setup_logging(config_uri)
log.setLevel(logging.INFO)
env = bootstrap(config_uri)
    parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
    choices = {
        "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
"logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
"metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
"slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
"template": "appenlight.scripts.reindex_elasticsearch:update_template",
    }
    for k, v in env["registry"].appenlight_plugins.items():
if v.get("fulltext_indexer"):
choices[k] = v["fulltext_indexer"]
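    # each value is a "module:callable" dotted path that get_callable()
    # resolves when the selected types are dispatched below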
parser.add_argument(
"-t",
"--types",
nargs="*",
choices=["all"] + list(choices.keys()),
default=[],
help="Which parts of database should get reindexed",
)
parser.add_argument(
"-c", "--config", required=True, help="Configuration ini file of application"
)
    args = parser.parse_args()
    if "all" in args.types:
        args.types = list(choices.keys())
    print("Selected types to reindex: {}".format(args.types))
    log.info("selected types: {}".format(args.types))
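
    # run the template update first: the index template has to be in place
    # before the reindexers below create rcae_* indexes that inherit it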
    if "template" in args.types:
get_callable(choices["template"])()
args.types.remove("template")
    for selected in args.types:
get_callable(choices[selected])()


def update_template():
try:
        Datastores.es.indices.delete_template("rcae")
    except elasticsearch.exceptions.NotFoundError as e:
log.error(e)
    log.info("updating elasticsearch template")
    tag_templates = [
        {
"values": {
"path_match": "tags.*",
"mapping": {
"type": "object",
"properties": {
"values": {"type": "string", "analyzer": "tag_value"},
"numeric_values": {"type": "float"},
},
},
            }
        }
    ]
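    # dynamic template for every "tags.*" field: tag values are indexed as
    # analyzed strings, with an optional float field for numeric queries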
template_schema = {
"template": "rcae_*",
"settings": {
"index": {
"refresh_interval": "5s",
                "translog": {"sync_interval": "5s", "durability": "async"},
            },
"number_of_shards": 5,
"analysis": {
"analyzer": {
"url_path": {
"type": "custom",
"char_filter": [],
"tokenizer": "path_hierarchy",
                        "filter": [],
                    },
"tag_value": {
"type": "custom",
"char_filter": [],
"tokenizer": "keyword",
                        "filter": ["lowercase"],
                    },
}
},
},
"mappings": {
"report_group": {
"_all": {"enabled": False},
"dynamic_templates": tag_templates,
"properties": {
"pg_id": {"type": "string", "index": "not_analyzed"},
"resource_id": {"type": "integer"},
"priority": {"type": "integer"},
"error": {"type": "string", "analyzer": "simple"},
"read": {"type": "boolean"},
"occurences": {"type": "integer"},
"fixed": {"type": "boolean"},
"first_timestamp": {"type": "date"},
"last_timestamp": {"type": "date"},
"average_duration": {"type": "float"},
"summed_duration": {"type": "float"},
                    "public": {"type": "boolean"},
},
            },
"report": {
"_all": {"enabled": False},
"dynamic_templates": tag_templates,
"properties": {
"pg_id": {"type": "string", "index": "not_analyzed"},
"resource_id": {"type": "integer"},
"group_id": {"type": "string"},
"http_status": {"type": "integer"},
"ip": {"type": "string", "index": "not_analyzed"},
"url_domain": {"type": "string", "analyzer": "simple"},
"url_path": {"type": "string", "analyzer": "url_path"},
"error": {"type": "string", "analyzer": "simple"},
"report_type": {"type": "integer"},
"start_time": {"type": "date"},
"request_id": {"type": "string", "index": "not_analyzed"},
"end_time": {"type": "date"},
"duration": {"type": "float"},
                    "tags": {"type": "object"},
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                    "extra": {"type": "object"},
                },
                "_parent": {"type": "report_group"},
            },
"log": {
"_all": {"enabled": False},
"dynamic_templates": tag_templates,
"properties": {
"pg_id": {"type": "string", "index": "not_analyzed"},
"delete_hash": {"type": "string", "index": "not_analyzed"},
"resource_id": {"type": "integer"},
"timestamp": {"type": "date"},
"permanent": {"type": "boolean"},
"request_id": {"type": "string", "index": "not_analyzed"},
"log_level": {"type": "string", "analyzer": "simple"},
"message": {"type": "string", "analyzer": "simple"},
"namespace": {"type": "string", "index": "not_analyzed"},
                    "tags": {"type": "object"},
"tag_list": {"type": "string", "analyzer": "tag_value"},
},
},
},
    }
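    # register the template so each new rcae_* index inherits the settings
    # and mappings defined above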
    Datastores.es.indices.put_template("rcae", body=template_schema)


def reindex_reports():
    reports_groups_tables = detect_tables("reports_groups_p_")
    try:
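        # drop the existing report indexes first; PostgreSQL remains the
        # canonical store the documents are rebuilt from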
        Datastores.es.indices.delete("rcae_r*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    log.info("reindexing report groups")
    i = 0
task_start = datetime.datetime.now()
for partition_table in reports_groups_tables:
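        # stream rows through a server-side cursor in 2000-row chunks so
        # large partitions are not loaded into memory at once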
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
i += 1
item = ReportGroup(**dict(list(row.items())))
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
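            # the partition id doubles as the target ES index name, so docs
            # are grouped per index and sent with one bulk call each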
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report_group"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info("reindexing reports")
    task_start = datetime.datetime.now()
    reports_tables = detect_tables("reports_p_")
    for partition_table in reports_tables:
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
i += 1
item = Report(**dict(list(row.items())))
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    log.info("reindexing reports stats")
    i = 0
task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables("reports_stats_p_")
    for partition_table in reports_stats_tables:
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
rd = dict(list(row.items()))
                # drop legacy columns not present in the model
                # TODO: drop the "size" column from the schema later
                rd.pop("size", None)
                item = ReportStat(**rd)
i += 1
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
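                    # note: stats docs go under the generic "log" ES type,
                    # apparently reusing the log mapping from the template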
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_logs():
try:
        Datastores.es.indices.delete("rcae_l*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
# logs
    log.info("reindexing logs")
    i = 0
task_start = datetime.datetime.now()
    log_tables = detect_tables("logs_p_")
    for partition_table in log_tables:
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
i += 1
item = Log(**dict(list(row.items())))
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_metrics():
try:
        Datastores.es.indices.delete("rcae_m*")
    except elasticsearch.exceptions.NotFoundError as e:
log.error(e)

    log.info("reindexing application metrics")
    i = 0
task_start = datetime.datetime.now()
    metric_tables = detect_tables("metrics_p_")
    for partition_table in metric_tables:
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
i += 1
item = Metric(**dict(list(row.items())))
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_slow_calls():
try:
        Datastores.es.indices.delete("rcae_sc*")
    except elasticsearch.exceptions.NotFoundError as e:
log.error(e)

    log.info("reindexing slow calls")
    i = 0
task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables("slow_calls_p_")
    for partition_table in slow_calls_tables:
conn = DBSession.connection().execution_options(stream_results=True)
result = conn.execute(partition_table.select())
while True:
chunk = result.fetchmany(2000)
if not chunk:
break
es_docs = defaultdict(list)
for row in chunk:
i += 1
item = SlowCall(**dict(list(row.items())))
d_range = item.partition_id
es_docs[d_range].append(item.es_doc())
if es_docs:
name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    [i.update(to_update) for i in v]
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


if __name__ == "__main__":
    main()