Auto status change to "Under Review"
Show More
@@ -1,430 +1,430 b'' | |||||
1 | # -*- coding: utf-8 -*- |
|
1 | # -*- coding: utf-8 -*- | |
2 |
|
2 | |||
3 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors |
|
3 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |
4 | # |
|
4 | # | |
5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | # you may not use this file except in compliance with the License. |
|
6 | # you may not use this file except in compliance with the License. | |
7 | # You may obtain a copy of the License at |
|
7 | # You may obtain a copy of the License at | |
8 | # |
|
8 | # | |
9 | # http://www.apache.org/licenses/LICENSE-2.0 |
|
9 | # http://www.apache.org/licenses/LICENSE-2.0 | |
10 | # |
|
10 | # | |
11 | # Unless required by applicable law or agreed to in writing, software |
|
11 | # Unless required by applicable law or agreed to in writing, software | |
12 | # distributed under the License is distributed on an "AS IS" BASIS, |
|
12 | # distributed under the License is distributed on an "AS IS" BASIS, | |
13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | # See the License for the specific language governing permissions and |
|
14 | # See the License for the specific language governing permissions and | |
15 | # limitations under the License. |
|
15 | # limitations under the License. | |
16 |
|
16 | |||
17 | import argparse |
|
17 | import argparse | |
18 | import datetime |
|
18 | import datetime | |
19 | import logging |
|
19 | import logging | |
20 |
|
20 | |||
21 | import sqlalchemy as sa |
|
21 | import sqlalchemy as sa | |
22 | from collections import defaultdict |
|
22 | from collections import defaultdict | |
23 | from pyramid.paster import setup_logging |
|
23 | from pyramid.paster import setup_logging | |
24 | from pyramid.paster import bootstrap |
|
24 | from pyramid.paster import bootstrap | |
25 | from appenlight.models import ( |
|
25 | from appenlight.models import ( | |
26 | DBSession, |
|
26 | DBSession, | |
27 | Datastores, |
|
27 | Datastores, | |
28 | metadata |
|
28 | metadata | |
29 | ) |
|
29 | ) | |
30 | from appenlight.lib import get_callable |
|
30 | from appenlight.lib import get_callable | |
31 | from appenlight.models.report_group import ReportGroup |
|
31 | from appenlight.models.report_group import ReportGroup | |
32 | from appenlight.models.report import Report |
|
32 | from appenlight.models.report import Report | |
33 | from appenlight.models.report_stat import ReportStat |
|
33 | from appenlight.models.report_stat import ReportStat | |
34 | from appenlight.models.log import Log |
|
34 | from appenlight.models.log import Log | |
35 | from appenlight.models.slow_call import SlowCall |
|
35 | from appenlight.models.slow_call import SlowCall | |
36 | from appenlight.models.metric import Metric |
|
36 | from appenlight.models.metric import Metric | |
37 |
|
37 | |||
38 |
|
38 | |||
# Module-level logger for the reindex script.
log = logging.getLogger(__name__)

# Partition-table name prefixes this script knows about. The empty lists are
# never populated or read within this module (detect_tables() queries
# pg_tables directly) — presumably kept for importers elsewhere; verify
# before removing.
tables = {prefix: [] for prefix in (
    'slow_calls_p_',
    'reports_stats_p_',
    'reports_p_',
    'reports_groups_p_',
    'logs_p_',
    'metrics_p_',
)}
49 |
|
49 | |||
def detect_tables(table_prefix):
    """Reflect and return all partition tables whose names start with
    *table_prefix*.

    Queries pg_tables (skipping PostgreSQL system tables) and autoloads a
    SQLAlchemy ``Table`` for every matching name, in alphabetical order.
    """
    db_tables_query = '''
    SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
    tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;'''

    names = [row.tablename
             for row in DBSession.execute(db_tables_query).fetchall()]
    return [
        sa.Table(name, metadata, autoload=True,
                 autoload_with=DBSession.bind.engine)
        for name in names if name.startswith(table_prefix)
    ]
63 |
|
63 | |||
64 |
|
64 | |||
def main():
    """Recreate Elasticsearch indexes and reindex the whole DB into them.

    The command line is parsed twice: a minimal parser extracts --config so
    the pyramid application (and with it any plugins contributing indexers)
    can be bootstrapped, then the full parser is built with the complete set
    of --types choices.
    """
    # First pass: only --config matters here; plugins are not loaded yet, so
    # the real --types choices cannot be validated.
    bootstrap_parser = argparse.ArgumentParser(
        description='Reindex AppEnlight data',
        add_help=False)
    bootstrap_parser.add_argument('-c', '--config', required=True,
                                  help='Configuration ini file of application')
    bootstrap_parser.add_argument('-h', '--help', help='Show help', nargs='?')
    bootstrap_parser.add_argument('-t', '--types', nargs='+',
                                  help='Which parts of database should get reindexed')
    initial_args = bootstrap_parser.parse_args()

    config_uri = initial_args.config
    setup_logging(config_uri)
    log.setLevel(logging.INFO)
    env = bootstrap(config_uri)

    # Built-in indexers plus any fulltext indexers registered by plugins.
    choices = {
        'reports': 'appenlight.scripts.reindex_elasticsearch:reindex_reports',
        'logs': 'appenlight.scripts.reindex_elasticsearch:reindex_logs',
        'metrics': 'appenlight.scripts.reindex_elasticsearch:reindex_metrics',
        'slow_calls': 'appenlight.scripts.reindex_elasticsearch:reindex_slow_calls',
        'template': 'appenlight.scripts.reindex_elasticsearch:update_template',
    }
    for plugin_name, plugin_conf in env['registry'].appenlight_plugins.items():
        if plugin_conf.get('fulltext_indexer'):
            choices[plugin_name] = plugin_conf['fulltext_indexer']

    # Second pass: full parser with validated choices.
    parser = argparse.ArgumentParser(description='Reindex AppEnlight data')
    parser.add_argument('-t', '--types', nargs='*',
                        choices=['all'] + list(choices.keys()), default=['all'],
                        help='Which parts of database should get reindexed')
    parser.add_argument('-c', '--config', required=True,
                        help='Configuration ini file of application')
    args = parser.parse_args()

    if 'all' in args.types:
        args.types = list(choices.keys())

    log.info('settings {}'.format(args.types))

    # Push the template first so freshly created indices pick it up.
    if 'template' in args.types:
        get_callable(choices['template'])()
        args.types.remove('template')
    for selected in args.types:
        get_callable(choices[selected])()
117 |
|
117 | |||
118 |
|
118 | |||
def update_template():
    """Replace the 'rcae' Elasticsearch index template.

    Deletes any existing template (best effort — it may not exist yet) and
    PUTs a fresh one matching every ``rcae_*`` index: custom analyzers for
    url paths and tag values plus mappings for the report_group / report /
    log document types. Only indices created afterwards are affected.
    """
    try:
        Datastores.es.send_request("delete", ['_template', 'rcae'],
                                   query_params={})
    except Exception as e:
        # Fix: report through the module logger like the reindex_* helpers
        # do, instead of print() — keeps failures in the configured log.
        log.error(e)
    log.info('updating elasticsearch template')
    # Dynamic template: every tags.* sub-object gets a keyword-tokenized,
    # lowercased string field plus a float companion for numeric values.
    tag_templates = [
        {"values": {
            "path_match": "tags.*",
            "mapping": {
                "type": "object",
                "properties": {
                    "values": {"type": "string", "analyzer": "tag_value"},
                    "numeric_values": {"type": "float"}
                }
            }
        }}
    ]

    template_schema = {
        "template": "rcae_*",
        "settings": {
            "index": {
                "refresh_interval": "5s",
                "translog": {"sync_interval": "5s",
                             "durability": "async"}
            },
            "number_of_shards": 5,
            "analysis": {
                "analyzer": {
                    # hierarchical tokenizer so /a/b/c matches prefixes
                    "url_path": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "path_hierarchy",
                        "filter": []
                    },
                    # whole tag treated as one lowercased token
                    "tag_value": {
                        "type": "custom",
                        "char_filter": [],
                        "tokenizer": "keyword",
                        "filter": ["lowercase"]
                    },
                }
            },
        },
        "mappings": {
            "report_group": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "priority": {"type": "integer"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "read": {"type": "boolean"},
                    "occurences": {"type": "integer"},
                    "fixed": {"type": "boolean"},
                    "first_timestamp": {"type": "date"},
                    "last_timestamp": {"type": "date"},
                    "average_duration": {"type": "float"},
                    "summed_duration": {"type": "float"},
                    "public": {"type": "boolean"}
                }
            },
            "report": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "group_id": {"type": "string"},
                    "http_status": {"type": "integer"},
                    "ip": {"type": "string", "index": "not_analyzed"},
                    "url_domain": {"type": "string", "analyzer": "simple"},
                    "url_path": {"type": "string", "analyzer": "url_path"},
                    "error": {"type": "string", "analyzer": "simple"},
                    "report_type": {"type": "integer"},
                    "start_time": {"type": "date"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "end_time": {"type": "date"},
                    "duration": {"type": "float"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"},
                    "extra": {
                        "type": "object"
                    },
                },
                # each report document is a child of its report_group
                "_parent": {"type": "report_group"}
            },
            "log": {
                "_all": {"enabled": False},
                "dynamic_templates": tag_templates,
                "properties": {
                    "pg_id": {"type": "string", "index": "not_analyzed"},
                    "delete_hash": {"type": "string", "index": "not_analyzed"},
                    "resource_id": {"type": "integer"},
                    "timestamp": {"type": "date"},
                    "permanent": {"type": "boolean"},
                    "request_id": {"type": "string", "index": "not_analyzed"},
                    "log_level": {"type": "string", "analyzer": "simple"},
                    "message": {"type": "string", "analyzer": "simple"},
                    "namespace": {"type": "string", "index": "not_analyzed"},
                    "tags": {
                        "type": "object"
                    },
                    "tag_list": {"type": "string", "analyzer": "tag_value"}
                }
            }
        }
    }

    Datastores.es.send_request('PUT', ['_template', 'rcae'],
                               body=template_schema, query_params={})
235 |
|
235 | |||
236 |
|
236 | |||
def _reindex_partitions(table_prefix, doc_type, make_item, **bulk_kwargs):
    """Stream every row of the partition tables matching *table_prefix* and
    bulk-index the resulting ES documents under *doc_type*.

    *make_item* turns a row dict into a model instance exposing
    ``partition_id`` (used as the target index name) and ``es_doc()``.
    Extra keyword args (``id_field``, ``parent_field``) are forwarded to
    ``bulk_index``. Returns the number of rows processed.
    """
    count = 0
    for partition_table in detect_tables(table_prefix):
        # stream results so huge partitions are not materialized in memory
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                count += 1
                item = make_item(dict(list(row.items())))
                es_docs[item.partition_id].append(item.es_doc())
            if es_docs:
                log.info('round {}, {}'.format(count, partition_table.name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, doc_type, v, **bulk_kwargs)
    return count


def _report_stat_from_row(rd):
    """Build a ReportStat from a row dict, stripping legacy columns."""
    # remove legacy columns
    # TODO: remove the column later
    rd.pop('size', None)
    return ReportStat(**rd)


def reindex_reports():
    """Recreate the rcae_r* indices from report groups, reports and stats.

    Refactored: the three previously copy-pasted stream/chunk/bulk-index
    loops now share _reindex_partitions().
    """
    try:
        Datastores.es.delete_index('rcae_r*')
    except Exception as e:
        # index may simply not exist yet
        log.error(e)

    log.info('reindexing report groups')
    task_start = datetime.datetime.now()
    i = _reindex_partitions('reports_groups_p_', 'report_group',
                            lambda rd: ReportGroup(**rd), id_field="_id")
    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    log.info('reindexing reports')
    task_start = datetime.datetime.now()
    i = _reindex_partitions('reports_p_', 'report',
                            lambda rd: Report(**rd), id_field="_id",
                            parent_field='_parent')
    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))

    log.info('reindexing reports stats')
    task_start = datetime.datetime.now()
    # NOTE(review): stats were historically indexed under the 'log' doc
    # type — preserved as-is; confirm before changing the type name.
    i = _reindex_partitions('reports_stats_p_', 'log', _report_stat_from_row)
    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
326 |
|
326 | |||
327 |
|
327 | |||
def reindex_logs():
    """Wipe the rcae_l* indices and re-feed every logs_p_* partition to ES."""
    try:
        Datastores.es.delete_index('rcae_l*')
    except Exception as e:
        # index may simply not exist yet
        log.error(e)

    log.info('reindexing logs')
    total = 0
    task_start = datetime.datetime.now()
    for partition_table in detect_tables('logs_p_'):
        # stream results so huge partitions are not materialized in memory
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            rows = result.fetchmany(2000)
            if not rows:
                break
            es_docs = defaultdict(list)

            for row in rows:
                total += 1
                entry = Log(**dict(list(row.items())))
                # partition_id doubles as the target ES index name
                es_docs[entry.partition_id].append(entry.es_doc())
            if not es_docs:
                continue
            log.info('round {}, {}'.format(total, partition_table.name))
            for index_name, docs in es_docs.items():
                Datastores.es.bulk_index(index_name, 'log', docs)

    log.info(
        'total docs {} {}'.format(total, datetime.datetime.now() - task_start))
361 |
|
361 | |||
362 |
|
362 | |||
def reindex_metrics():
    """Wipe the rcae_m* indices and re-feed every metrics_p_* partition to ES."""
    try:
        Datastores.es.delete_index('rcae_m*')
    except Exception as e:
        # Fix: use the module logger instead of print() so the failure is
        # recorded consistently with reindex_reports()/reindex_logs().
        log.error(e)

    log.info('reindexing applications metrics')
    i = 0
    task_start = datetime.datetime.now()
    metric_tables = detect_tables('metrics_p_')
    for partition_table in metric_tables:
        # stream results so huge partitions are not materialized in memory
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = Metric(**dict(list(row.items())))
                # partition_id doubles as the target ES index name
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
394 |
|
394 | |||
395 |
|
395 | |||
def reindex_slow_calls():
    """Wipe the rcae_sc* indices and re-feed every slow_calls_p_* partition to ES."""
    try:
        Datastores.es.delete_index('rcae_sc*')
    except Exception as e:
        # Fix: use the module logger instead of print() so the failure is
        # recorded consistently with reindex_reports()/reindex_logs().
        log.error(e)

    log.info('reindexing slow calls')
    i = 0
    task_start = datetime.datetime.now()
    slow_calls_tables = detect_tables('slow_calls_p_')
    for partition_table in slow_calls_tables:
        # stream results so huge partitions are not materialized in memory
        conn = DBSession.connection().execution_options(stream_results=True)
        result = conn.execute(partition_table.select())
        while True:
            chunk = result.fetchmany(2000)
            if not chunk:
                break
            es_docs = defaultdict(list)
            for row in chunk:
                i += 1
                item = SlowCall(**dict(list(row.items())))
                # partition_id doubles as the target ES index name
                d_range = item.partition_id
                es_docs[d_range].append(item.es_doc())
            if es_docs:
                name = partition_table.name
                log.info('round {}, {}'.format(i, name))
                for k, v in es_docs.items():
                    Datastores.es.bulk_index(k, 'log', v)

    log.info(
        'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
427 |
|
427 | |||
428 |
|
428 | |||
if __name__ == '__main__':
    # allow running the reindex directly as a script
    main()
General Comments 2
Please use: https://github.com/Appenlight/appenlight to contribute :) Thanks !
You need to be logged in to leave comments.
Login now