tasks: when CELERY_EAGER_PROPAGATES_EXCEPTIONS is true do not retry tasks
ergo
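The changeset wraps every `retry()` call in the same guard: if the task body raises and `CELERY_EAGER_PROPAGATES_EXCEPTIONS` is set (as it typically is when tasks run eagerly, e.g. in the test suite), the original exception is re-raised instead of scheduling a retry, so failures surface immediately rather than looping through the retry schedule. A minimal sketch of the pattern being added; `some_task` and `do_work` are placeholders, not names from this file:

@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def some_task():
    try:
        do_work()  # placeholder for the real task body
    except Exception as exc:
        # with eager propagation enabled, fail fast instead of retrying
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        some_task.retry(exc=exc)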
@@ -1,662 +1,672 b''
# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import bisect
import collections
import math
from datetime import datetime, timedelta

import sqlalchemy as sa
import pyelasticsearch

from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from ziggurat_foundations.models.services.resource import ResourceService

from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event

from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, report_type=None):
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    log.error('test celery log', extra={'location': 'celery'})
    log.warning('test celery log', extra={'location': 'celery'})
    raise Exception('Celery exception test')


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error('test retry celery log', extra={'location': 'celery'})
        log.warning('test retry celery log', extra={'location': 'celery'})
        raise Exception('Celery exception test')
    except Exception as exc:
+        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
+            raise
        test_retry_exception_task.retry(exc=exc)


@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, request_params, dataset, **kwargs):
    proto_version = parse_proto(request_params.get('protocol_version', ''))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store solr docs here for single insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)

        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            # find latest group in this months partition
            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash,
                since_when=datetime.utcnow().date().replace(day=1)
            )
            occurences = report_data.get('occurences', 1)
            if not report_group:
                # total reports will be +1 moment later
                report_group = ReportGroup(grouping_hash=report.grouping_hash,
                                           occurences=0, total_reports=0,
                                           last_report=0,
                                           priority=report.priority,
                                           error=report.error,
                                           first_timestamp=report.start_time)
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(report_group.occurences,
                                     report_type=report_group.report_type)
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(
                        s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(
                report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(
                stat_row.es_doc())

            # see if we should mark 10th occurence of report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(math.floor(
                (report_group.occurences + report.occurences) / 10))
            last_occurences_100 = int(
                math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(math.floor(
                (report_group.occurences + report.occurences) / 100))
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(notify_10=notify_occurences_10,
                                               notify_100=notify_occurences_100)
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc())

            action = 'REPORT'
            log_msg = '%s: %s %s, client: %s, proto: %s' % (
                action,
                report_data.get('http_status', 'unknown'),
                str(resource),
                report_data.get('client'),
                proto_version)
            log.info(log_msg)
        total_reports = len(dataset)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
            resource.owner_user_id, current_time)
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
+        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
+            raise
        add_reports.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
    for k, v in report_group_docs.items():
        Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
    for k, v in report_docs.items():
        Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                 parent_field='_parent')


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request_params, dataset, **kwargs):
    proto_version = request_params.get('protocol_version')
    current_time = datetime.utcnow().replace(second=0, microsecond=0)

    try:
        es_docs = collections.defaultdict(list)
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of row later
            if entry['primary_key'] is not None:
                ns_pairs.append({"pk": entry['primary_key'],
                                 "ns": entry['namespace']})
            log_entry = Log()
            log_entry.set_data(entry, resource=resource)
            log_entry._skip_ft_index = True
            resource.logs.append(log_entry)
            DBSession.flush()
            # insert non pk rows first
            if entry['primary_key'] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db foe same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs)
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)

            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)

                es_docs_to_delete[log_entry.partition_id].append(
                    log_entry.delete_hash)

                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(
                    Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {'terms': {'delete_hash': batch}}

                        try:
                            Datastores.es.delete_by_query(
                                es_index, 'log', query)
                        except pyelasticsearch.ElasticHttpNotFoundError as exc:
                            msg = 'skipping index {}'.format(es_index)
                            log.info(msg)

        total_logs = len(dataset)

        log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
            str(resource),
            total_logs,
            proto_version)
        log.info(log_msg)
        # mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
            resource.owner_user_id, current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
+        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
+            raise
        add_logs.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request_params, dataset, proto_version):
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        resource = ApplicationService.by_id_cached()(resource_id)
        resource = DBSession.merge(resource, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric['tags'])
            server_n = tags.get('server_name', metric['server_name']).lower()
            tags['server_name'] = server_n or 'unknown'
            new_metric = Metric(
                timestamp=metric['timestamp'],
                resource_id=resource.resource_id,
                namespace=metric['namespace'],
                tags=tags)
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = 'METRICS'
        metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
            action,
            str(resource),
            len(dataset),
            proto_version
        )
        log.info(metrics_msg)

        mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
            resource.owner_user_id, current_time)
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600)
        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
+        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
+            raise
        add_metrics.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
    for doc in es_docs:
        partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
        Datastores.es.index(partition, 'log', doc)


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters']['report_group_occurences'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict)
        users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(user, request, application,
                                      report_groups=report_groups,
                                      occurence_dict=occurence_dict)
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters'][
                'report_group_occurences_alerting'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict, since_when=since_when)
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
    log.warning('Checking alerts')
    since_when = datetime.utcnow()
    try:
        event_types = [Event.types['error_report_alert'],
                       Event.types['slow_report_alert'], ]
        statuses = [Event.statuses['active']]
        # get events older than 5 min
        events = EventService.by_type_and_status(
            event_types,
            statuses,
            older_than=(since_when - timedelta(minutes=5)))
        for event in events:
            # see if we can close them
            event.validate_or_close(
                since_when=(since_when - timedelta(minutes=1)))
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
    try:
        query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
            sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
                                                         sa.types.TEXT))
        query.update({'times_seen': Tag.times_seen + count,
                      'last_timestamp': datetime.utcnow()},
                     synchronize_session=False)
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
+        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
+            raise
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
    """
    Sets task to update counters for application tags
    """
    tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
    Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
    c = collections.Counter(tags)
    for t_json, count in c.items():
        tag_info = json.loads(t_json)
        update_tag_counter.delay(tag_info[0], tag_info[1], count)


@celery.task(queue="default")
def daily_digest():
    """
    Sends daily digest with top 50 error reports
    """
    request = get_current_request()
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning('Generating daily digests')
    for resource_id in apps:
        resource_id = resource_id.decode('utf8')
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        filter_settings = {'resource': [resource_id],
                           'tags': [{'name': 'type',
                                     'value': ['error'], 'op': None}],
                           'type': 'error', 'start_date': since_when,
                           'end_date': end_date}

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50)

        application = ApplicationService.by_id(resource_id)
        if application:
            users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
            for user in users:
                user.send_digest(request, application, reports=reports,
                                 since_when=since_when)


@celery.task(queue="default")
def notifications_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    issue notifications
    """
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_user_report_notifications.delay(app.decode('utf8'))

@celery.task(queue="default")
def alerting_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    perform the following:
    - which applications should have new alerts opened
    """

    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_alerts.delay(app.decode('utf8'))


@celery.task(queue="default", soft_time_limit=3600 * 4,
             hard_time_limit=3600 * 4, max_retries=144)
def logs_cleanup(resource_id, filter_settings):
    request = get_current_request()
    request.tm.begin()
    es_query = {
        "_source": False,
        "size": 5000,
        "query": {
            "filtered": {
                "filter": {
                    "and": [{"term": {"resource_id": resource_id}}]
                }
            }
        }
    }

    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings['namespace']:
        query = query.filter(Log.namespace == filter_settings['namespace'][0])
        es_query['query']['filtered']['filter']['and'].append(
            {"term": {"namespace": filter_settings['namespace'][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    result = request.es_conn.search(es_query, index='rcae_l_*',
                                    doc_type='log', es_scroll='1m',
                                    es_search_type='scan')
    scroll_id = result['_scroll_id']
    while True:
        log.warning('log_cleanup, app:{} ns:{} batch'.format(
            resource_id,
            filter_settings['namespace']
        ))
        es_docs_to_delete = []
        result = request.es_conn.send_request(
            'POST', ['_search', 'scroll'],
            body=scroll_id, query_params={"scroll": '1m'})
        scroll_id = result['_scroll_id']
        if not result['hits']['hits']:
            break
        for doc in result['hits']['hits']:
            es_docs_to_delete.append({"id": doc['_id'],
                                      "index": doc['_index']})

        for batch in in_batches(es_docs_to_delete, 10):
            Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
                                                        **to_del)
                                for to_del in batch])
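For context, `CELERY_EAGER_PROPAGATES_EXCEPTIONS` is a Celery 3.x setting that only takes effect when tasks execute eagerly (in-process), and it is typically switched on together with `CELERY_ALWAYS_EAGER` in a test configuration. A hypothetical example of such a setup, not taken from this repository:

# test-only configuration sketch: run tasks synchronously and let errors bubble up
celery.conf.update(
    CELERY_ALWAYS_EAGER=True,
    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
)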