elasticsearch: make compatible with elasticsearch 5.6
ergo
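The only functional change in this commit is inside the index template built by update_template(): the translog block of the index settings now uses "sync_interval" instead of "interval", matching the setting name Elasticsearch 5.x expects (index.translog.sync_interval). The changed pair is marked with -/+ in the listing below; the affected fragment of template_schema ends up as:

    "translog": {"sync_interval": "5s",
                 "durability": "async"}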
@@ -1,430 +1,430 @@
# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import logging

import sqlalchemy as sa
from collections import defaultdict
from pyramid.paster import setup_logging
from pyramid.paster import bootstrap
from appenlight.models import (
    DBSession,
    Datastores,
    metadata
)
from appenlight.lib import get_callable
from appenlight.models.report_group import ReportGroup
from appenlight.models.report import Report
from appenlight.models.report_stat import ReportStat
from appenlight.models.log import Log
from appenlight.models.slow_call import SlowCall
from appenlight.models.metric import Metric


log = logging.getLogger(__name__)

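# Partition table name prefixes handled by the reindex functions below.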
tables = {
    'slow_calls_p_': [],
    'reports_stats_p_': [],
    'reports_p_': [],
    'reports_groups_p_': [],
    'logs_p_': [],
    'metrics_p_': [],
}

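# Reflect every PostgreSQL table whose name starts with the given partition
# prefix, so its rows can be streamed out during reindexing.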
def detect_tables(table_prefix):
    found_tables = []
    db_tables_query = '''
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;'''

    for table in DBSession.execute(db_tables_query).fetchall():
        tablename = table.tablename
        if tablename.startswith(table_prefix):
            t = sa.Table(tablename, metadata, autoload=True,
                         autoload_with=DBSession.bind.engine)
            found_tables.append(t)
    return found_tables


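# Entry point: bootstrap the Pyramid app from the ini file, then run the
# selected reindex callables (plugins may register extra indexers).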
def main():
    """
    Recreates Elasticsearch indexes
    Performs reindex of whole db to Elasticsearch

    """

    # need parser twice because we first need to load ini file
    # bootstrap pyramid and then load plugins
    pre_parser = argparse.ArgumentParser(
        description='Reindex AppEnlight data',
        add_help=False)
    pre_parser.add_argument('-c', '--config', required=True,
                            help='Configuration ini file of application')
    pre_parser.add_argument('-h', '--help', help='Show help', nargs='?')
    pre_parser.add_argument('-t', '--types', nargs='+',
                            help='Which parts of database should get reindexed')
    args = pre_parser.parse_args()

    config_uri = args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)
    parser = argparse.ArgumentParser(description='Reindex AppEnlight data')
    choices = {
        'reports': 'appenlight.scripts.reindex_elasticsearch:reindex_reports',
        'logs': 'appenlight.scripts.reindex_elasticsearch:reindex_logs',
        'metrics': 'appenlight.scripts.reindex_elasticsearch:reindex_metrics',
        'slow_calls': 'appenlight.scripts.reindex_elasticsearch:reindex_slow_calls',
        'template': 'appenlight.scripts.reindex_elasticsearch:update_template'
    }
    for k, v in env['registry'].appenlight_plugins.items():
        if v.get('fulltext_indexer'):
            choices[k] = v['fulltext_indexer']
    parser.add_argument('-t', '--types', nargs='*',
                        choices=['all'] + list(choices.keys()), default=['all'],
                        help='Which parts of database should get reindexed')
    parser.add_argument('-c', '--config', required=True,
                        help='Configuration ini file of application')
    args = parser.parse_args()


    if 'all' in args.types:
        args.types = list(choices.keys())

    log.info('settings {}'.format(args.types))

    if 'template' in args.types:
        get_callable(choices['template'])()
        args.types.remove('template')
    for selected in args.types:
        get_callable(choices[selected])()


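# Recreate the 'rcae' index template that all rcae_* indices inherit their
# settings, analyzers and mappings from.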
def update_template():
    try:
        Datastores.es.send_request("delete", ['_template', 'rcae'],
                                   query_params={})
    except Exception as e:
        print(e)
    log.info('updating elasticsearch template')
    tag_templates = [
        {"values": {
            "path_match": "tags.*",
            "mapping": {
                "type": "object",
                "properties": {
                    "values": {"type": "string", "analyzer": "tag_value"},
                    "numeric_values": {"type": "float"}
                }
            }
        }}
    ]

    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
-                "translog": {"interval": "5s",
+                "translog": {"sync_interval": "5s",
                             "durability": "async"}
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": []
                    },
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"]
                    },
                }
            },
        },
        "mappings": {
            "report_group": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"}
                }
            },
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "group_id": {"type": "string"},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "string", "index": "not_analyzed"},
                    "url_domain": {"type": "string", "analyzer": "simple"},
                    "url_path": {"type": "string", "analyzer": "url_path"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                    "extra": {
                        "type": "object"
                    },
                },
                "_parent": {"type": "report_group"}
            },
            "log": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "delete_hash": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "log_level": {"type": "string", "analyzer": "simple"},
                    "message": {"type": "string", "analyzer": "simple"},
                    "namespace": {"type": "string", "index": "not_analyzed"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"}
                }
            }
        }
    }

    Datastores.es.send_request('PUT', ['_template', 'rcae'],
                               body=template_schema, query_params={})


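# Drop the rcae_r* indices and rebuild them from the report group, report and
# report stats partition tables, bulk-indexing 2000 rows at a time.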
def reindex_reports():
    reports_groups_tables = detect_tables('reports_groups_p_')
    try:
        Datastores.es.delete_index('rcae_r*')
    except Exception as e:
        log.error(e)

    log.info('reindexing report groups')
    i = 0
    task_start = datetime.datetime.now()
    for partition_table in reports_groups_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = ReportGroup(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'report_group', v,
                                             id_field="_id")

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    i = 0
    log.info('reindexing reports')
    task_start = datetime.datetime.now()
    reports_tables = detect_tables('reports_p_')
    for partition_table in reports_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Report(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                             parent_field='_parent')

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    log.info('reindexing reports stats')
    i = 0
    task_start = datetime.datetime.now()
    reports_stats_tables = detect_tables('reports_stats_p_')
    for partition_table in reports_stats_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                rd = dict(list(row.items()))
                # remove legacy columns
                # TODO: remove the column later
                rd.pop('size', None)
                item = ReportStat(**rd)
                i += 1
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


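# Drop the rcae_l* indices and rebuild them from the logs partition tables.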
def reindex_logs():
    try:
        Datastores.es.delete_index('rcae_l*')
    except Exception as e:
        log.error(e)

    # logs
    log.info('reindexing logs')
    i = 0
    task_start = datetime.datetime.now()
    log_tables = detect_tables('logs_p_')
    for partition_table in log_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)

            for row in chunk:
                i += 1
                item = Log(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


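# Drop the rcae_m* indices and rebuild them from the metrics partition tables.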
def reindex_metrics():
    try:
        Datastores.es.delete_index('rcae_m*')
    except Exception as e:
        print(e)

    log.info('reindexing applications metrics')
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables('metrics_p_')
    for partition_table in metric_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


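# Drop the rcae_sc* indices and rebuild them from the slow calls partition
# tables.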
def reindex_slow_calls():
    try:
        Datastores.es.delete_index('rcae_sc*')
    except Exception as e:
        print(e)

    log.info('reindexing slow calls')
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables('slow_calls_p_')
    for partition_table in slow_calls_tables:
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))


if __name__ == '__main__':
    main()
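For reference, a minimal sketch of how the reindexer might be invoked once the appenlight package and its dependencies are installed; "production.ini" is a placeholder for the application's configuration file:

    python -m appenlight.scripts.reindex_elasticsearch -c production.ini
    python -m appenlight.scripts.reindex_elasticsearch -c production.ini -t template reports logs

With -t omitted, the default of 'all' expands to every registered indexer, including any plugin-provided fulltext indexers.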
General Comments 2
Under Review (author): Auto status change to "Under Review"

Rejected: Please use: https://github.com/Appenlight/appenlight to contribute :) Thanks !
