From 80739827c4209f4276081c4a399365566cd80d95 2019-03-17 10:29:32
From: Marcin Lulek
Date: 2019-03-17 10:29:32
Subject: [PATCH] elasticsearch: replace pyelasticsearch with elasticsearch

---
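The recurring change below: pyelasticsearch's bulk_index() becomes
elasticsearch.helpers.bulk(), which reads the target index and document type
from per-document metadata instead of taking them as arguments, and search()
moves the query dict to the body= keyword (with es_from renamed to from_).
A minimal sketch of the new bulk pattern, assuming a locally reachable node;
the index name and documents here are illustrative, not taken from the patch:

    from elasticsearch import Elasticsearch
    import elasticsearch.helpers

    es = Elasticsearch(['http://127.0.0.1:9200'])
    docs = [{'_id': 1, 'message': 'hello'}, {'_id': 2, 'message': 'world'}]
    # bulk() pulls routing metadata (_index, _type, _id) out of each action
    # dict and indexes the remaining keys as the document source, so the
    # migration just stamps the same metadata onto every doc in a batch.
    for doc in docs:
        doc.update({'_index': 'rcae_l_2019_03', '_type': 'log'})
    elasticsearch.helpers.bulk(es, docs)
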
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 6705750..6c1aac2 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -36,7 +36,7 @@ pygments==2.1.3
 lxml==4.3.2
 paginate==0.5.4
 paginate-sqlalchemy==0.2.0
-pyelasticsearch==1.4
+elasticsearch>=2.0.0,<3.0.0
 six>=1.10.0
 mock==1.0.1
 itsdangerous==1.1.0
diff --git a/backend/src/appenlight/__init__.py b/backend/src/appenlight/__init__.py
index fe08b01..0e0700e 100644
--- a/backend/src/appenlight/__init__.py
+++ b/backend/src/appenlight/__init__.py
@@ -16,7 +16,7 @@
 import datetime
 import logging

-import pyelasticsearch
+from elasticsearch import Elasticsearch
 import redis
 import os
 import pkg_resources
@@ -150,7 +150,7 @@ def main(global_config, **settings):
     redis_url = settings['redis.url']
     log.warning('Elasticsearch server list: {}'.format(es_server_list))
     log.warning('Redis server: {}'.format(redis_url))
-    config.registry.es_conn = pyelasticsearch.ElasticSearch(es_server_list)
+    config.registry.es_conn = Elasticsearch(es_server_list)
     config.registry.redis_conn = redis.StrictRedis.from_url(redis_url)

     config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ],
diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py
index c44cb98..a804224 100644
--- a/backend/src/appenlight/celery/tasks.py
+++ b/backend/src/appenlight/celery/tasks.py
@@ -20,7 +20,8 @@
 import math
 from datetime import datetime, timedelta

 import sqlalchemy as sa
-import pyelasticsearch
+import elasticsearch.exceptions
+import elasticsearch.helpers
 from celery.utils.log import get_task_logger
 from zope.sqlalchemy import mark_changed
@@ -226,22 +227,29 @@ def add_reports(resource_id, request_params, dataset, **kwargs):
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_es(report_group_docs, report_docs):
     for k, v in report_group_docs.items():
-        Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
+        to_update = {'_index': k, '_type': 'report_group'}
+        [i.update(to_update) for i in v]
+        elasticsearch.helpers.bulk(Datastores.es, v)
     for k, v in report_docs.items():
-        Datastores.es.bulk_index(k, 'report', v, id_field="_id",
-                                 parent_field='_parent')
+        to_update = {'_index': k, '_type': 'report'}
+        [i.update(to_update) for i in v]
+        elasticsearch.helpers.bulk(Datastores.es, v)


 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_slow_calls_es(es_docs):
     for k, v in es_docs.items():
-        Datastores.es.bulk_index(k, 'log', v)
+        to_update = {'_index': k, '_type': 'log'}
+        [i.update(to_update) for i in v]
+        elasticsearch.helpers.bulk(Datastores.es, v)


 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_stats_rows_es(es_docs):
     for k, v in es_docs.items():
-        Datastores.es.bulk_index(k, 'log', v)
+        to_update = {'_index': k, '_type': 'log'}
+        [i.update(to_update) for i in v]
+        elasticsearch.helpers.bulk(Datastores.es, v)


 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
@@ -304,12 +312,12 @@ def add_logs(resource_id, request_params, dataset, **kwargs):
     # batch this to avoid problems with default ES bulk limits
     for es_index in es_docs_to_delete.keys():
         for batch in in_batches(es_docs_to_delete[es_index], 20):
-            query = {'terms': {'delete_hash': batch}}
+            query = {"query": {'terms': {'delete_hash': batch}}}
             try:
-                Datastores.es.delete_by_query(
-                    es_index, 'log', query)
-            except pyelasticsearch.ElasticHttpNotFoundError as exc:
+                Datastores.es.transport.perform_request(
+                    "DELETE", '/{}/{}/_query'.format(es_index, 'log'), body=query)
+            except elasticsearch.exceptions.NotFoundError as exc:
                 msg = 'skipping index {}'.format(es_index)
                 log.info(msg)
@@ -349,7 +357,9 @@ def add_logs(resource_id, request_params, dataset, **kwargs):
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_logs_es(es_docs):
     for k, v in es_docs.items():
-        Datastores.es.bulk_index(k, 'log', v)
+        to_update = {'_index': k, '_type': 'log'}
+        [i.update(to_update) for i in v]
+        elasticsearch.helpers.bulk(Datastores.es, v)


 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
@@ -627,8 +637,6 @@ def logs_cleanup(resource_id, filter_settings):
     request = get_current_request()
     request.tm.begin()
     es_query = {
-        "_source": False,
-        "size": 5000,
         "query": {
             "filtered": {
                 "filter": {
@@ -646,27 +654,4 @@ def logs_cleanup(resource_id, filter_settings):
     )
     query.delete(synchronize_session=False)
     request.tm.commit()
-    result = request.es_conn.search(es_query, index='rcae_l_*',
-                                    doc_type='log', es_scroll='1m',
-                                    es_search_type='scan')
-    scroll_id = result['_scroll_id']
-    while True:
-        log.warning('log_cleanup, app:{} ns:{} batch'.format(
-            resource_id,
-            filter_settings['namespace']
-        ))
-        es_docs_to_delete = []
-        result = request.es_conn.send_request(
-            'POST', ['_search', 'scroll'],
-            body=scroll_id, query_params={"scroll": '1m'})
-        scroll_id = result['_scroll_id']
-        if not result['hits']['hits']:
-            break
-        for doc in result['hits']['hits']:
-            es_docs_to_delete.append({"id": doc['_id'],
-                                      "index": doc['_index']})
-
-        for batch in in_batches(es_docs_to_delete, 10):
-            Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
-                                                        **to_del)
-                                for to_del in batch])
+    Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format('rcae_l_*', 'log'), body=es_query)
diff --git a/backend/src/appenlight/lib/api.py b/backend/src/appenlight/lib/api.py
index a2106f5..d87efe0 100644
--- a/backend/src/appenlight/lib/api.py
+++ b/backend/src/appenlight/lib/api.py
@@ -19,7 +19,6 @@
 import logging

 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests

-from appenlight.models import Datastores
 from appenlight.models.services.config import ConfigService
 from appenlight.lib.redis_keys import REDIS_KEYS
diff --git a/backend/src/appenlight/lib/utils/__init__.py b/backend/src/appenlight/lib/utils/__init__.py
index e986343..3260f39 100644
--- a/backend/src/appenlight/lib/utils/__init__.py
+++ b/backend/src/appenlight/lib/utils/__init__.py
@@ -189,7 +189,7 @@ def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,

     # should be cached later
     def get_possible_names():
-        return list(Datastores.es.aliases().keys())
+        return list(Datastores.es.indices.get_alias('*'))

     possible_names = get_possible_names()
     es_index_types = []
diff --git a/backend/src/appenlight/models/__init__.py b/backend/src/appenlight/models/__init__.py
index 3ad155a..6810002 100644
--- a/backend/src/appenlight/models/__init__.py
+++ b/backend/src/appenlight/models/__init__.py
@@ -66,11 +66,11 @@ class SliceableESQuery(object):

     def __getitem__(self, index):
         config = self.kwconfig.copy()
-        config['es_from'] = index.start
+        config['from_'] = index.start
         query = self.query.copy()
         if self.sort_query:
             query.update(self.sort_query)
-        self.result = Datastores.es.search(query, size=self.items_per_page,
+        self.result = Datastores.es.search(body=query, size=self.items_per_page,
                                            **config)
         if self.aggregations:
             self.items = self.result.get('aggregations')
@@ -85,7 +85,7 @@ class SliceableESQuery(object):
     def __len__(self):
         config = self.kwconfig.copy()
         query = self.query.copy()
-        self.result = Datastores.es.search(query, size=self.items_per_page,
+        self.result = Datastores.es.search(body=query, size=self.items_per_page,
                                            **config)
         if self.aggregations:
             self.items = self.result.get('aggregations')
diff --git a/backend/src/appenlight/models/report.py b/backend/src/appenlight/models/report.py
index b3dc2c5..9d90192 100644
--- a/backend/src/appenlight/models/report.py
+++ b/backend/src/appenlight/models/report.py
@@ -310,7 +310,7 @@ class Report(Base, BaseModel):
                 {"_doc": {"order": "desc"}},
             ],
         }
-        result = request.es_conn.search(query, index=self.partition_id,
+        result = request.es_conn.search(body=query, index=self.partition_id,
                                         doc_type='report')
         if result['hits']['total']:
             return result['hits']['hits'][0]['_source']['pg_id']
@@ -330,7 +330,7 @@ class Report(Base, BaseModel):
                 {"_doc": {"order": "asc"}},
             ],
         }
-        result = request.es_conn.search(query, index=self.partition_id,
+        result = request.es_conn.search(body=query, index=self.partition_id,
                                         doc_type='report')
         if result['hits']['total']:
             return result['hits']['hits'][0]['_source']['pg_id']
@@ -505,8 +505,8 @@ def after_update(mapper, connection, target):

 def after_delete(mapper, connection, target):
     if not hasattr(target, '_skip_ft_index'):
-        query = {'term': {'pg_id': target.id}}
-        Datastores.es.delete_by_query(target.partition_id, 'report', query)
+        query = {"query":{'term': {'pg_id': target.id}}}
+        Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)


 sa.event.listen(Report, 'after_insert', after_insert)
diff --git a/backend/src/appenlight/models/report_group.py b/backend/src/appenlight/models/report_group.py
index d7a9293..5dd3a86 100644
--- a/backend/src/appenlight/models/report_group.py
+++ b/backend/src/appenlight/models/report_group.py
@@ -256,13 +256,11 @@ def after_update(mapper, connection, target):


 def after_delete(mapper, connection, target):
-    query = {'term': {'group_id': target.id}}
-    # TODO: routing seems unnecessary, need to test a bit more
-    # Datastores.es.delete_by_query(target.partition_id, 'report', query,
-    #                               query_params={'routing':str(target.id)})
-    Datastores.es.delete_by_query(target.partition_id, 'report', query)
-    query = {'term': {'pg_id': target.id}}
-    Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
+    query = {"query": {'term': {'group_id': target.id}}}
+    # delete by query
+    Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
+    query = {"query": {'term': {'pg_id': target.id}}}
+    Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report_group'), body=query)


 sa.event.listen(ReportGroup, 'after_insert', after_insert)
diff --git a/backend/src/appenlight/models/services/log.py b/backend/src/appenlight/models/services/log.py
index 4e89859..4e5979d 100644
--- a/backend/src/appenlight/models/services/log.py
+++ b/backend/src/appenlight/models/services/log.py
@@ -130,7 +130,7 @@ class LogService(BaseService):
                                                ixtypes=['logs'])
         if index_names:
             results = Datastores.es.search(
-                es_query, index=index_names, doc_type='log', size=0)
+                body=es_query, index=index_names, doc_type='log', size=0)
         else:
             results = []
         return results
@@ -156,9 +156,9 @@ class LogService(BaseService):
         if not index_names:
             return {}, 0

-        results = Datastores.es.search(es_query, index=index_names,
+        results = Datastores.es.search(body=es_query, index=index_names,
                                        doc_type='log', size=items_per_page,
-                                       es_from=es_from)
+                                       from_=es_from)
         if results['hits']['total'] > 5000:
             count = 5000
         else:
diff --git a/backend/src/appenlight/models/services/report_group.py b/backend/src/appenlight/models/services/report_group.py
index 15e4218..38c54e0 100644
--- a/backend/src/appenlight/models/services/report_group.py
+++ b/backend/src/appenlight/models/services/report_group.py
@@ -78,7 +78,7 @@ class ReportGroupService(BaseService):
             es_query['query']['filtered']['filter']['and'].extend(tags)

         result = Datastores.es.search(
-            es_query, index=index_names, doc_type='log', size=0)
+            body=es_query, index=index_names, doc_type='log', size=0)
         series = []
         for bucket in result['aggregations']['parent_agg']['buckets']:
             series.append({
@@ -249,7 +249,7 @@ class ReportGroupService(BaseService):
                                              ixtypes=['reports'])
         if index_names:
             results = Datastores.es.search(
-                query, index=index_names, doc_type=["report", "report_group"],
+                body=query, index=index_names, doc_type=["report", "report_group"],
                 size=0)
         else:
             return []
@@ -428,7 +428,7 @@ class ReportGroupService(BaseService):
         if not index_names:
             return []

-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
diff --git a/backend/src/appenlight/models/services/report_stat.py b/backend/src/appenlight/models/services/report_stat.py
index 274686b..35f2d13 100644
--- a/backend/src/appenlight/models/services/report_stat.py
+++ b/backend/src/appenlight/models/services/report_stat.py
@@ -41,7 +41,7 @@ class ReportStatService(BaseService):
                                         'gte': since_when}}}]}}}}

         if index_names:
-            result = Datastores.es.search(es_query,
+            result = Datastores.es.search(body=es_query,
                                           index=index_names,
                                           doc_type='log',
                                           size=0)
diff --git a/backend/src/appenlight/models/services/request_metric.py b/backend/src/appenlight/models/services/request_metric.py
index d3c9d4d..9134331 100644
--- a/backend/src/appenlight/models/services/request_metric.py
+++ b/backend/src/appenlight/models/services/request_metric.py
@@ -113,7 +113,7 @@ class RequestMetricService(BaseService):
         if not index_names:
             return []

-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
@@ -156,7 +156,7 @@ class RequestMetricService(BaseService):
                            'lte': filter_settings['end_date']}}},
                        {'terms': {'namespace': [
                            'appenlight.request_metric']}}]}}}}
-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
@@ -205,7 +205,7 @@ class RequestMetricService(BaseService):
             ]}
             }}
         }
-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
@@ -249,7 +249,7 @@ class RequestMetricService(BaseService):

         index_names = es_index_name_limiter(ixtypes=['reports'])
         if index_names and series:
             result = Datastores.es.search(
-                query, doc_type='report', size=0, index=index_names)
+                body=query, doc_type='report', size=0, index=index_names)
             for bucket in result['aggregations']['top_reports']['buckets']:
                 details[bucket['key']] = []
@@ -340,7 +340,7 @@ class RequestMetricService(BaseService):
                        {'terms': {'namespace': [
                            'appenlight.request_metric']}}]}}}}

-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
@@ -391,7 +391,7 @@ class RequestMetricService(BaseService):
                 }
             }}
         }
-        result = Datastores.es.search(es_query,
+        result = Datastores.es.search(body=es_query,
                                       index=index_names,
                                       doc_type='log',
                                       size=0)
diff --git a/backend/src/appenlight/models/services/slow_call.py b/backend/src/appenlight/models/services/slow_call.py
index 9f2fe47..1207484 100644
--- a/backend/src/appenlight/models/services/slow_call.py
+++ b/backend/src/appenlight/models/services/slow_call.py
@@ -65,7 +65,7 @@ class SlowCallService(BaseService):
                 }
             }
             result = Datastores.es.search(
-                es_query, index=index_names, doc_type='log', size=0)
+                body=es_query, index=index_names, doc_type='log', size=0)
             results = result['aggregations']['parent_agg']['buckets']
         else:
             return []
@@ -118,7 +118,7 @@ class SlowCallService(BaseService):
                 }
             }
         }
-        calls = Datastores.es.search(calls_query,
+        calls = Datastores.es.search(body=calls_query,
                                      index=index_names,
                                      doc_type='log',
                                      size=0)
diff --git a/backend/src/appenlight/scripts/reindex_elasticsearch.py b/backend/src/appenlight/scripts/reindex_elasticsearch.py
index f999c2e..911c20e 100644
--- a/backend/src/appenlight/scripts/reindex_elasticsearch.py
+++ b/backend/src/appenlight/scripts/reindex_elasticsearch.py
@@ -19,6 +19,9 @@
 import datetime
 import logging
 import sqlalchemy as sa
+import elasticsearch.exceptions
+import elasticsearch.helpers
+
 from collections import defaultdict
 from pyramid.paster import setup_logging
 from pyramid.paster import bootstrap
@@ -97,7 +100,7 @@ def main():
         if v.get('fulltext_indexer'):
             choices[k] = v['fulltext_indexer']
     parser.add_argument('-t', '--types', nargs='*',
-                        choices=['all'] + list(choices.keys()), default=['all'],
+                        choices=['all'] + list(choices.keys()), default=[],
                         help='Which parts of database should get reindexed')
     parser.add_argument('-c', '--config', required=True,
                         help='Configuration ini file of application')
@@ -107,6 +110,8 @@ def main():
     if 'all' in args.types:
         args.types = list(choices.keys())

+    print("Selected types to reindex: {}".format(args.types))
+
     log.info('settings {}'.format(args.types))

     if 'template' in args.types:
@@ -118,10 +123,9 @@ def main():

 def update_template():
     try:
-        Datastores.es.send_request("delete", ['_template', 'rcae'],
-                                   query_params={})
-    except Exception as e:
-        print(e)
+        Datastores.es.indices.delete_template('rcae')
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)
     log.info('updating elasticsearch template')
     tag_templates = [
         {"values": {
@@ -230,15 +234,14 @@ def update_template():
         }
     }

-    Datastores.es.send_request('PUT', ['_template', 'rcae'],
-                               body=template_schema, query_params={})
+    Datastores.es.indices.put_template('rcae', body=template_schema)


 def reindex_reports():
     reports_groups_tables = detect_tables('reports_groups_p_')
     try:
-        Datastores.es.delete_index('rcae_r*')
-    except Exception as e:
+        Datastores.es.indices.delete('rcae_r*')
+    except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)

     log.info('reindexing report groups')
@@ -261,8 +264,9 @@ def reindex_reports():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'report_group', v,
-                                         id_field="_id")
+                to_update = {'_index': k, '_type': 'report_group'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -288,8 +292,9 @@ def reindex_reports():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'report', v, id_field="_id",
-                                         parent_field='_parent')
+                to_update = {'_index': k, '_type': 'report'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -319,7 +324,9 @@ def reindex_reports():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'log', v)
+                to_update = {'_index': k, '_type': 'log'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -327,8 +334,8 @@ def reindex_reports():

 def reindex_logs():
     try:
-        Datastores.es.delete_index('rcae_l*')
-    except Exception as e:
+        Datastores.es.indices.delete('rcae_l*')
+    except elasticsearch.exceptions.NotFoundError as e:
         log.error(e)

     # logs
@@ -354,7 +361,9 @@ def reindex_logs():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'log', v)
+                to_update = {'_index': k, '_type': 'log'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -362,9 +371,9 @@ def reindex_logs():

 def reindex_metrics():
     try:
-        Datastores.es.delete_index('rcae_m*')
-    except Exception as e:
-        print(e)
+        Datastores.es.indices.delete('rcae_m*')
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)

     log.info('reindexing applications metrics')
     i = 0
@@ -387,7 +396,9 @@ def reindex_metrics():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'log', v)
+                to_update = {'_index': k, '_type': 'log'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -395,9 +406,9 @@ def reindex_metrics():

 def reindex_slow_calls():
     try:
-        Datastores.es.delete_index('rcae_sc*')
-    except Exception as e:
-        print(e)
+        Datastores.es.indices.delete('rcae_sc*')
+    except elasticsearch.exceptions.NotFoundError as e:
+        log.error(e)

     log.info('reindexing slow calls')
     i = 0
@@ -420,7 +431,9 @@ def reindex_slow_calls():
             name = partition_table.name
             log.info('round {}, {}'.format(i, name))
             for k, v in es_docs.items():
-                Datastores.es.bulk_index(k, 'log', v)
+                to_update = {'_index': k, '_type': 'log'}
+                [i.update(to_update) for i in v]
+                elasticsearch.helpers.bulk(Datastores.es, v)

     log.info(
         'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
diff --git a/backend/src/appenlight/views/admin/admin.py b/backend/src/appenlight/views/admin/admin.py
index 57a639a..a7fb45c 100644
--- a/backend/src/appenlight/views/admin/admin.py
+++ b/backend/src/appenlight/views/admin/admin.py
@@ -159,8 +159,7 @@ def system(request):

     # es indices
     es_indices = []
-    result = Datastores.es.send_request(
-        'GET', ['_stats', 'store, docs'], query_params={})
+    result = Datastores.es.indices.stats(metric=['store, docs'])
     for ix, stats in result['indices'].items():
         size = stats['primaries']['store']['size_in_bytes']
         es_indices.append({'name': ix,
diff --git a/backend/src/appenlight/views/admin/partitions.py b/backend/src/appenlight/views/admin/partitions.py
index 16a6203..30fc7e7 100644
--- a/backend/src/appenlight/views/admin/partitions.py
+++ b/backend/src/appenlight/views/admin/partitions.py
@@ -50,7 +50,7 @@ def get_partition_stats():
         if not ix_time in holder:
             holder[ix_time] = {'pg': [], 'elasticsearch': []}

-    for partition in list(Datastores.es.aliases().keys()):
+    for partition in list(Datastores.es.indices.get_alias('rcae*')):
         if not partition.startswith('rcae'):
             continue
         split_data = partition.split('_')
@@ -128,7 +128,7 @@ def partitions_remove(request):
     if form.validate():
         for ix in form.data['es_index']:
             log.warning('deleting ES partition: {}'.format(ix))
-            Datastores.es.delete_index(ix)
+            Datastores.es.indices.delete(ix)
         for ix in form.data['pg_index']:
             log.warning('deleting PG partition: {}'.format(ix))
             stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
diff --git a/backend/src/appenlight/views/logs.py b/backend/src/appenlight/views/logs.py
index 3ab7538..5e8e815 100644
--- a/backend/src/appenlight/views/logs.py
+++ b/backend/src/appenlight/views/logs.py
@@ -163,7 +163,7 @@ def common_tags(request):
     # tags
     index_names = es_index_name_limiter(
         ixtypes=[config.get('datasource', 'logs')])
-    result = Datastores.es.search(query, index=index_names, doc_type='log',
+    result = Datastores.es.search(body=query, index=index_names, doc_type='log',
                                   size=0)
     tag_buckets = result['aggregations']['sub_agg'].get('buckets', [])
     # namespaces
@@ -175,7 +175,7 @@ def common_tags(request):
             }
         }
     }
-    result = Datastores.es.search(query, index=index_names, doc_type='log',
+    result = Datastores.es.search(body=query, index=index_names, doc_type='log',
                                   size=0)
     namespaces_buckets = result['aggregations']['sub_agg'].get('buckets', [])
     return {
@@ -216,7 +216,7 @@ def common_values(request):
         }
     }
     index_names = es_index_name_limiter(ixtypes=[datasource])
-    result = Datastores.es.search(query, index=index_names, doc_type='log',
+    result = Datastores.es.search(body=query, index=index_names, doc_type='log',
                                   size=0)
     values_buckets = result['aggregations']['sub_agg'].get('buckets', [])
     return {