##// END OF EJS Templates
elasticsearch: replace pyelasticsearch with elasticsearch
ergo -
Show More
@@ -36,7 +36,7 b' pygments==2.1.3'
36 lxml==4.3.2
36 lxml==4.3.2
37 paginate==0.5.4
37 paginate==0.5.4
38 paginate-sqlalchemy==0.2.0
38 paginate-sqlalchemy==0.2.0
39 pyelasticsearch==1.4
39 elasticsearch>=2.0.0,<3.0.0
40 six>=1.10.0
40 six>=1.10.0
41 mock==1.0.1
41 mock==1.0.1
42 itsdangerous==1.1.0
42 itsdangerous==1.1.0
@@ -16,7 +16,7 b''
16
16
17 import datetime
17 import datetime
18 import logging
18 import logging
19 import pyelasticsearch
19 from elasticsearch import Elasticsearch
20 import redis
20 import redis
21 import os
21 import os
22 import pkg_resources
22 import pkg_resources
@@ -150,7 +150,7 b' def main(global_config, **settings):'
150 redis_url = settings['redis.url']
150 redis_url = settings['redis.url']
151 log.warning('Elasticsearch server list: {}'.format(es_server_list))
151 log.warning('Elasticsearch server list: {}'.format(es_server_list))
152 log.warning('Redis server: {}'.format(redis_url))
152 log.warning('Redis server: {}'.format(redis_url))
153 config.registry.es_conn = pyelasticsearch.ElasticSearch(es_server_list)
153 config.registry.es_conn = Elasticsearch(es_server_list)
154 config.registry.redis_conn = redis.StrictRedis.from_url(redis_url)
154 config.registry.redis_conn = redis.StrictRedis.from_url(redis_url)
155
155
156 config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ],
156 config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ],
@@ -20,7 +20,8 b' import math'
20 from datetime import datetime, timedelta
20 from datetime import datetime, timedelta
21
21
22 import sqlalchemy as sa
22 import sqlalchemy as sa
23 import pyelasticsearch
23 import elasticsearch.exceptions
24 import elasticsearch.helpers
24
25
25 from celery.utils.log import get_task_logger
26 from celery.utils.log import get_task_logger
26 from zope.sqlalchemy import mark_changed
27 from zope.sqlalchemy import mark_changed
@@ -226,22 +227,29 b' def add_reports(resource_id, request_params, dataset, **kwargs):'
226 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
227 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
227 def add_reports_es(report_group_docs, report_docs):
228 def add_reports_es(report_group_docs, report_docs):
228 for k, v in report_group_docs.items():
229 for k, v in report_group_docs.items():
229 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
230 to_update = {'_index': k, '_type': 'report_group'}
231 [i.update(to_update) for i in v]
232 elasticsearch.helpers.bulk(Datastores.es, v)
230 for k, v in report_docs.items():
233 for k, v in report_docs.items():
231 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
234 to_update = {'_index': k, '_type': 'report'}
232 parent_field='_parent')
235 [i.update(to_update) for i in v]
236 elasticsearch.helpers.bulk(Datastores.es, v)
233
237
234
238
235 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
236 def add_reports_slow_calls_es(es_docs):
240 def add_reports_slow_calls_es(es_docs):
237 for k, v in es_docs.items():
241 for k, v in es_docs.items():
238 Datastores.es.bulk_index(k, 'log', v)
242 to_update = {'_index': k, '_type': 'log'}
243 [i.update(to_update) for i in v]
244 elasticsearch.helpers.bulk(Datastores.es, v)
239
245
240
246
241 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
247 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
242 def add_reports_stats_rows_es(es_docs):
248 def add_reports_stats_rows_es(es_docs):
243 for k, v in es_docs.items():
249 for k, v in es_docs.items():
244 Datastores.es.bulk_index(k, 'log', v)
250 to_update = {'_index': k, '_type': 'log'}
251 [i.update(to_update) for i in v]
252 elasticsearch.helpers.bulk(Datastores.es, v)
245
253
246
254
247 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
255 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
@@ -304,12 +312,12 b' def add_logs(resource_id, request_params, dataset, **kwargs):'
304 # batch this to avoid problems with default ES bulk limits
312 # batch this to avoid problems with default ES bulk limits
305 for es_index in es_docs_to_delete.keys():
313 for es_index in es_docs_to_delete.keys():
306 for batch in in_batches(es_docs_to_delete[es_index], 20):
314 for batch in in_batches(es_docs_to_delete[es_index], 20):
307 query = {'terms': {'delete_hash': batch}}
315 query = {"query": {'terms': {'delete_hash': batch}}}
308
316
309 try:
317 try:
310 Datastores.es.delete_by_query(
318 Datastores.es.transport.perform_request(
311 es_index, 'log', query)
319 "DELETE", '/{}/{}/_query'.format(es_index, 'log'), body=query)
312 except pyelasticsearch.ElasticHttpNotFoundError as exc:
320 except elasticsearch.exceptions.NotFoundError as exc:
313 msg = 'skipping index {}'.format(es_index)
321 msg = 'skipping index {}'.format(es_index)
314 log.info(msg)
322 log.info(msg)
315
323
@@ -349,7 +357,9 b' def add_logs(resource_id, request_params, dataset, **kwargs):'
349 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
357 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
350 def add_logs_es(es_docs):
358 def add_logs_es(es_docs):
351 for k, v in es_docs.items():
359 for k, v in es_docs.items():
352 Datastores.es.bulk_index(k, 'log', v)
360 to_update = {'_index': k, '_type': 'log'}
361 [i.update(to_update) for i in v]
362 elasticsearch.helpers.bulk(Datastores.es, v)
353
363
354
364
355 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
365 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
@@ -627,8 +637,6 b' def logs_cleanup(resource_id, filter_settings):'
627 request = get_current_request()
637 request = get_current_request()
628 request.tm.begin()
638 request.tm.begin()
629 es_query = {
639 es_query = {
630 "_source": False,
631 "size": 5000,
632 "query": {
640 "query": {
633 "filtered": {
641 "filtered": {
634 "filter": {
642 "filter": {
@@ -646,27 +654,4 b' def logs_cleanup(resource_id, filter_settings):'
646 )
654 )
647 query.delete(synchronize_session=False)
655 query.delete(synchronize_session=False)
648 request.tm.commit()
656 request.tm.commit()
649 result = request.es_conn.search(es_query, index='rcae_l_*',
657 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format('rcae_l_*', 'log'), body=es_query)
650 doc_type='log', es_scroll='1m',
651 es_search_type='scan')
652 scroll_id = result['_scroll_id']
653 while True:
654 log.warning('log_cleanup, app:{} ns:{} batch'.format(
655 resource_id,
656 filter_settings['namespace']
657 ))
658 es_docs_to_delete = []
659 result = request.es_conn.send_request(
660 'POST', ['_search', 'scroll'],
661 body=scroll_id, query_params={"scroll": '1m'})
662 scroll_id = result['_scroll_id']
663 if not result['hits']['hits']:
664 break
665 for doc in result['hits']['hits']:
666 es_docs_to_delete.append({"id": doc['_id'],
667 "index": doc['_index']})
668
669 for batch in in_batches(es_docs_to_delete, 10):
670 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
671 **to_del)
672 for to_del in batch])
@@ -19,7 +19,6 b' import logging'
19
19
20 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests
20 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests
21
21
22 from appenlight.models import Datastores
23 from appenlight.models.services.config import ConfigService
22 from appenlight.models.services.config import ConfigService
24 from appenlight.lib.redis_keys import REDIS_KEYS
23 from appenlight.lib.redis_keys import REDIS_KEYS
25
24
@@ -189,7 +189,7 b' def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,'
189
189
190 # should be cached later
190 # should be cached later
191 def get_possible_names():
191 def get_possible_names():
192 return list(Datastores.es.aliases().keys())
192 return list(Datastores.es.indices.get_alias('*'))
193
193
194 possible_names = get_possible_names()
194 possible_names = get_possible_names()
195 es_index_types = []
195 es_index_types = []
@@ -66,11 +66,11 b' class SliceableESQuery(object):'
66
66
67 def __getitem__(self, index):
67 def __getitem__(self, index):
68 config = self.kwconfig.copy()
68 config = self.kwconfig.copy()
69 config['es_from'] = index.start
69 config['from_'] = index.start
70 query = self.query.copy()
70 query = self.query.copy()
71 if self.sort_query:
71 if self.sort_query:
72 query.update(self.sort_query)
72 query.update(self.sort_query)
73 self.result = Datastores.es.search(query, size=self.items_per_page,
73 self.result = Datastores.es.search(body=query, size=self.items_per_page,
74 **config)
74 **config)
75 if self.aggregations:
75 if self.aggregations:
76 self.items = self.result.get('aggregations')
76 self.items = self.result.get('aggregations')
@@ -85,7 +85,7 b' class SliceableESQuery(object):'
85 def __len__(self):
85 def __len__(self):
86 config = self.kwconfig.copy()
86 config = self.kwconfig.copy()
87 query = self.query.copy()
87 query = self.query.copy()
88 self.result = Datastores.es.search(query, size=self.items_per_page,
88 self.result = Datastores.es.search(body=query, size=self.items_per_page,
89 **config)
89 **config)
90 if self.aggregations:
90 if self.aggregations:
91 self.items = self.result.get('aggregations')
91 self.items = self.result.get('aggregations')
@@ -310,7 +310,7 b' class Report(Base, BaseModel):'
310 {"_doc": {"order": "desc"}},
310 {"_doc": {"order": "desc"}},
311 ],
311 ],
312 }
312 }
313 result = request.es_conn.search(query, index=self.partition_id,
313 result = request.es_conn.search(body=query, index=self.partition_id,
314 doc_type='report')
314 doc_type='report')
315 if result['hits']['total']:
315 if result['hits']['total']:
316 return result['hits']['hits'][0]['_source']['pg_id']
316 return result['hits']['hits'][0]['_source']['pg_id']
@@ -330,7 +330,7 b' class Report(Base, BaseModel):'
330 {"_doc": {"order": "asc"}},
330 {"_doc": {"order": "asc"}},
331 ],
331 ],
332 }
332 }
333 result = request.es_conn.search(query, index=self.partition_id,
333 result = request.es_conn.search(body=query, index=self.partition_id,
334 doc_type='report')
334 doc_type='report')
335 if result['hits']['total']:
335 if result['hits']['total']:
336 return result['hits']['hits'][0]['_source']['pg_id']
336 return result['hits']['hits'][0]['_source']['pg_id']
@@ -505,8 +505,8 b' def after_update(mapper, connection, target):'
505
505
506 def after_delete(mapper, connection, target):
506 def after_delete(mapper, connection, target):
507 if not hasattr(target, '_skip_ft_index'):
507 if not hasattr(target, '_skip_ft_index'):
508 query = {'term': {'pg_id': target.id}}
508 query = {"query":{'term': {'pg_id': target.id}}}
509 Datastores.es.delete_by_query(target.partition_id, 'report', query)
509 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
510
510
511
511
512 sa.event.listen(Report, 'after_insert', after_insert)
512 sa.event.listen(Report, 'after_insert', after_insert)
@@ -256,13 +256,11 b' def after_update(mapper, connection, target):'
256
256
257
257
258 def after_delete(mapper, connection, target):
258 def after_delete(mapper, connection, target):
259 query = {'term': {'group_id': target.id}}
259 query = {"query": {'term': {'group_id': target.id}}}
260 # TODO: routing seems unnecessary, need to test a bit more
260 # delete by query
261 # Datastores.es.delete_by_query(target.partition_id, 'report', query,
261 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
262 # query_params={'routing':str(target.id)})
262 query = {"query": {'term': {'pg_id': target.id}}}
263 Datastores.es.delete_by_query(target.partition_id, 'report', query)
263 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report_group'), body=query)
264 query = {'term': {'pg_id': target.id}}
265 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
266
264
267
265
268 sa.event.listen(ReportGroup, 'after_insert', after_insert)
266 sa.event.listen(ReportGroup, 'after_insert', after_insert)
@@ -130,7 +130,7 b' class LogService(BaseService):'
130 ixtypes=['logs'])
130 ixtypes=['logs'])
131 if index_names:
131 if index_names:
132 results = Datastores.es.search(
132 results = Datastores.es.search(
133 es_query, index=index_names, doc_type='log', size=0)
133 body=es_query, index=index_names, doc_type='log', size=0)
134 else:
134 else:
135 results = []
135 results = []
136 return results
136 return results
@@ -156,9 +156,9 b' class LogService(BaseService):'
156 if not index_names:
156 if not index_names:
157 return {}, 0
157 return {}, 0
158
158
159 results = Datastores.es.search(es_query, index=index_names,
159 results = Datastores.es.search(body=es_query, index=index_names,
160 doc_type='log', size=items_per_page,
160 doc_type='log', size=items_per_page,
161 es_from=es_from)
161 from_=es_from)
162 if results['hits']['total'] > 5000:
162 if results['hits']['total'] > 5000:
163 count = 5000
163 count = 5000
164 else:
164 else:
@@ -78,7 +78,7 b' class ReportGroupService(BaseService):'
78 es_query['query']['filtered']['filter']['and'].extend(tags)
78 es_query['query']['filtered']['filter']['and'].extend(tags)
79
79
80 result = Datastores.es.search(
80 result = Datastores.es.search(
81 es_query, index=index_names, doc_type='log', size=0)
81 body=es_query, index=index_names, doc_type='log', size=0)
82 series = []
82 series = []
83 for bucket in result['aggregations']['parent_agg']['buckets']:
83 for bucket in result['aggregations']['parent_agg']['buckets']:
84 series.append({
84 series.append({
@@ -249,7 +249,7 b' class ReportGroupService(BaseService):'
249 ixtypes=['reports'])
249 ixtypes=['reports'])
250 if index_names:
250 if index_names:
251 results = Datastores.es.search(
251 results = Datastores.es.search(
252 query, index=index_names, doc_type=["report", "report_group"],
252 body=query, index=index_names, doc_type=["report", "report_group"],
253 size=0)
253 size=0)
254 else:
254 else:
255 return []
255 return []
@@ -428,7 +428,7 b' class ReportGroupService(BaseService):'
428 if not index_names:
428 if not index_names:
429 return []
429 return []
430
430
431 result = Datastores.es.search(es_query,
431 result = Datastores.es.search(body=es_query,
432 index=index_names,
432 index=index_names,
433 doc_type='log',
433 doc_type='log',
434 size=0)
434 size=0)
@@ -41,7 +41,7 b' class ReportStatService(BaseService):'
41 'gte': since_when}}}]}}}}
41 'gte': since_when}}}]}}}}
42
42
43 if index_names:
43 if index_names:
44 result = Datastores.es.search(es_query,
44 result = Datastores.es.search(body=es_query,
45 index=index_names,
45 index=index_names,
46 doc_type='log',
46 doc_type='log',
47 size=0)
47 size=0)
@@ -113,7 +113,7 b' class RequestMetricService(BaseService):'
113 if not index_names:
113 if not index_names:
114 return []
114 return []
115
115
116 result = Datastores.es.search(es_query,
116 result = Datastores.es.search(body=es_query,
117 index=index_names,
117 index=index_names,
118 doc_type='log',
118 doc_type='log',
119 size=0)
119 size=0)
@@ -156,7 +156,7 b' class RequestMetricService(BaseService):'
156 'lte': filter_settings['end_date']}}},
156 'lte': filter_settings['end_date']}}},
157 {'terms': {'namespace': [
157 {'terms': {'namespace': [
158 'appenlight.request_metric']}}]}}}}
158 'appenlight.request_metric']}}]}}}}
159 result = Datastores.es.search(es_query,
159 result = Datastores.es.search(body=es_query,
160 index=index_names,
160 index=index_names,
161 doc_type='log',
161 doc_type='log',
162 size=0)
162 size=0)
@@ -205,7 +205,7 b' class RequestMetricService(BaseService):'
205 ]}
205 ]}
206 }}
206 }}
207 }
207 }
208 result = Datastores.es.search(es_query,
208 result = Datastores.es.search(body=es_query,
209 index=index_names,
209 index=index_names,
210 doc_type='log',
210 doc_type='log',
211 size=0)
211 size=0)
@@ -249,7 +249,7 b' class RequestMetricService(BaseService):'
249 index_names = es_index_name_limiter(ixtypes=['reports'])
249 index_names = es_index_name_limiter(ixtypes=['reports'])
250 if index_names and series:
250 if index_names and series:
251 result = Datastores.es.search(
251 result = Datastores.es.search(
252 query, doc_type='report', size=0, index=index_names)
252 body=query, doc_type='report', size=0, index=index_names)
253 for bucket in result['aggregations']['top_reports']['buckets']:
253 for bucket in result['aggregations']['top_reports']['buckets']:
254 details[bucket['key']] = []
254 details[bucket['key']] = []
255
255
@@ -340,7 +340,7 b' class RequestMetricService(BaseService):'
340 {'terms': {'namespace': [
340 {'terms': {'namespace': [
341 'appenlight.request_metric']}}]}}}}
341 'appenlight.request_metric']}}]}}}}
342
342
343 result = Datastores.es.search(es_query,
343 result = Datastores.es.search(body=es_query,
344 index=index_names,
344 index=index_names,
345 doc_type='log',
345 doc_type='log',
346 size=0)
346 size=0)
@@ -391,7 +391,7 b' class RequestMetricService(BaseService):'
391 }
391 }
392 }}
392 }}
393 }
393 }
394 result = Datastores.es.search(es_query,
394 result = Datastores.es.search(body=es_query,
395 index=index_names,
395 index=index_names,
396 doc_type='log',
396 doc_type='log',
397 size=0)
397 size=0)
@@ -65,7 +65,7 b' class SlowCallService(BaseService):'
65 }
65 }
66 }
66 }
67 result = Datastores.es.search(
67 result = Datastores.es.search(
68 es_query, index=index_names, doc_type='log', size=0)
68 body=es_query, index=index_names, doc_type='log', size=0)
69 results = result['aggregations']['parent_agg']['buckets']
69 results = result['aggregations']['parent_agg']['buckets']
70 else:
70 else:
71 return []
71 return []
@@ -118,7 +118,7 b' class SlowCallService(BaseService):'
118 }
118 }
119 }
119 }
120 }
120 }
121 calls = Datastores.es.search(calls_query,
121 calls = Datastores.es.search(body=calls_query,
122 index=index_names,
122 index=index_names,
123 doc_type='log',
123 doc_type='log',
124 size=0)
124 size=0)
@@ -19,6 +19,9 b' import datetime'
19 import logging
19 import logging
20
20
21 import sqlalchemy as sa
21 import sqlalchemy as sa
22 import elasticsearch.exceptions
23 import elasticsearch.helpers
24
22 from collections import defaultdict
25 from collections import defaultdict
23 from pyramid.paster import setup_logging
26 from pyramid.paster import setup_logging
24 from pyramid.paster import bootstrap
27 from pyramid.paster import bootstrap
@@ -97,7 +100,7 b' def main():'
97 if v.get('fulltext_indexer'):
100 if v.get('fulltext_indexer'):
98 choices[k] = v['fulltext_indexer']
101 choices[k] = v['fulltext_indexer']
99 parser.add_argument('-t', '--types', nargs='*',
102 parser.add_argument('-t', '--types', nargs='*',
100 choices=['all'] + list(choices.keys()), default=['all'],
103 choices=['all'] + list(choices.keys()), default=[],
101 help='Which parts of database should get reindexed')
104 help='Which parts of database should get reindexed')
102 parser.add_argument('-c', '--config', required=True,
105 parser.add_argument('-c', '--config', required=True,
103 help='Configuration ini file of application')
106 help='Configuration ini file of application')
@@ -107,6 +110,8 b' def main():'
107 if 'all' in args.types:
110 if 'all' in args.types:
108 args.types = list(choices.keys())
111 args.types = list(choices.keys())
109
112
113 print("Selected types to reindex: {}".format(args.types))
114
110 log.info('settings {}'.format(args.types))
115 log.info('settings {}'.format(args.types))
111
116
112 if 'template' in args.types:
117 if 'template' in args.types:
@@ -118,10 +123,9 b' def main():'
118
123
119 def update_template():
124 def update_template():
120 try:
125 try:
121 Datastores.es.send_request("delete", ['_template', 'rcae'],
126 Datastores.es.indices.delete_template('rcae')
122 query_params={})
127 except elasticsearch.exceptions.NotFoundError as e:
123 except Exception as e:
128 log.error(e)
124 print(e)
125 log.info('updating elasticsearch template')
129 log.info('updating elasticsearch template')
126 tag_templates = [
130 tag_templates = [
127 {"values": {
131 {"values": {
@@ -230,15 +234,14 b' def update_template():'
230 }
234 }
231 }
235 }
232
236
233 Datastores.es.send_request('PUT', ['_template', 'rcae'],
237 Datastores.es.indices.put_template('rcae', body=template_schema)
234 body=template_schema, query_params={})
235
238
236
239
237 def reindex_reports():
240 def reindex_reports():
238 reports_groups_tables = detect_tables('reports_groups_p_')
241 reports_groups_tables = detect_tables('reports_groups_p_')
239 try:
242 try:
240 Datastores.es.delete_index('rcae_r*')
243 Datastores.es.indices.delete('rcae_r*')
241 except Exception as e:
244 except elasticsearch.exceptions.NotFoundError as e:
242 log.error(e)
245 log.error(e)
243
246
244 log.info('reindexing report groups')
247 log.info('reindexing report groups')
@@ -261,8 +264,9 b' def reindex_reports():'
261 name = partition_table.name
264 name = partition_table.name
262 log.info('round {}, {}'.format(i, name))
265 log.info('round {}, {}'.format(i, name))
263 for k, v in es_docs.items():
266 for k, v in es_docs.items():
264 Datastores.es.bulk_index(k, 'report_group', v,
267 to_update = {'_index': k, '_type': 'report_group'}
265 id_field="_id")
268 [i.update(to_update) for i in v]
269 elasticsearch.helpers.bulk(Datastores.es, v)
266
270
267 log.info(
271 log.info(
268 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
272 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -288,8 +292,9 b' def reindex_reports():'
288 name = partition_table.name
292 name = partition_table.name
289 log.info('round {}, {}'.format(i, name))
293 log.info('round {}, {}'.format(i, name))
290 for k, v in es_docs.items():
294 for k, v in es_docs.items():
291 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
295 to_update = {'_index': k, '_type': 'report'}
292 parent_field='_parent')
296 [i.update(to_update) for i in v]
297 elasticsearch.helpers.bulk(Datastores.es, v)
293
298
294 log.info(
299 log.info(
295 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
300 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -319,7 +324,9 b' def reindex_reports():'
319 name = partition_table.name
324 name = partition_table.name
320 log.info('round {}, {}'.format(i, name))
325 log.info('round {}, {}'.format(i, name))
321 for k, v in es_docs.items():
326 for k, v in es_docs.items():
322 Datastores.es.bulk_index(k, 'log', v)
327 to_update = {'_index': k, '_type': 'log'}
328 [i.update(to_update) for i in v]
329 elasticsearch.helpers.bulk(Datastores.es, v)
323
330
324 log.info(
331 log.info(
325 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
332 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -327,8 +334,8 b' def reindex_reports():'
327
334
328 def reindex_logs():
335 def reindex_logs():
329 try:
336 try:
330 Datastores.es.delete_index('rcae_l*')
337 Datastores.es.indices.delete('rcae_l*')
331 except Exception as e:
338 except elasticsearch.exceptions.NotFoundError as e:
332 log.error(e)
339 log.error(e)
333
340
334 # logs
341 # logs
@@ -354,7 +361,9 b' def reindex_logs():'
354 name = partition_table.name
361 name = partition_table.name
355 log.info('round {}, {}'.format(i, name))
362 log.info('round {}, {}'.format(i, name))
356 for k, v in es_docs.items():
363 for k, v in es_docs.items():
357 Datastores.es.bulk_index(k, 'log', v)
364 to_update = {'_index': k, '_type': 'log'}
365 [i.update(to_update) for i in v]
366 elasticsearch.helpers.bulk(Datastores.es, v)
358
367
359 log.info(
368 log.info(
360 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
369 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -362,9 +371,9 b' def reindex_logs():'
362
371
363 def reindex_metrics():
372 def reindex_metrics():
364 try:
373 try:
365 Datastores.es.delete_index('rcae_m*')
374 Datastores.es.indices.delete('rcae_m*')
366 except Exception as e:
375 except elasticsearch.exceptions.NotFoundError as e:
367 print(e)
376 log.error(e)
368
377
369 log.info('reindexing applications metrics')
378 log.info('reindexing applications metrics')
370 i = 0
379 i = 0
@@ -387,7 +396,9 b' def reindex_metrics():'
387 name = partition_table.name
396 name = partition_table.name
388 log.info('round {}, {}'.format(i, name))
397 log.info('round {}, {}'.format(i, name))
389 for k, v in es_docs.items():
398 for k, v in es_docs.items():
390 Datastores.es.bulk_index(k, 'log', v)
399 to_update = {'_index': k, '_type': 'log'}
400 [i.update(to_update) for i in v]
401 elasticsearch.helpers.bulk(Datastores.es, v)
391
402
392 log.info(
403 log.info(
393 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
404 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -395,9 +406,9 b' def reindex_metrics():'
395
406
396 def reindex_slow_calls():
407 def reindex_slow_calls():
397 try:
408 try:
398 Datastores.es.delete_index('rcae_sc*')
409 Datastores.es.indices.delete('rcae_sc*')
399 except Exception as e:
410 except elasticsearch.exceptions.NotFoundError as e:
400 print(e)
411 log.error(e)
401
412
402 log.info('reindexing slow calls')
413 log.info('reindexing slow calls')
403 i = 0
414 i = 0
@@ -420,7 +431,9 b' def reindex_slow_calls():'
420 name = partition_table.name
431 name = partition_table.name
421 log.info('round {}, {}'.format(i, name))
432 log.info('round {}, {}'.format(i, name))
422 for k, v in es_docs.items():
433 for k, v in es_docs.items():
423 Datastores.es.bulk_index(k, 'log', v)
434 to_update = {'_index': k, '_type': 'log'}
435 [i.update(to_update) for i in v]
436 elasticsearch.helpers.bulk(Datastores.es, v)
424
437
425 log.info(
438 log.info(
426 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
439 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -159,8 +159,7 b' def system(request):'
159
159
160 # es indices
160 # es indices
161 es_indices = []
161 es_indices = []
162 result = Datastores.es.send_request(
162 result = Datastores.es.indices.stats(metric=['store, docs'])
163 'GET', ['_stats', 'store, docs'], query_params={})
164 for ix, stats in result['indices'].items():
163 for ix, stats in result['indices'].items():
165 size = stats['primaries']['store']['size_in_bytes']
164 size = stats['primaries']['store']['size_in_bytes']
166 es_indices.append({'name': ix,
165 es_indices.append({'name': ix,
@@ -50,7 +50,7 b' def get_partition_stats():'
50 if not ix_time in holder:
50 if not ix_time in holder:
51 holder[ix_time] = {'pg': [], 'elasticsearch': []}
51 holder[ix_time] = {'pg': [], 'elasticsearch': []}
52
52
53 for partition in list(Datastores.es.aliases().keys()):
53 for partition in list(Datastores.es.indices.get_alias('rcae*')):
54 if not partition.startswith('rcae'):
54 if not partition.startswith('rcae'):
55 continue
55 continue
56 split_data = partition.split('_')
56 split_data = partition.split('_')
@@ -128,7 +128,7 b' def partitions_remove(request):'
128 if form.validate():
128 if form.validate():
129 for ix in form.data['es_index']:
129 for ix in form.data['es_index']:
130 log.warning('deleting ES partition: {}'.format(ix))
130 log.warning('deleting ES partition: {}'.format(ix))
131 Datastores.es.delete_index(ix)
131 Datastores.es.indices.delete(ix)
132 for ix in form.data['pg_index']:
132 for ix in form.data['pg_index']:
133 log.warning('deleting PG partition: {}'.format(ix))
133 log.warning('deleting PG partition: {}'.format(ix))
134 stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
134 stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
@@ -163,7 +163,7 b' def common_tags(request):'
163 # tags
163 # tags
164 index_names = es_index_name_limiter(
164 index_names = es_index_name_limiter(
165 ixtypes=[config.get('datasource', 'logs')])
165 ixtypes=[config.get('datasource', 'logs')])
166 result = Datastores.es.search(query, index=index_names, doc_type='log',
166 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
167 size=0)
167 size=0)
168 tag_buckets = result['aggregations']['sub_agg'].get('buckets', [])
168 tag_buckets = result['aggregations']['sub_agg'].get('buckets', [])
169 # namespaces
169 # namespaces
@@ -175,7 +175,7 b' def common_tags(request):'
175 }
175 }
176 }
176 }
177 }
177 }
178 result = Datastores.es.search(query, index=index_names, doc_type='log',
178 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
179 size=0)
179 size=0)
180 namespaces_buckets = result['aggregations']['sub_agg'].get('buckets', [])
180 namespaces_buckets = result['aggregations']['sub_agg'].get('buckets', [])
181 return {
181 return {
@@ -216,7 +216,7 b' def common_values(request):'
216 }
216 }
217 }
217 }
218 index_names = es_index_name_limiter(ixtypes=[datasource])
218 index_names = es_index_name_limiter(ixtypes=[datasource])
219 result = Datastores.es.search(query, index=index_names, doc_type='log',
219 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
220 size=0)
220 size=0)
221 values_buckets = result['aggregations']['sub_agg'].get('buckets', [])
221 values_buckets = result['aggregations']['sub_agg'].get('buckets', [])
222 return {
222 return {
General Comments 0
You need to be logged in to leave comments. Login now