##// END OF EJS Templates
elasticsearch: replace pyelasticsearch with elasticsearch
ergo -
Show More
@@ -36,7 +36,7 b' pygments==2.1.3'
36 36 lxml==4.3.2
37 37 paginate==0.5.4
38 38 paginate-sqlalchemy==0.2.0
39 pyelasticsearch==1.4
39 elasticsearch>=2.0.0,<3.0.0
40 40 six>=1.10.0
41 41 mock==1.0.1
42 42 itsdangerous==1.1.0
@@ -16,7 +16,7 b''
16 16
17 17 import datetime
18 18 import logging
19 import pyelasticsearch
19 from elasticsearch import Elasticsearch
20 20 import redis
21 21 import os
22 22 import pkg_resources
@@ -150,7 +150,7 b' def main(global_config, **settings):'
150 150 redis_url = settings['redis.url']
151 151 log.warning('Elasticsearch server list: {}'.format(es_server_list))
152 152 log.warning('Redis server: {}'.format(redis_url))
153 config.registry.es_conn = pyelasticsearch.ElasticSearch(es_server_list)
153 config.registry.es_conn = Elasticsearch(es_server_list)
154 154 config.registry.redis_conn = redis.StrictRedis.from_url(redis_url)
155 155
156 156 config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ],
@@ -20,7 +20,8 b' import math'
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 import pyelasticsearch
23 import elasticsearch.exceptions
24 import elasticsearch.helpers
24 25
25 26 from celery.utils.log import get_task_logger
26 27 from zope.sqlalchemy import mark_changed
@@ -226,22 +227,29 b' def add_reports(resource_id, request_params, dataset, **kwargs):'
226 227 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
227 228 def add_reports_es(report_group_docs, report_docs):
228 229 for k, v in report_group_docs.items():
229 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
230 to_update = {'_index': k, '_type': 'report_group'}
231 [i.update(to_update) for i in v]
232 elasticsearch.helpers.bulk(Datastores.es, v)
230 233 for k, v in report_docs.items():
231 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
232 parent_field='_parent')
234 to_update = {'_index': k, '_type': 'report'}
235 [i.update(to_update) for i in v]
236 elasticsearch.helpers.bulk(Datastores.es, v)
233 237
234 238
235 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
236 240 def add_reports_slow_calls_es(es_docs):
237 241 for k, v in es_docs.items():
238 Datastores.es.bulk_index(k, 'log', v)
242 to_update = {'_index': k, '_type': 'log'}
243 [i.update(to_update) for i in v]
244 elasticsearch.helpers.bulk(Datastores.es, v)
239 245
240 246
241 247 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
242 248 def add_reports_stats_rows_es(es_docs):
243 249 for k, v in es_docs.items():
244 Datastores.es.bulk_index(k, 'log', v)
250 to_update = {'_index': k, '_type': 'log'}
251 [i.update(to_update) for i in v]
252 elasticsearch.helpers.bulk(Datastores.es, v)
245 253
246 254
247 255 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
@@ -304,12 +312,12 b' def add_logs(resource_id, request_params, dataset, **kwargs):'
304 312 # batch this to avoid problems with default ES bulk limits
305 313 for es_index in es_docs_to_delete.keys():
306 314 for batch in in_batches(es_docs_to_delete[es_index], 20):
307 query = {'terms': {'delete_hash': batch}}
315 query = {"query": {'terms': {'delete_hash': batch}}}
308 316
309 317 try:
310 Datastores.es.delete_by_query(
311 es_index, 'log', query)
312 except pyelasticsearch.ElasticHttpNotFoundError as exc:
318 Datastores.es.transport.perform_request(
319 "DELETE", '/{}/{}/_query'.format(es_index, 'log'), body=query)
320 except elasticsearch.exceptions.NotFoundError as exc:
313 321 msg = 'skipping index {}'.format(es_index)
314 322 log.info(msg)
315 323
@@ -349,7 +357,9 b' def add_logs(resource_id, request_params, dataset, **kwargs):'
349 357 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
350 358 def add_logs_es(es_docs):
351 359 for k, v in es_docs.items():
352 Datastores.es.bulk_index(k, 'log', v)
360 to_update = {'_index': k, '_type': 'log'}
361 [i.update(to_update) for i in v]
362 elasticsearch.helpers.bulk(Datastores.es, v)
353 363
354 364
355 365 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
@@ -627,8 +637,6 b' def logs_cleanup(resource_id, filter_settings):'
627 637 request = get_current_request()
628 638 request.tm.begin()
629 639 es_query = {
630 "_source": False,
631 "size": 5000,
632 640 "query": {
633 641 "filtered": {
634 642 "filter": {
@@ -646,27 +654,4 b' def logs_cleanup(resource_id, filter_settings):'
646 654 )
647 655 query.delete(synchronize_session=False)
648 656 request.tm.commit()
649 result = request.es_conn.search(es_query, index='rcae_l_*',
650 doc_type='log', es_scroll='1m',
651 es_search_type='scan')
652 scroll_id = result['_scroll_id']
653 while True:
654 log.warning('log_cleanup, app:{} ns:{} batch'.format(
655 resource_id,
656 filter_settings['namespace']
657 ))
658 es_docs_to_delete = []
659 result = request.es_conn.send_request(
660 'POST', ['_search', 'scroll'],
661 body=scroll_id, query_params={"scroll": '1m'})
662 scroll_id = result['_scroll_id']
663 if not result['hits']['hits']:
664 break
665 for doc in result['hits']['hits']:
666 es_docs_to_delete.append({"id": doc['_id'],
667 "index": doc['_index']})
668
669 for batch in in_batches(es_docs_to_delete, 10):
670 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
671 **to_del)
672 for to_del in batch])
657 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format('rcae_l_*', 'log'), body=es_query)
@@ -19,7 +19,6 b' import logging'
19 19
20 20 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests
21 21
22 from appenlight.models import Datastores
23 22 from appenlight.models.services.config import ConfigService
24 23 from appenlight.lib.redis_keys import REDIS_KEYS
25 24
@@ -189,7 +189,7 b' def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,'
189 189
190 190 # should be cached later
191 191 def get_possible_names():
192 return list(Datastores.es.aliases().keys())
192 return list(Datastores.es.indices.get_alias('*'))
193 193
194 194 possible_names = get_possible_names()
195 195 es_index_types = []
@@ -66,11 +66,11 b' class SliceableESQuery(object):'
66 66
67 67 def __getitem__(self, index):
68 68 config = self.kwconfig.copy()
69 config['es_from'] = index.start
69 config['from_'] = index.start
70 70 query = self.query.copy()
71 71 if self.sort_query:
72 72 query.update(self.sort_query)
73 self.result = Datastores.es.search(query, size=self.items_per_page,
73 self.result = Datastores.es.search(body=query, size=self.items_per_page,
74 74 **config)
75 75 if self.aggregations:
76 76 self.items = self.result.get('aggregations')
@@ -85,7 +85,7 b' class SliceableESQuery(object):'
85 85 def __len__(self):
86 86 config = self.kwconfig.copy()
87 87 query = self.query.copy()
88 self.result = Datastores.es.search(query, size=self.items_per_page,
88 self.result = Datastores.es.search(body=query, size=self.items_per_page,
89 89 **config)
90 90 if self.aggregations:
91 91 self.items = self.result.get('aggregations')
@@ -310,7 +310,7 b' class Report(Base, BaseModel):'
310 310 {"_doc": {"order": "desc"}},
311 311 ],
312 312 }
313 result = request.es_conn.search(query, index=self.partition_id,
313 result = request.es_conn.search(body=query, index=self.partition_id,
314 314 doc_type='report')
315 315 if result['hits']['total']:
316 316 return result['hits']['hits'][0]['_source']['pg_id']
@@ -330,7 +330,7 b' class Report(Base, BaseModel):'
330 330 {"_doc": {"order": "asc"}},
331 331 ],
332 332 }
333 result = request.es_conn.search(query, index=self.partition_id,
333 result = request.es_conn.search(body=query, index=self.partition_id,
334 334 doc_type='report')
335 335 if result['hits']['total']:
336 336 return result['hits']['hits'][0]['_source']['pg_id']
@@ -505,8 +505,8 b' def after_update(mapper, connection, target):'
505 505
506 506 def after_delete(mapper, connection, target):
507 507 if not hasattr(target, '_skip_ft_index'):
508 query = {'term': {'pg_id': target.id}}
509 Datastores.es.delete_by_query(target.partition_id, 'report', query)
508 query = {"query":{'term': {'pg_id': target.id}}}
509 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
510 510
511 511
512 512 sa.event.listen(Report, 'after_insert', after_insert)
@@ -256,13 +256,11 b' def after_update(mapper, connection, target):'
256 256
257 257
258 258 def after_delete(mapper, connection, target):
259 query = {'term': {'group_id': target.id}}
260 # TODO: routing seems unnecessary, need to test a bit more
261 # Datastores.es.delete_by_query(target.partition_id, 'report', query,
262 # query_params={'routing':str(target.id)})
263 Datastores.es.delete_by_query(target.partition_id, 'report', query)
264 query = {'term': {'pg_id': target.id}}
265 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
259 query = {"query": {'term': {'group_id': target.id}}}
260 # delete by query
261 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
262 query = {"query": {'term': {'pg_id': target.id}}}
263 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report_group'), body=query)
266 264
267 265
268 266 sa.event.listen(ReportGroup, 'after_insert', after_insert)
@@ -130,7 +130,7 b' class LogService(BaseService):'
130 130 ixtypes=['logs'])
131 131 if index_names:
132 132 results = Datastores.es.search(
133 es_query, index=index_names, doc_type='log', size=0)
133 body=es_query, index=index_names, doc_type='log', size=0)
134 134 else:
135 135 results = []
136 136 return results
@@ -156,9 +156,9 b' class LogService(BaseService):'
156 156 if not index_names:
157 157 return {}, 0
158 158
159 results = Datastores.es.search(es_query, index=index_names,
159 results = Datastores.es.search(body=es_query, index=index_names,
160 160 doc_type='log', size=items_per_page,
161 es_from=es_from)
161 from_=es_from)
162 162 if results['hits']['total'] > 5000:
163 163 count = 5000
164 164 else:
@@ -78,7 +78,7 b' class ReportGroupService(BaseService):'
78 78 es_query['query']['filtered']['filter']['and'].extend(tags)
79 79
80 80 result = Datastores.es.search(
81 es_query, index=index_names, doc_type='log', size=0)
81 body=es_query, index=index_names, doc_type='log', size=0)
82 82 series = []
83 83 for bucket in result['aggregations']['parent_agg']['buckets']:
84 84 series.append({
@@ -249,7 +249,7 b' class ReportGroupService(BaseService):'
249 249 ixtypes=['reports'])
250 250 if index_names:
251 251 results = Datastores.es.search(
252 query, index=index_names, doc_type=["report", "report_group"],
252 body=query, index=index_names, doc_type=["report", "report_group"],
253 253 size=0)
254 254 else:
255 255 return []
@@ -428,7 +428,7 b' class ReportGroupService(BaseService):'
428 428 if not index_names:
429 429 return []
430 430
431 result = Datastores.es.search(es_query,
431 result = Datastores.es.search(body=es_query,
432 432 index=index_names,
433 433 doc_type='log',
434 434 size=0)
@@ -41,7 +41,7 b' class ReportStatService(BaseService):'
41 41 'gte': since_when}}}]}}}}
42 42
43 43 if index_names:
44 result = Datastores.es.search(es_query,
44 result = Datastores.es.search(body=es_query,
45 45 index=index_names,
46 46 doc_type='log',
47 47 size=0)
@@ -113,7 +113,7 b' class RequestMetricService(BaseService):'
113 113 if not index_names:
114 114 return []
115 115
116 result = Datastores.es.search(es_query,
116 result = Datastores.es.search(body=es_query,
117 117 index=index_names,
118 118 doc_type='log',
119 119 size=0)
@@ -156,7 +156,7 b' class RequestMetricService(BaseService):'
156 156 'lte': filter_settings['end_date']}}},
157 157 {'terms': {'namespace': [
158 158 'appenlight.request_metric']}}]}}}}
159 result = Datastores.es.search(es_query,
159 result = Datastores.es.search(body=es_query,
160 160 index=index_names,
161 161 doc_type='log',
162 162 size=0)
@@ -205,7 +205,7 b' class RequestMetricService(BaseService):'
205 205 ]}
206 206 }}
207 207 }
208 result = Datastores.es.search(es_query,
208 result = Datastores.es.search(body=es_query,
209 209 index=index_names,
210 210 doc_type='log',
211 211 size=0)
@@ -249,7 +249,7 b' class RequestMetricService(BaseService):'
249 249 index_names = es_index_name_limiter(ixtypes=['reports'])
250 250 if index_names and series:
251 251 result = Datastores.es.search(
252 query, doc_type='report', size=0, index=index_names)
252 body=query, doc_type='report', size=0, index=index_names)
253 253 for bucket in result['aggregations']['top_reports']['buckets']:
254 254 details[bucket['key']] = []
255 255
@@ -340,7 +340,7 b' class RequestMetricService(BaseService):'
340 340 {'terms': {'namespace': [
341 341 'appenlight.request_metric']}}]}}}}
342 342
343 result = Datastores.es.search(es_query,
343 result = Datastores.es.search(body=es_query,
344 344 index=index_names,
345 345 doc_type='log',
346 346 size=0)
@@ -391,7 +391,7 b' class RequestMetricService(BaseService):'
391 391 }
392 392 }}
393 393 }
394 result = Datastores.es.search(es_query,
394 result = Datastores.es.search(body=es_query,
395 395 index=index_names,
396 396 doc_type='log',
397 397 size=0)
@@ -65,7 +65,7 b' class SlowCallService(BaseService):'
65 65 }
66 66 }
67 67 result = Datastores.es.search(
68 es_query, index=index_names, doc_type='log', size=0)
68 body=es_query, index=index_names, doc_type='log', size=0)
69 69 results = result['aggregations']['parent_agg']['buckets']
70 70 else:
71 71 return []
@@ -118,7 +118,7 b' class SlowCallService(BaseService):'
118 118 }
119 119 }
120 120 }
121 calls = Datastores.es.search(calls_query,
121 calls = Datastores.es.search(body=calls_query,
122 122 index=index_names,
123 123 doc_type='log',
124 124 size=0)
@@ -19,6 +19,9 b' import datetime'
19 19 import logging
20 20
21 21 import sqlalchemy as sa
22 import elasticsearch.exceptions
23 import elasticsearch.helpers
24
22 25 from collections import defaultdict
23 26 from pyramid.paster import setup_logging
24 27 from pyramid.paster import bootstrap
@@ -97,7 +100,7 b' def main():'
97 100 if v.get('fulltext_indexer'):
98 101 choices[k] = v['fulltext_indexer']
99 102 parser.add_argument('-t', '--types', nargs='*',
100 choices=['all'] + list(choices.keys()), default=['all'],
103 choices=['all'] + list(choices.keys()), default=[],
101 104 help='Which parts of database should get reindexed')
102 105 parser.add_argument('-c', '--config', required=True,
103 106 help='Configuration ini file of application')
@@ -107,6 +110,8 b' def main():'
107 110 if 'all' in args.types:
108 111 args.types = list(choices.keys())
109 112
113 print("Selected types to reindex: {}".format(args.types))
114
110 115 log.info('settings {}'.format(args.types))
111 116
112 117 if 'template' in args.types:
@@ -118,10 +123,9 b' def main():'
118 123
119 124 def update_template():
120 125 try:
121 Datastores.es.send_request("delete", ['_template', 'rcae'],
122 query_params={})
123 except Exception as e:
124 print(e)
126 Datastores.es.indices.delete_template('rcae')
127 except elasticsearch.exceptions.NotFoundError as e:
128 log.error(e)
125 129 log.info('updating elasticsearch template')
126 130 tag_templates = [
127 131 {"values": {
@@ -230,15 +234,14 b' def update_template():'
230 234 }
231 235 }
232 236
233 Datastores.es.send_request('PUT', ['_template', 'rcae'],
234 body=template_schema, query_params={})
237 Datastores.es.indices.put_template('rcae', body=template_schema)
235 238
236 239
237 240 def reindex_reports():
238 241 reports_groups_tables = detect_tables('reports_groups_p_')
239 242 try:
240 Datastores.es.delete_index('rcae_r*')
241 except Exception as e:
243 Datastores.es.indices.delete('rcae_r*')
244 except elasticsearch.exceptions.NotFoundError as e:
242 245 log.error(e)
243 246
244 247 log.info('reindexing report groups')
@@ -261,8 +264,9 b' def reindex_reports():'
261 264 name = partition_table.name
262 265 log.info('round {}, {}'.format(i, name))
263 266 for k, v in es_docs.items():
264 Datastores.es.bulk_index(k, 'report_group', v,
265 id_field="_id")
267 to_update = {'_index': k, '_type': 'report_group'}
268 [i.update(to_update) for i in v]
269 elasticsearch.helpers.bulk(Datastores.es, v)
266 270
267 271 log.info(
268 272 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -288,8 +292,9 b' def reindex_reports():'
288 292 name = partition_table.name
289 293 log.info('round {}, {}'.format(i, name))
290 294 for k, v in es_docs.items():
291 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
292 parent_field='_parent')
295 to_update = {'_index': k, '_type': 'report'}
296 [i.update(to_update) for i in v]
297 elasticsearch.helpers.bulk(Datastores.es, v)
293 298
294 299 log.info(
295 300 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -319,7 +324,9 b' def reindex_reports():'
319 324 name = partition_table.name
320 325 log.info('round {}, {}'.format(i, name))
321 326 for k, v in es_docs.items():
322 Datastores.es.bulk_index(k, 'log', v)
327 to_update = {'_index': k, '_type': 'log'}
328 [i.update(to_update) for i in v]
329 elasticsearch.helpers.bulk(Datastores.es, v)
323 330
324 331 log.info(
325 332 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -327,8 +334,8 b' def reindex_reports():'
327 334
328 335 def reindex_logs():
329 336 try:
330 Datastores.es.delete_index('rcae_l*')
331 except Exception as e:
337 Datastores.es.indices.delete('rcae_l*')
338 except elasticsearch.exceptions.NotFoundError as e:
332 339 log.error(e)
333 340
334 341 # logs
@@ -354,7 +361,9 b' def reindex_logs():'
354 361 name = partition_table.name
355 362 log.info('round {}, {}'.format(i, name))
356 363 for k, v in es_docs.items():
357 Datastores.es.bulk_index(k, 'log', v)
364 to_update = {'_index': k, '_type': 'log'}
365 [i.update(to_update) for i in v]
366 elasticsearch.helpers.bulk(Datastores.es, v)
358 367
359 368 log.info(
360 369 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -362,9 +371,9 b' def reindex_logs():'
362 371
363 372 def reindex_metrics():
364 373 try:
365 Datastores.es.delete_index('rcae_m*')
366 except Exception as e:
367 print(e)
374 Datastores.es.indices.delete('rcae_m*')
375 except elasticsearch.exceptions.NotFoundError as e:
376 log.error(e)
368 377
369 378 log.info('reindexing applications metrics')
370 379 i = 0
@@ -387,7 +396,9 b' def reindex_metrics():'
387 396 name = partition_table.name
388 397 log.info('round {}, {}'.format(i, name))
389 398 for k, v in es_docs.items():
390 Datastores.es.bulk_index(k, 'log', v)
399 to_update = {'_index': k, '_type': 'log'}
400 [i.update(to_update) for i in v]
401 elasticsearch.helpers.bulk(Datastores.es, v)
391 402
392 403 log.info(
393 404 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -395,9 +406,9 b' def reindex_metrics():'
395 406
396 407 def reindex_slow_calls():
397 408 try:
398 Datastores.es.delete_index('rcae_sc*')
399 except Exception as e:
400 print(e)
409 Datastores.es.indices.delete('rcae_sc*')
410 except elasticsearch.exceptions.NotFoundError as e:
411 log.error(e)
401 412
402 413 log.info('reindexing slow calls')
403 414 i = 0
@@ -420,7 +431,9 b' def reindex_slow_calls():'
420 431 name = partition_table.name
421 432 log.info('round {}, {}'.format(i, name))
422 433 for k, v in es_docs.items():
423 Datastores.es.bulk_index(k, 'log', v)
434 to_update = {'_index': k, '_type': 'log'}
435 [i.update(to_update) for i in v]
436 elasticsearch.helpers.bulk(Datastores.es, v)
424 437
425 438 log.info(
426 439 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
@@ -159,8 +159,7 b' def system(request):'
159 159
160 160 # es indices
161 161 es_indices = []
162 result = Datastores.es.send_request(
163 'GET', ['_stats', 'store, docs'], query_params={})
162 result = Datastores.es.indices.stats(metric=['store, docs'])
164 163 for ix, stats in result['indices'].items():
165 164 size = stats['primaries']['store']['size_in_bytes']
166 165 es_indices.append({'name': ix,
@@ -50,7 +50,7 b' def get_partition_stats():'
50 50 if not ix_time in holder:
51 51 holder[ix_time] = {'pg': [], 'elasticsearch': []}
52 52
53 for partition in list(Datastores.es.aliases().keys()):
53 for partition in list(Datastores.es.indices.get_alias('rcae*')):
54 54 if not partition.startswith('rcae'):
55 55 continue
56 56 split_data = partition.split('_')
@@ -128,7 +128,7 b' def partitions_remove(request):'
128 128 if form.validate():
129 129 for ix in form.data['es_index']:
130 130 log.warning('deleting ES partition: {}'.format(ix))
131 Datastores.es.delete_index(ix)
131 Datastores.es.indices.delete(ix)
132 132 for ix in form.data['pg_index']:
133 133 log.warning('deleting PG partition: {}'.format(ix))
134 134 stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
@@ -163,7 +163,7 b' def common_tags(request):'
163 163 # tags
164 164 index_names = es_index_name_limiter(
165 165 ixtypes=[config.get('datasource', 'logs')])
166 result = Datastores.es.search(query, index=index_names, doc_type='log',
166 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
167 167 size=0)
168 168 tag_buckets = result['aggregations']['sub_agg'].get('buckets', [])
169 169 # namespaces
@@ -175,7 +175,7 b' def common_tags(request):'
175 175 }
176 176 }
177 177 }
178 result = Datastores.es.search(query, index=index_names, doc_type='log',
178 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
179 179 size=0)
180 180 namespaces_buckets = result['aggregations']['sub_agg'].get('buckets', [])
181 181 return {
@@ -216,7 +216,7 b' def common_values(request):'
216 216 }
217 217 }
218 218 index_names = es_index_name_limiter(ixtypes=[datasource])
219 result = Datastores.es.search(query, index=index_names, doc_type='log',
219 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
220 220 size=0)
221 221 values_buckets = result['aggregations']['sub_agg'].get('buckets', [])
222 222 return {
General Comments 0
You need to be logged in to leave comments. Login now