# -*- coding: utf-8 -*-
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
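
"""
Reindex AppEnlight data from the PostgreSQL partition tables into
Elasticsearch: recreates the index template and rebuilds the report,
log, metric and slow-call indices.
"""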

import argparse
import datetime
import logging
import sqlalchemy as sa
import elasticsearch.exceptions
import elasticsearch.helpers
from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import DBSession, Datastores, metadata
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric

log = logging.getLogger(__name__)
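
# Prefixes of the PostgreSQL partition tables; the concrete per-period tables
# matching each prefix are discovered at runtime by detect_tables().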
tables = {
    "slow_calls_p_": [],
    "reports_stats_p_": [],
    "reports_p_": [],
    "reports_groups_p_": [],
    "logs_p_": [],
    "metrics_p_": [],
}


def detect_tables(table_prefix):
    """Return SQLAlchemy Table objects for every partition table whose name
    starts with ``table_prefix``."""
    found_tables = []
    db_tables_query = """
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""

    for table in DBSession.execute(db_tables_query).fetchall():
        tablename = table.tablename
        if tablename.startswith(table_prefix):
            t = sa.Table(
                tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
            )
            found_tables.append(t)
    return found_tables
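

# Illustrative only (actual table names depend on the partitions present in
# your database):
#
#     >>> detect_tables("logs_p_")
#     [Table('logs_p_2017_01', ...), Table('logs_p_2017_02', ...)]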


def main():
    """
    Recreates the Elasticsearch index template and reindexes the whole
    database into Elasticsearch.
    """
    # the parser is needed twice: once to load the ini file and bootstrap
    # Pyramid, and again once the plugins are known
    pre_parser = argparse.ArgumentParser(
        description="Reindex AppEnlight data", add_help=False
    )
    pre_parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
    pre_parser.add_argument(
        "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
    )
    args = pre_parser.parse_args()
    config_uri = args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)

    parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
    choices = {
        "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
        "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
        "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
        "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
        "template": "appenlight.scripts.reindex_elasticsearch:update_template",
    }
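    # plugins may register their own fulltext indexers via the
    # ``fulltext_indexer`` hook; these become extra --types choices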
    for k, v in env["registry"].appenlight_plugins.items():
        if v.get("fulltext_indexer"):
            choices[k] = v["fulltext_indexer"]
    parser.add_argument(
        "-t",
        "--types",
        nargs="*",
        choices=["all"] + list(choices.keys()),
        default=[],
        help="Which parts of database should get reindexed",
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    args = parser.parse_args()
    if "all" in args.types:
        args.types = list(choices.keys())

    print("Selected types to reindex: {}".format(args.types))
    log.info("selected types to reindex: {}".format(args.types))

    # the template has to exist before any documents are indexed,
    # so it is updated first
    if "template" in args.types:
        get_callable(choices["template"])()
        args.types.remove("template")
    for selected in args.types:
        get_callable(choices[selected])()


def update_template():
    try:
        Datastores.es.indices.delete_template("rcae")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)
    log.info("updating elasticsearch template")
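    # Dynamic template for anything under ``tags.*``: each tag is mapped as an
    # object with an analyzed string ``values`` field and a float
    # ``numeric_values`` field, so tags are searchable as text or as numbers.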
    tag_templates = [
        {
            "values": {
                "path_match": "tags.*",
                "mapping": {
                    "type": "object",
                    "properties": {
                        "values": {"type": "string", "analyzer": "tag_value"},
                        "numeric_values": {"type": "float"},
                    },
                },
            }
        }
    ]
    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s", "durability": "async"},
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": [],
                    },
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"],
                    },
                }
            },
        },
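        # ES 2.x-style mappings: ``string`` fields with ``not_analyzed`` or
        # custom analyzers, and a ``_parent`` link from ``report`` to
        # ``report_group`` for parent/child queries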
"mappings": { | ||||
"report_group": { | ||||
"_all": {"enabled": False}, | ||||
"dynamic_templates": tag_templates, | ||||
"properties": { | ||||
"pg_id": {"type": "string", "index": "not_analyzed"}, | ||||
"resource_id": {"type": "integer"}, | ||||
"priority": {"type": "integer"}, | ||||
"error": {"type": "string", "analyzer": "simple"}, | ||||
"read": {"type": "boolean"}, | ||||
"occurences": {"type": "integer"}, | ||||
"fixed": {"type": "boolean"}, | ||||
"first_timestamp": {"type": "date"}, | ||||
"last_timestamp": {"type": "date"}, | ||||
"average_duration": {"type": "float"}, | ||||
"summed_duration": {"type": "float"}, | ||||
r153 | "public": {"type": "boolean"}, | |||
}, | ||||
r0 | }, | |||
"report": { | ||||
"_all": {"enabled": False}, | ||||
"dynamic_templates": tag_templates, | ||||
"properties": { | ||||
"pg_id": {"type": "string", "index": "not_analyzed"}, | ||||
"resource_id": {"type": "integer"}, | ||||
"group_id": {"type": "string"}, | ||||
"http_status": {"type": "integer"}, | ||||
"ip": {"type": "string", "index": "not_analyzed"}, | ||||
"url_domain": {"type": "string", "analyzer": "simple"}, | ||||
"url_path": {"type": "string", "analyzer": "url_path"}, | ||||
"error": {"type": "string", "analyzer": "simple"}, | ||||
"report_type": {"type": "integer"}, | ||||
"start_time": {"type": "date"}, | ||||
"request_id": {"type": "string", "index": "not_analyzed"}, | ||||
"end_time": {"type": "date"}, | ||||
"duration": {"type": "float"}, | ||||
r153 | "tags": {"type": "object"}, | |||
r0 | "tag_list": {"type": "string", "analyzer": "tag_value"}, | |||
r153 | "extra": {"type": "object"}, | |||
r0 | }, | |||
r153 | "_parent": {"type": "report_group"}, | |||
r0 | }, | |||
"log": { | ||||
"_all": {"enabled": False}, | ||||
"dynamic_templates": tag_templates, | ||||
"properties": { | ||||
"pg_id": {"type": "string", "index": "not_analyzed"}, | ||||
"delete_hash": {"type": "string", "index": "not_analyzed"}, | ||||
"resource_id": {"type": "integer"}, | ||||
"timestamp": {"type": "date"}, | ||||
"permanent": {"type": "boolean"}, | ||||
"request_id": {"type": "string", "index": "not_analyzed"}, | ||||
"log_level": {"type": "string", "analyzer": "simple"}, | ||||
"message": {"type": "string", "analyzer": "simple"}, | ||||
"namespace": {"type": "string", "index": "not_analyzed"}, | ||||
r153 | "tags": {"type": "object"}, | |||
"tag_list": {"type": "string", "analyzer": "tag_value"}, | ||||
}, | ||||
}, | ||||
}, | ||||
r0 | } | |||
r153 | Datastores.es.indices.put_template("rcae", body=template_schema) | |||
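
    # hypothetical quick check that the template was stored (assumes a local
    # Elasticsearch on the default port):
    #
    #     curl -s http://localhost:9200/_template/rcae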


def reindex_reports():
    reports_groups_tables = detect_tables("reports_groups_p_")
    try:
        Datastores.es.indices.delete("rcae_r*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing report groups")
    i = 0
    task_start = datetime.datetime.now()
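    # Stream each partition table server-side and fetch it in 2000-row chunks;
    # documents are bucketed by partition id (the target ES index) and pushed
    # with the bulk helper. The same pattern repeats for every data type below.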
    for partition_table in reports_groups_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = ReportGroup(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report_group"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info("reindexing reports")
    task_start = datetime.datetime.now()
    reports_tables = detect_tables("reports_p_")
    for partition_table in reports_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Report(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "report"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))

    log.info("reindexing reports stats")
    i = 0
    task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables("reports_stats_p_")
    for partition_table in reports_stats_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                rd = dict(list(row.items()))
                # drop the legacy "size" column
                # TODO: remove the column from the table later
                rd.pop("size", None)
                item = ReportStat(**rd)
                i += 1
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_logs():
    try:
        Datastores.es.indices.delete("rcae_l*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing logs")
    i = 0
    task_start = datetime.datetime.now()
    log_tables = detect_tables("logs_p_")
    for partition_table in log_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Log(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_metrics():
    try:
        Datastores.es.indices.delete("rcae_m*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing application metrics")
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables("metrics_p_")
    for partition_table in metric_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


def reindex_slow_calls():
    try:
        Datastores.es.indices.delete("rcae_sc*")
    except elasticsearch.exceptions.NotFoundError as e:
        log.error(e)

    log.info("reindexing slow calls")
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables("slow_calls_p_")
    for partition_table in slow_calls_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info("round {}, {}".format(i, name))
                for k, v in es_docs.items():
                    to_update = {"_index": k, "_type": "log"}
                    for doc in v:
                        doc.update(to_update)
                    elasticsearch.helpers.bulk(Datastores.es, v)

    log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))


if __name__ == "__main__":
    main()