tasks: change sampling rates to something better suited for large amounts of data
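For reviewers, a minimal standalone sketch of the new sampling behaviour follows. It copies the sample_boundries list and the pick_sample() logic from the diff below; the is_not_found flag is an assumption introduced here (standing in for the report_type == ReportType.not_found check) so the snippet runs without any AppEnlight imports.

import bisect

# New boundaries from this commit: every 100 up to 1k, every 1k up to 10k,
# then every 5k up to 100k.
sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, is_not_found=False):
    # Mirrors the task code: sampling gets sparser as a report group
    # accumulates occurrences, and 404-style reports are sampled more often.
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        divide = 10.0 if is_not_found else 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


if __name__ == '__main__':
    # Below 100 occurrences every report is stored; above that only every
    # N-th occurrence is kept, with N growing alongside the boundaries.
    for occurences in (50, 150, 2500, 20000, 20100):
        print(occurences, pick_sample(occurences))
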
@@ -1,662 +1,663 @@
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # AppEnlight Enterprise Edition, including its added features, Support
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
21
22 import bisect
23 import collections
24 import math
25 from datetime import datetime, timedelta
26
27 import sqlalchemy as sa
28 import pyelasticsearch
29
30 from celery.utils.log import get_task_logger
31 from zope.sqlalchemy import mark_changed
32 from pyramid.threadlocal import get_current_request, get_current_registry
33 from appenlight.celery import celery
34 from appenlight.models.report_group import ReportGroup
35 from appenlight.models import DBSession, Datastores
36 from appenlight.models.report import Report
37 from appenlight.models.log import Log
38 from appenlight.models.request_metric import Metric
39 from appenlight.models.event import Event
40
41 from appenlight.models.services.application import ApplicationService
42 from appenlight.models.services.event import EventService
43 from appenlight.models.services.log import LogService
44 from appenlight.models.services.report import ReportService
45 from appenlight.models.services.report_group import ReportGroupService
46 from appenlight.models.services.user import UserService
47 from appenlight.models.tag import Tag
48 from appenlight.lib import print_traceback
49 from appenlight.lib.utils import parse_proto, in_batches
50 from appenlight.lib.ext_json import json
51 from appenlight.lib.redis_keys import REDIS_KEYS
52 from appenlight.lib.enums import ReportType
53
54 log = get_task_logger(__name__)
55
- 56 sample_boundries = list(range(100, 10000, 100))
+ 56 sample_boundries = list(range(100, 1000, 100)) + \
+ 57 list(range(1000, 10000, 1000)) + \
+ 58 list(range(10000, 100000, 5000))
59
60
- 59 def pick_sample(total_occurences, report_type=1):
+ 61 def pick_sample(total_occurences, report_type=None):
62 every = 1.0
63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 if position > 0:
- 63 # 404
- 64 if report_type == 2:
+ 65 if report_type == ReportType.not_found:
66 divide = 10.0
67 else:
68 divide = 100.0
69 every = sample_boundries[position - 1] / divide
70 return total_occurences % every == 0
71
72
73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 def test_exception_task():
75 log.error('test celery log', extra={'location': 'celery'})
76 log.warning('test celery log', extra={'location': 'celery'})
77 raise Exception('Celery exception test')
78
79
80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 def test_retry_exception_task():
82 try:
83 import time
84
85 time.sleep(1.3)
86 log.error('test retry celery log', extra={'location': 'celery'})
87 log.warning('test retry celery log', extra={'location': 'celery'})
88 raise Exception('Celery exception test')
89 except Exception as exc:
90 test_retry_exception_task.retry(exc=exc)
91
92
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=999)
94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
95 proto_version = parse_proto(params.get('protocol_version', ''))
96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 try:
98 # we will store solr docs here for single insert
99 es_report_docs = {}
100 es_report_group_docs = {}
101 resource = ApplicationService.by_id(resource_id)
102 reports = []
103
104 if proto_version.major < 1 and proto_version.minor < 5:
105 for report_data in dataset:
106 report_details = report_data.get('report_details', [])
107 for i, detail_data in enumerate(report_details):
108 report_data.update(detail_data)
109 report_data.pop('report_details')
110 traceback = report_data.get('traceback')
111 if traceback is None:
112 report_data['traceback'] = report_data.get('frameinfo')
113 # for 0.3 api
114 error = report_data.pop('error_type', '')
115 if error:
116 report_data['error'] = error
117 if proto_version.minor < 4:
118 # convert "Unknown" slow reports to
119 # '' (from older clients)
120 if (report_data['error'] and
121 report_data['http_status'] < 500):
122 report_data['error'] = ''
123 message = report_data.get('message')
124 if 'extra' not in report_data:
125 report_data['extra'] = []
126 if message:
127 report_data['extra'] = [('message', message), ]
128 reports.append(report_data)
129 else:
130 reports = dataset
131
132 tags = []
133 es_slow_calls_docs = {}
134 es_reports_stats_rows = {}
135 for report_data in reports:
136 # build report details for later
137 added_details = 0
138 report = Report()
139 report.set_data(report_data, resource, proto_version)
140 report._skip_ft_index = True
141
142 report_group = ReportGroupService.by_hash_and_resource(
143 report.resource_id,
144 report.grouping_hash
145 )
146 occurences = report_data.get('occurences', 1)
147 if not report_group:
148 # total reports will be +1 moment later
149 report_group = ReportGroup(grouping_hash=report.grouping_hash,
150 occurences=0, total_reports=0,
151 last_report=0,
152 priority=report.priority,
153 error=report.error,
154 first_timestamp=report.start_time)
155 report_group._skip_ft_index = True
156 report_group.report_type = report.report_type
157 report.report_group_time = report_group.first_timestamp
158 add_sample = pick_sample(report_group.occurences,
159 report_type=report_group.report_type)
160 if add_sample:
161 resource.report_groups.append(report_group)
162 report_group.reports.append(report)
163 added_details += 1
164 DBSession.flush()
165 if report.partition_id not in es_report_docs:
166 es_report_docs[report.partition_id] = []
167 es_report_docs[report.partition_id].append(report.es_doc())
168 tags.extend(list(report.tags.items()))
169 slow_calls = report.add_slow_calls(report_data, report_group)
170 DBSession.flush()
171 for s_call in slow_calls:
172 if s_call.partition_id not in es_slow_calls_docs:
173 es_slow_calls_docs[s_call.partition_id] = []
174 es_slow_calls_docs[s_call.partition_id].append(
175 s_call.es_doc())
176 # try generating new stat rows if needed
177 else:
178 # required for postprocessing to not fail later
179 report.report_group = report_group
180
181 stat_row = ReportService.generate_stat_rows(
182 report, resource, report_group)
183 if stat_row.partition_id not in es_reports_stats_rows:
184 es_reports_stats_rows[stat_row.partition_id] = []
185 es_reports_stats_rows[stat_row.partition_id].append(
186 stat_row.es_doc())
187
188 # see if we should mark 10th occurence of report
189 last_occurences_10 = int(math.floor(report_group.occurences / 10))
190 curr_occurences_10 = int(math.floor(
191 (report_group.occurences + report.occurences) / 10))
192 last_occurences_100 = int(
193 math.floor(report_group.occurences / 100))
194 curr_occurences_100 = int(math.floor(
195 (report_group.occurences + report.occurences) / 100))
196 notify_occurences_10 = last_occurences_10 != curr_occurences_10
197 notify_occurences_100 = last_occurences_100 != curr_occurences_100
198 report_group.occurences = ReportGroup.occurences + occurences
199 report_group.last_timestamp = report.start_time
200 report_group.summed_duration = ReportGroup.summed_duration + report.duration
201 summed_duration = ReportGroup.summed_duration + report.duration
202 summed_occurences = ReportGroup.occurences + occurences
203 report_group.average_duration = summed_duration / summed_occurences
204 report_group.run_postprocessing(report)
205 if added_details:
206 report_group.total_reports = ReportGroup.total_reports + 1
207 report_group.last_report = report.id
208 report_group.set_notification_info(notify_10=notify_occurences_10,
209 notify_100=notify_occurences_100)
210 DBSession.flush()
211 report_group.get_report().notify_channel(report_group)
212 if report_group.partition_id not in es_report_group_docs:
213 es_report_group_docs[report_group.partition_id] = []
214 es_report_group_docs[report_group.partition_id].append(
215 report_group.es_doc())
216
217 action = 'REPORT'
218 log_msg = '%s: %s %s, client: %s, proto: %s' % (
219 action,
220 report_data.get('http_status', 'unknown'),
221 str(resource),
222 report_data.get('client'),
223 proto_version)
224 log.info(log_msg)
225 total_reports = len(dataset)
226 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
227 Datastores.redis.incr(key, total_reports)
228 Datastores.redis.expire(key, 3600 * 24)
229 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
230 resource_id, current_time)
231 Datastores.redis.incr(key, total_reports)
232 Datastores.redis.expire(key, 3600 * 24)
233
234 add_reports_es(es_report_group_docs, es_report_docs)
235 add_reports_slow_calls_es(es_slow_calls_docs)
236 add_reports_stats_rows_es(es_reports_stats_rows)
237 return True
238 except Exception as exc:
239 print_traceback(log)
240 add_reports.retry(exc=exc)
241
242
243 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
244 def add_reports_es(report_group_docs, report_docs):
245 for k, v in report_group_docs.items():
246 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
247 for k, v in report_docs.items():
248 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
249 parent_field='_parent')
250
251
252 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
253 def add_reports_slow_calls_es(es_docs):
254 for k, v in es_docs.items():
255 Datastores.es.bulk_index(k, 'log', v)
256
257
258 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
259 def add_reports_stats_rows_es(es_docs):
260 for k, v in es_docs.items():
261 Datastores.es.bulk_index(k, 'log', v)
262
263
264 @celery.task(queue="logs", default_retry_delay=600, max_retries=999)
265 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
266 proto_version = request.get('protocol_version')
267 current_time = datetime.utcnow().replace(second=0, microsecond=0)
268
269 try:
270 es_docs = collections.defaultdict(list)
271 application = ApplicationService.by_id(resource_id)
272 ns_pairs = []
273 for entry in dataset:
274 # gather pk and ns so we can remove older versions of row later
275 if entry['primary_key'] is not None:
276 ns_pairs.append({"pk": entry['primary_key'],
277 "ns": entry['namespace']})
278 log_entry = Log()
279 log_entry.set_data(entry, resource=application)
280 log_entry._skip_ft_index = True
281 application.logs.append(log_entry)
282 DBSession.flush()
283 # insert non pk rows first
284 if entry['primary_key'] is None:
285 es_docs[log_entry.partition_id].append(log_entry.es_doc())
286
287 # 2nd pass to delete all log entries from db foe same pk/ns pair
288 if ns_pairs:
289 ids_to_delete = []
290 es_docs = collections.defaultdict(list)
291 es_docs_to_delete = collections.defaultdict(list)
292 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
293 list_of_pairs=ns_pairs)
294 log_dict = {}
295 for log_entry in found_pkey_logs:
296 log_key = (log_entry.primary_key, log_entry.namespace)
297 if log_key not in log_dict:
298 log_dict[log_key] = []
299 log_dict[log_key].append(log_entry)
300
301 for ns, entry_list in log_dict.items():
302 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
303 # newest row needs to be indexed in es
304 log_entry = entry_list[-1]
305 # delete everything from pg and ES, leave the last row in pg
306 for e in entry_list[:-1]:
307 ids_to_delete.append(e.log_id)
308 es_docs_to_delete[e.partition_id].append(e.delete_hash)
309
310 es_docs_to_delete[log_entry.partition_id].append(
311 log_entry.delete_hash)
312
313 es_docs[log_entry.partition_id].append(log_entry.es_doc())
314
315 if ids_to_delete:
316 query = DBSession.query(Log).filter(
317 Log.log_id.in_(ids_to_delete))
318 query.delete(synchronize_session=False)
319 if es_docs_to_delete:
320 # batch this to avoid problems with default ES bulk limits
321 for es_index in es_docs_to_delete.keys():
322 for batch in in_batches(es_docs_to_delete[es_index], 20):
323 query = {'terms': {'delete_hash': batch}}
324
325 try:
326 Datastores.es.delete_by_query(
327 es_index, 'log', query)
328 except pyelasticsearch.ElasticHttpNotFoundError as exc:
329 log.error(exc)
330
331 total_logs = len(dataset)
332
333 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
334 str(application),
335 total_logs,
336 proto_version)
337 log.info(log_msg)
338 # mark_changed(session)
339 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
340 Datastores.redis.incr(key, total_logs)
341 Datastores.redis.expire(key, 3600 * 24)
342 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
343 resource_id, current_time)
344 Datastores.redis.incr(key, total_logs)
345 Datastores.redis.expire(key, 3600 * 24)
346 add_logs_es(es_docs)
347 return True
348 except Exception as exc:
349 print_traceback(log)
350 add_logs.retry(exc=exc)
351
352
353 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
354 def add_logs_es(es_docs):
355 for k, v in es_docs.items():
356 Datastores.es.bulk_index(k, 'log', v)
357
358
359 @celery.task(queue="metrics", default_retry_delay=600, max_retries=999)
360 def add_metrics(resource_id, request, dataset, proto_version):
361 current_time = datetime.utcnow().replace(second=0, microsecond=0)
362 try:
363 application = ApplicationService.by_id_cached()(resource_id)
364 application = DBSession.merge(application, load=False)
365 es_docs = []
366 rows = []
367 for metric in dataset:
368 tags = dict(metric['tags'])
369 server_n = tags.get('server_name', metric['server_name']).lower()
370 tags['server_name'] = server_n or 'unknown'
371 new_metric = Metric(
372 timestamp=metric['timestamp'],
373 resource_id=application.resource_id,
374 namespace=metric['namespace'],
375 tags=tags)
376 rows.append(new_metric)
377 es_docs.append(new_metric.es_doc())
378 session = DBSession()
379 session.bulk_save_objects(rows)
380 session.flush()
381
382 action = 'METRICS'
383 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
384 action,
385 str(application),
386 len(dataset),
387 proto_version
388 )
389 log.info(metrics_msg)
390
391 mark_changed(session)
392 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
393 Datastores.redis.incr(key, len(rows))
394 Datastores.redis.expire(key, 3600 * 24)
395 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
396 resource_id, current_time)
397 Datastores.redis.incr(key, len(rows))
398 Datastores.redis.expire(key, 3600 * 24)
399 add_metrics_es(es_docs)
400 return True
401 except Exception as exc:
402 print_traceback(log)
403 add_metrics.retry(exc=exc)
404
405
406 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
407 def add_metrics_es(es_docs):
408 for doc in es_docs:
409 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
410 Datastores.es.index(partition, 'log', doc)
411
412
413 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
414 def check_user_report_notifications(resource_id):
415 since_when = datetime.utcnow()
416 try:
417 request = get_current_request()
418 application = ApplicationService.by_id(resource_id)
419 if not application:
420 return
421 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
422 ReportType.error, resource_id)
423 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
424 ReportType.slow, resource_id)
425 error_group_ids = Datastores.redis.smembers(error_key)
426 slow_group_ids = Datastores.redis.smembers(slow_key)
427 Datastores.redis.delete(error_key)
428 Datastores.redis.delete(slow_key)
429 err_gids = [int(g_id) for g_id in error_group_ids]
430 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
431 group_ids = err_gids + slow_gids
432 occurence_dict = {}
433 for g_id in group_ids:
434 key = REDIS_KEYS['counters']['report_group_occurences'].format(
435 g_id)
436 val = Datastores.redis.get(key)
437 Datastores.redis.delete(key)
438 if val:
439 occurence_dict[g_id] = int(val)
440 else:
441 occurence_dict[g_id] = 1
442 report_groups = ReportGroupService.by_ids(group_ids)
443 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
444
445 ApplicationService.check_for_groups_alert(
446 application, 'alert', report_groups=report_groups,
447 occurence_dict=occurence_dict)
448 users = set([p.user for p in application.users_for_perm('view')])
449 report_groups = report_groups.all()
450 for user in users:
451 UserService.report_notify(user, request, application,
452 report_groups=report_groups,
453 occurence_dict=occurence_dict)
454 for group in report_groups:
455 # marks report_groups as notified
456 if not group.notified:
457 group.notified = True
458 except Exception as exc:
459 print_traceback(log)
460 raise
461
462
463 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
464 def check_alerts(resource_id):
465 since_when = datetime.utcnow()
466 try:
467 request = get_current_request()
468 application = ApplicationService.by_id(resource_id)
469 if not application:
470 return
471 error_key = REDIS_KEYS[
472 'reports_to_notify_per_type_per_app_alerting'].format(
473 ReportType.error, resource_id)
474 slow_key = REDIS_KEYS[
475 'reports_to_notify_per_type_per_app_alerting'].format(
476 ReportType.slow, resource_id)
477 error_group_ids = Datastores.redis.smembers(error_key)
478 slow_group_ids = Datastores.redis.smembers(slow_key)
479 Datastores.redis.delete(error_key)
480 Datastores.redis.delete(slow_key)
481 err_gids = [int(g_id) for g_id in error_group_ids]
482 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
483 group_ids = err_gids + slow_gids
484 occurence_dict = {}
485 for g_id in group_ids:
486 key = REDIS_KEYS['counters'][
487 'report_group_occurences_alerting'].format(
488 g_id)
489 val = Datastores.redis.get(key)
490 Datastores.redis.delete(key)
491 if val:
492 occurence_dict[g_id] = int(val)
493 else:
494 occurence_dict[g_id] = 1
495 report_groups = ReportGroupService.by_ids(group_ids)
496 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
497
498 ApplicationService.check_for_groups_alert(
499 application, 'alert', report_groups=report_groups,
500 occurence_dict=occurence_dict, since_when=since_when)
501 except Exception as exc:
502 print_traceback(log)
503 raise
504
505
506 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
507 def close_alerts():
508 log.warning('Checking alerts')
509 since_when = datetime.utcnow()
510 try:
511 event_types = [Event.types['error_report_alert'],
512 Event.types['slow_report_alert'], ]
513 statuses = [Event.statuses['active']]
514 # get events older than 5 min
515 events = EventService.by_type_and_status(
516 event_types,
517 statuses,
518 older_than=(since_when - timedelta(minutes=5)))
519 for event in events:
520 # see if we can close them
521 event.validate_or_close(
522 since_when=(since_when - timedelta(minutes=1)))
523 except Exception as exc:
524 print_traceback(log)
525 raise
526
527
528 @celery.task(queue="default", default_retry_delay=600, max_retries=999)
529 def update_tag_counter(tag_name, tag_value, count):
530 try:
531 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
532 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
533 sa.types.TEXT))
534 query.update({'times_seen': Tag.times_seen + count,
535 'last_timestamp': datetime.utcnow()},
536 synchronize_session=False)
537 session = DBSession()
538 mark_changed(session)
539 return True
540 except Exception as exc:
541 print_traceback(log)
542 update_tag_counter.retry(exc=exc)
543
544
545 @celery.task(queue="default")
546 def update_tag_counters():
547 """
548 Sets task to update counters for application tags
549 """
550 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
551 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
552 c = collections.Counter(tags)
553 for t_json, count in c.items():
554 tag_info = json.loads(t_json)
555 update_tag_counter.delay(tag_info[0], tag_info[1], count)
556
557
558 @celery.task(queue="default")
559 def daily_digest():
560 """
561 Sends daily digest with top 50 error reports
562 """
563 request = get_current_request()
564 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
565 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
566 since_when = datetime.utcnow() - timedelta(hours=8)
567 log.warning('Generating daily digests')
568 for resource_id in apps:
569 resource_id = resource_id.decode('utf8')
570 end_date = datetime.utcnow().replace(microsecond=0, second=0)
571 filter_settings = {'resource': [resource_id],
572 'tags': [{'name': 'type',
573 'value': ['error'], 'op': None}],
574 'type': 'error', 'start_date': since_when,
575 'end_date': end_date}
576
577 reports = ReportGroupService.get_trending(
578 request, filter_settings=filter_settings, limit=50)
579
580 application = ApplicationService.by_id(resource_id)
581 if application:
582 users = set([p.user for p in application.users_for_perm('view')])
583 for user in users:
584 user.send_digest(request, application, reports=reports,
585 since_when=since_when)
586
587
588 @celery.task(queue="default")
589 def notifications_reports():
590 """
591 Loop that checks redis for info and then issues new tasks to celery to
592 issue notifications
593 """
594 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
595 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
596 for app in apps:
597 log.warning('Notify for app: %s' % app)
598 check_user_report_notifications.delay(app.decode('utf8'))
599
600 @celery.task(queue="default")
601 def alerting_reports():
602 """
603 Loop that checks redis for info and then issues new tasks to celery to
604 perform the following:
605 - which applications should have new alerts opened
606 """
607
608 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
609 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
610 for app in apps:
611 log.warning('Notify for app: %s' % app)
612 check_alerts.delay(app.decode('utf8'))
613
614
615 @celery.task(queue="default", soft_time_limit=3600 * 4,
616 hard_time_limit=3600 * 4, max_retries=999)
617 def logs_cleanup(resource_id, filter_settings):
618 request = get_current_request()
619 request.tm.begin()
620 es_query = {
621 "_source": False,
622 "size": 5000,
623 "query": {
624 "filtered": {
625 "filter": {
626 "and": [{"term": {"resource_id": resource_id}}]
627 }
628 }
629 }
630 }
631
632 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
633 if filter_settings['namespace']:
634 query = query.filter(Log.namespace == filter_settings['namespace'][0])
635 es_query['query']['filtered']['filter']['and'].append(
636 {"term": {"namespace": filter_settings['namespace'][0]}}
637 )
638 query.delete(synchronize_session=False)
639 request.tm.commit()
640 result = request.es_conn.search(es_query, index='rcae_l_*',
641 doc_type='log', es_scroll='1m',
642 es_search_type='scan')
643 scroll_id = result['_scroll_id']
644 while True:
645 log.warning('log_cleanup, app:{} ns:{} batch'.format(
646 resource_id,
647 filter_settings['namespace']
648 ))
649 es_docs_to_delete = []
650 result = request.es_conn.send_request(
651 'POST', ['_search', 'scroll'],
652 body=scroll_id, query_params={"scroll": '1m'})
653 scroll_id = result['_scroll_id']
654 if not result['hits']['hits']:
655 break
656 for doc in result['hits']['hits']:
657 es_docs_to_delete.append({"id": doc['_id'],
658 "index": doc['_index']})
659
660 for batch in in_batches(es_docs_to_delete, 10):
661 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
662 **to_del)
663 for to_del in batch])