##// END OF EJS Templates
tasks: change retry amount
ergo -
Show More
@@ -1,634 +1,634 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # AppEnlight Enterprise Edition, including its added features, Support
18 # AppEnlight Enterprise Edition, including its added features, Support
19 # services, and proprietary license terms, please see
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
20 # https://rhodecode.com/licenses/
21
21
22 import bisect
22 import bisect
23 import collections
23 import collections
24 import math
24 import math
25 from datetime import datetime, timedelta
25 from datetime import datetime, timedelta
26
26
27 import sqlalchemy as sa
27 import sqlalchemy as sa
28 import pyelasticsearch
28 import pyelasticsearch
29
29
30 from celery.utils.log import get_task_logger
30 from celery.utils.log import get_task_logger
31 from zope.sqlalchemy import mark_changed
31 from zope.sqlalchemy import mark_changed
32 from pyramid.threadlocal import get_current_request, get_current_registry
32 from pyramid.threadlocal import get_current_request, get_current_registry
33 from appenlight.celery import celery
33 from appenlight.celery import celery
34 from appenlight.models.report_group import ReportGroup
34 from appenlight.models.report_group import ReportGroup
35 from appenlight.models import DBSession, Datastores
35 from appenlight.models import DBSession, Datastores
36 from appenlight.models.report import Report
36 from appenlight.models.report import Report
37 from appenlight.models.log import Log
37 from appenlight.models.log import Log
38 from appenlight.models.request_metric import Metric
38 from appenlight.models.request_metric import Metric
39 from appenlight.models.event import Event
39 from appenlight.models.event import Event
40
40
41 from appenlight.models.services.application import ApplicationService
41 from appenlight.models.services.application import ApplicationService
42 from appenlight.models.services.event import EventService
42 from appenlight.models.services.event import EventService
43 from appenlight.models.services.log import LogService
43 from appenlight.models.services.log import LogService
44 from appenlight.models.services.report import ReportService
44 from appenlight.models.services.report import ReportService
45 from appenlight.models.services.report_group import ReportGroupService
45 from appenlight.models.services.report_group import ReportGroupService
46 from appenlight.models.services.user import UserService
46 from appenlight.models.services.user import UserService
47 from appenlight.models.tag import Tag
47 from appenlight.models.tag import Tag
48 from appenlight.lib import print_traceback
48 from appenlight.lib import print_traceback
49 from appenlight.lib.utils import parse_proto, in_batches
49 from appenlight.lib.utils import parse_proto, in_batches
50 from appenlight.lib.ext_json import json
50 from appenlight.lib.ext_json import json
51 from appenlight.lib.redis_keys import REDIS_KEYS
51 from appenlight.lib.redis_keys import REDIS_KEYS
52 from appenlight.lib.enums import ReportType
52 from appenlight.lib.enums import ReportType
53
53
54 log = get_task_logger(__name__)
54 log = get_task_logger(__name__)
55
55
56 sample_boundries = list(range(100, 1000, 100)) + \
56 sample_boundries = list(range(100, 1000, 100)) + \
57 list(range(1000, 10000, 1000)) + \
57 list(range(1000, 10000, 1000)) + \
58 list(range(10000, 100000, 5000))
58 list(range(10000, 100000, 5000))
59
59
60
60
61 def pick_sample(total_occurences, report_type=None):
61 def pick_sample(total_occurences, report_type=None):
62 every = 1.0
62 every = 1.0
63 position = bisect.bisect_left(sample_boundries, total_occurences)
63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 if position > 0:
64 if position > 0:
65 if report_type == ReportType.not_found:
65 if report_type == ReportType.not_found:
66 divide = 10.0
66 divide = 10.0
67 else:
67 else:
68 divide = 100.0
68 divide = 100.0
69 every = sample_boundries[position - 1] / divide
69 every = sample_boundries[position - 1] / divide
70 return total_occurences % every == 0
70 return total_occurences % every == 0
71
71
72
72
73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 def test_exception_task():
74 def test_exception_task():
75 log.error('test celery log', extra={'location': 'celery'})
75 log.error('test celery log', extra={'location': 'celery'})
76 log.warning('test celery log', extra={'location': 'celery'})
76 log.warning('test celery log', extra={'location': 'celery'})
77 raise Exception('Celery exception test')
77 raise Exception('Celery exception test')
78
78
79
79
80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 def test_retry_exception_task():
81 def test_retry_exception_task():
82 try:
82 try:
83 import time
83 import time
84
84
85 time.sleep(1.3)
85 time.sleep(1.3)
86 log.error('test retry celery log', extra={'location': 'celery'})
86 log.error('test retry celery log', extra={'location': 'celery'})
87 log.warning('test retry celery log', extra={'location': 'celery'})
87 log.warning('test retry celery log', extra={'location': 'celery'})
88 raise Exception('Celery exception test')
88 raise Exception('Celery exception test')
89 except Exception as exc:
89 except Exception as exc:
90 test_retry_exception_task.retry(exc=exc)
90 test_retry_exception_task.retry(exc=exc)
91
91
92
92
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=999)
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
95 proto_version = parse_proto(params.get('protocol_version', ''))
95 proto_version = parse_proto(params.get('protocol_version', ''))
96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 try:
97 try:
98 # we will store solr docs here for single insert
98 # we will store solr docs here for single insert
99 es_report_docs = {}
99 es_report_docs = {}
100 es_report_group_docs = {}
100 es_report_group_docs = {}
101 resource = ApplicationService.by_id(resource_id)
101 resource = ApplicationService.by_id(resource_id)
102
102
103 tags = []
103 tags = []
104 es_slow_calls_docs = {}
104 es_slow_calls_docs = {}
105 es_reports_stats_rows = {}
105 es_reports_stats_rows = {}
106 for report_data in dataset:
106 for report_data in dataset:
107 # build report details for later
107 # build report details for later
108 added_details = 0
108 added_details = 0
109 report = Report()
109 report = Report()
110 report.set_data(report_data, resource, proto_version)
110 report.set_data(report_data, resource, proto_version)
111 report._skip_ft_index = True
111 report._skip_ft_index = True
112
112
113 report_group = ReportGroupService.by_hash_and_resource(
113 report_group = ReportGroupService.by_hash_and_resource(
114 report.resource_id,
114 report.resource_id,
115 report.grouping_hash
115 report.grouping_hash
116 )
116 )
117 occurences = report_data.get('occurences', 1)
117 occurences = report_data.get('occurences', 1)
118 if not report_group:
118 if not report_group:
119 # total reports will be +1 moment later
119 # total reports will be +1 moment later
120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
121 occurences=0, total_reports=0,
121 occurences=0, total_reports=0,
122 last_report=0,
122 last_report=0,
123 priority=report.priority,
123 priority=report.priority,
124 error=report.error,
124 error=report.error,
125 first_timestamp=report.start_time)
125 first_timestamp=report.start_time)
126 report_group._skip_ft_index = True
126 report_group._skip_ft_index = True
127 report_group.report_type = report.report_type
127 report_group.report_type = report.report_type
128 report.report_group_time = report_group.first_timestamp
128 report.report_group_time = report_group.first_timestamp
129 add_sample = pick_sample(report_group.occurences,
129 add_sample = pick_sample(report_group.occurences,
130 report_type=report_group.report_type)
130 report_type=report_group.report_type)
131 if add_sample:
131 if add_sample:
132 resource.report_groups.append(report_group)
132 resource.report_groups.append(report_group)
133 report_group.reports.append(report)
133 report_group.reports.append(report)
134 added_details += 1
134 added_details += 1
135 DBSession.flush()
135 DBSession.flush()
136 if report.partition_id not in es_report_docs:
136 if report.partition_id not in es_report_docs:
137 es_report_docs[report.partition_id] = []
137 es_report_docs[report.partition_id] = []
138 es_report_docs[report.partition_id].append(report.es_doc())
138 es_report_docs[report.partition_id].append(report.es_doc())
139 tags.extend(list(report.tags.items()))
139 tags.extend(list(report.tags.items()))
140 slow_calls = report.add_slow_calls(report_data, report_group)
140 slow_calls = report.add_slow_calls(report_data, report_group)
141 DBSession.flush()
141 DBSession.flush()
142 for s_call in slow_calls:
142 for s_call in slow_calls:
143 if s_call.partition_id not in es_slow_calls_docs:
143 if s_call.partition_id not in es_slow_calls_docs:
144 es_slow_calls_docs[s_call.partition_id] = []
144 es_slow_calls_docs[s_call.partition_id] = []
145 es_slow_calls_docs[s_call.partition_id].append(
145 es_slow_calls_docs[s_call.partition_id].append(
146 s_call.es_doc())
146 s_call.es_doc())
147 # try generating new stat rows if needed
147 # try generating new stat rows if needed
148 else:
148 else:
149 # required for postprocessing to not fail later
149 # required for postprocessing to not fail later
150 report.report_group = report_group
150 report.report_group = report_group
151
151
152 stat_row = ReportService.generate_stat_rows(
152 stat_row = ReportService.generate_stat_rows(
153 report, resource, report_group)
153 report, resource, report_group)
154 if stat_row.partition_id not in es_reports_stats_rows:
154 if stat_row.partition_id not in es_reports_stats_rows:
155 es_reports_stats_rows[stat_row.partition_id] = []
155 es_reports_stats_rows[stat_row.partition_id] = []
156 es_reports_stats_rows[stat_row.partition_id].append(
156 es_reports_stats_rows[stat_row.partition_id].append(
157 stat_row.es_doc())
157 stat_row.es_doc())
158
158
159 # see if we should mark 10th occurence of report
159 # see if we should mark 10th occurence of report
160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
161 curr_occurences_10 = int(math.floor(
161 curr_occurences_10 = int(math.floor(
162 (report_group.occurences + report.occurences) / 10))
162 (report_group.occurences + report.occurences) / 10))
163 last_occurences_100 = int(
163 last_occurences_100 = int(
164 math.floor(report_group.occurences / 100))
164 math.floor(report_group.occurences / 100))
165 curr_occurences_100 = int(math.floor(
165 curr_occurences_100 = int(math.floor(
166 (report_group.occurences + report.occurences) / 100))
166 (report_group.occurences + report.occurences) / 100))
167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
169 report_group.occurences = ReportGroup.occurences + occurences
169 report_group.occurences = ReportGroup.occurences + occurences
170 report_group.last_timestamp = report.start_time
170 report_group.last_timestamp = report.start_time
171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
172 summed_duration = ReportGroup.summed_duration + report.duration
172 summed_duration = ReportGroup.summed_duration + report.duration
173 summed_occurences = ReportGroup.occurences + occurences
173 summed_occurences = ReportGroup.occurences + occurences
174 report_group.average_duration = summed_duration / summed_occurences
174 report_group.average_duration = summed_duration / summed_occurences
175 report_group.run_postprocessing(report)
175 report_group.run_postprocessing(report)
176 if added_details:
176 if added_details:
177 report_group.total_reports = ReportGroup.total_reports + 1
177 report_group.total_reports = ReportGroup.total_reports + 1
178 report_group.last_report = report.id
178 report_group.last_report = report.id
179 report_group.set_notification_info(notify_10=notify_occurences_10,
179 report_group.set_notification_info(notify_10=notify_occurences_10,
180 notify_100=notify_occurences_100)
180 notify_100=notify_occurences_100)
181 DBSession.flush()
181 DBSession.flush()
182 report_group.get_report().notify_channel(report_group)
182 report_group.get_report().notify_channel(report_group)
183 if report_group.partition_id not in es_report_group_docs:
183 if report_group.partition_id not in es_report_group_docs:
184 es_report_group_docs[report_group.partition_id] = []
184 es_report_group_docs[report_group.partition_id] = []
185 es_report_group_docs[report_group.partition_id].append(
185 es_report_group_docs[report_group.partition_id].append(
186 report_group.es_doc())
186 report_group.es_doc())
187
187
188 action = 'REPORT'
188 action = 'REPORT'
189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
190 action,
190 action,
191 report_data.get('http_status', 'unknown'),
191 report_data.get('http_status', 'unknown'),
192 str(resource),
192 str(resource),
193 report_data.get('client'),
193 report_data.get('client'),
194 proto_version)
194 proto_version)
195 log.info(log_msg)
195 log.info(log_msg)
196 total_reports = len(dataset)
196 total_reports = len(dataset)
197 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
197 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 Datastores.redis.incr(key, total_reports)
198 Datastores.redis.incr(key, total_reports)
199 Datastores.redis.expire(key, 3600 * 24)
199 Datastores.redis.expire(key, 3600 * 24)
200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
201 resource_id, current_time)
201 resource_id, current_time)
202 Datastores.redis.incr(key, total_reports)
202 Datastores.redis.incr(key, total_reports)
203 Datastores.redis.expire(key, 3600 * 24)
203 Datastores.redis.expire(key, 3600 * 24)
204
204
205 add_reports_es(es_report_group_docs, es_report_docs)
205 add_reports_es(es_report_group_docs, es_report_docs)
206 add_reports_slow_calls_es(es_slow_calls_docs)
206 add_reports_slow_calls_es(es_slow_calls_docs)
207 add_reports_stats_rows_es(es_reports_stats_rows)
207 add_reports_stats_rows_es(es_reports_stats_rows)
208 return True
208 return True
209 except Exception as exc:
209 except Exception as exc:
210 print_traceback(log)
210 print_traceback(log)
211 add_reports.retry(exc=exc)
211 add_reports.retry(exc=exc)
212
212
213
213
214 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
214 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
215 def add_reports_es(report_group_docs, report_docs):
215 def add_reports_es(report_group_docs, report_docs):
216 for k, v in report_group_docs.items():
216 for k, v in report_group_docs.items():
217 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
217 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
218 for k, v in report_docs.items():
218 for k, v in report_docs.items():
219 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
219 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
220 parent_field='_parent')
220 parent_field='_parent')
221
221
222
222
223 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
223 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
224 def add_reports_slow_calls_es(es_docs):
224 def add_reports_slow_calls_es(es_docs):
225 for k, v in es_docs.items():
225 for k, v in es_docs.items():
226 Datastores.es.bulk_index(k, 'log', v)
226 Datastores.es.bulk_index(k, 'log', v)
227
227
228
228
229 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
229 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
230 def add_reports_stats_rows_es(es_docs):
230 def add_reports_stats_rows_es(es_docs):
231 for k, v in es_docs.items():
231 for k, v in es_docs.items():
232 Datastores.es.bulk_index(k, 'log', v)
232 Datastores.es.bulk_index(k, 'log', v)
233
233
234
234
235 @celery.task(queue="logs", default_retry_delay=600, max_retries=999)
235 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
237 proto_version = request.get('protocol_version')
237 proto_version = request.get('protocol_version')
238 current_time = datetime.utcnow().replace(second=0, microsecond=0)
238 current_time = datetime.utcnow().replace(second=0, microsecond=0)
239
239
240 try:
240 try:
241 es_docs = collections.defaultdict(list)
241 es_docs = collections.defaultdict(list)
242 application = ApplicationService.by_id(resource_id)
242 application = ApplicationService.by_id(resource_id)
243 ns_pairs = []
243 ns_pairs = []
244 for entry in dataset:
244 for entry in dataset:
245 # gather pk and ns so we can remove older versions of row later
245 # gather pk and ns so we can remove older versions of row later
246 if entry['primary_key'] is not None:
246 if entry['primary_key'] is not None:
247 ns_pairs.append({"pk": entry['primary_key'],
247 ns_pairs.append({"pk": entry['primary_key'],
248 "ns": entry['namespace']})
248 "ns": entry['namespace']})
249 log_entry = Log()
249 log_entry = Log()
250 log_entry.set_data(entry, resource=application)
250 log_entry.set_data(entry, resource=application)
251 log_entry._skip_ft_index = True
251 log_entry._skip_ft_index = True
252 application.logs.append(log_entry)
252 application.logs.append(log_entry)
253 DBSession.flush()
253 DBSession.flush()
254 # insert non pk rows first
254 # insert non pk rows first
255 if entry['primary_key'] is None:
255 if entry['primary_key'] is None:
256 es_docs[log_entry.partition_id].append(log_entry.es_doc())
256 es_docs[log_entry.partition_id].append(log_entry.es_doc())
257
257
258 # 2nd pass to delete all log entries from db foe same pk/ns pair
258 # 2nd pass to delete all log entries from db foe same pk/ns pair
259 if ns_pairs:
259 if ns_pairs:
260 ids_to_delete = []
260 ids_to_delete = []
261 es_docs = collections.defaultdict(list)
261 es_docs = collections.defaultdict(list)
262 es_docs_to_delete = collections.defaultdict(list)
262 es_docs_to_delete = collections.defaultdict(list)
263 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
263 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
264 list_of_pairs=ns_pairs)
264 list_of_pairs=ns_pairs)
265 log_dict = {}
265 log_dict = {}
266 for log_entry in found_pkey_logs:
266 for log_entry in found_pkey_logs:
267 log_key = (log_entry.primary_key, log_entry.namespace)
267 log_key = (log_entry.primary_key, log_entry.namespace)
268 if log_key not in log_dict:
268 if log_key not in log_dict:
269 log_dict[log_key] = []
269 log_dict[log_key] = []
270 log_dict[log_key].append(log_entry)
270 log_dict[log_key].append(log_entry)
271
271
272 for ns, entry_list in log_dict.items():
272 for ns, entry_list in log_dict.items():
273 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
273 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
274 # newest row needs to be indexed in es
274 # newest row needs to be indexed in es
275 log_entry = entry_list[-1]
275 log_entry = entry_list[-1]
276 # delete everything from pg and ES, leave the last row in pg
276 # delete everything from pg and ES, leave the last row in pg
277 for e in entry_list[:-1]:
277 for e in entry_list[:-1]:
278 ids_to_delete.append(e.log_id)
278 ids_to_delete.append(e.log_id)
279 es_docs_to_delete[e.partition_id].append(e.delete_hash)
279 es_docs_to_delete[e.partition_id].append(e.delete_hash)
280
280
281 es_docs_to_delete[log_entry.partition_id].append(
281 es_docs_to_delete[log_entry.partition_id].append(
282 log_entry.delete_hash)
282 log_entry.delete_hash)
283
283
284 es_docs[log_entry.partition_id].append(log_entry.es_doc())
284 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285
285
286 if ids_to_delete:
286 if ids_to_delete:
287 query = DBSession.query(Log).filter(
287 query = DBSession.query(Log).filter(
288 Log.log_id.in_(ids_to_delete))
288 Log.log_id.in_(ids_to_delete))
289 query.delete(synchronize_session=False)
289 query.delete(synchronize_session=False)
290 if es_docs_to_delete:
290 if es_docs_to_delete:
291 # batch this to avoid problems with default ES bulk limits
291 # batch this to avoid problems with default ES bulk limits
292 for es_index in es_docs_to_delete.keys():
292 for es_index in es_docs_to_delete.keys():
293 for batch in in_batches(es_docs_to_delete[es_index], 20):
293 for batch in in_batches(es_docs_to_delete[es_index], 20):
294 query = {'terms': {'delete_hash': batch}}
294 query = {'terms': {'delete_hash': batch}}
295
295
296 try:
296 try:
297 Datastores.es.delete_by_query(
297 Datastores.es.delete_by_query(
298 es_index, 'log', query)
298 es_index, 'log', query)
299 except pyelasticsearch.ElasticHttpNotFoundError as exc:
299 except pyelasticsearch.ElasticHttpNotFoundError as exc:
300 log.error(exc)
300 log.error(exc)
301
301
302 total_logs = len(dataset)
302 total_logs = len(dataset)
303
303
304 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
304 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
305 str(application),
305 str(application),
306 total_logs,
306 total_logs,
307 proto_version)
307 proto_version)
308 log.info(log_msg)
308 log.info(log_msg)
309 # mark_changed(session)
309 # mark_changed(session)
310 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
310 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
311 Datastores.redis.incr(key, total_logs)
311 Datastores.redis.incr(key, total_logs)
312 Datastores.redis.expire(key, 3600 * 24)
312 Datastores.redis.expire(key, 3600 * 24)
313 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
313 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
314 resource_id, current_time)
314 resource_id, current_time)
315 Datastores.redis.incr(key, total_logs)
315 Datastores.redis.incr(key, total_logs)
316 Datastores.redis.expire(key, 3600 * 24)
316 Datastores.redis.expire(key, 3600 * 24)
317 add_logs_es(es_docs)
317 add_logs_es(es_docs)
318 return True
318 return True
319 except Exception as exc:
319 except Exception as exc:
320 print_traceback(log)
320 print_traceback(log)
321 add_logs.retry(exc=exc)
321 add_logs.retry(exc=exc)
322
322
323
323
324 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
324 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
325 def add_logs_es(es_docs):
325 def add_logs_es(es_docs):
326 for k, v in es_docs.items():
326 for k, v in es_docs.items():
327 Datastores.es.bulk_index(k, 'log', v)
327 Datastores.es.bulk_index(k, 'log', v)
328
328
329
329
330 @celery.task(queue="metrics", default_retry_delay=600, max_retries=999)
330 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
331 def add_metrics(resource_id, request, dataset, proto_version):
331 def add_metrics(resource_id, request, dataset, proto_version):
332 current_time = datetime.utcnow().replace(second=0, microsecond=0)
332 current_time = datetime.utcnow().replace(second=0, microsecond=0)
333 try:
333 try:
334 application = ApplicationService.by_id_cached()(resource_id)
334 application = ApplicationService.by_id_cached()(resource_id)
335 application = DBSession.merge(application, load=False)
335 application = DBSession.merge(application, load=False)
336 es_docs = []
336 es_docs = []
337 rows = []
337 rows = []
338 for metric in dataset:
338 for metric in dataset:
339 tags = dict(metric['tags'])
339 tags = dict(metric['tags'])
340 server_n = tags.get('server_name', metric['server_name']).lower()
340 server_n = tags.get('server_name', metric['server_name']).lower()
341 tags['server_name'] = server_n or 'unknown'
341 tags['server_name'] = server_n or 'unknown'
342 new_metric = Metric(
342 new_metric = Metric(
343 timestamp=metric['timestamp'],
343 timestamp=metric['timestamp'],
344 resource_id=application.resource_id,
344 resource_id=application.resource_id,
345 namespace=metric['namespace'],
345 namespace=metric['namespace'],
346 tags=tags)
346 tags=tags)
347 rows.append(new_metric)
347 rows.append(new_metric)
348 es_docs.append(new_metric.es_doc())
348 es_docs.append(new_metric.es_doc())
349 session = DBSession()
349 session = DBSession()
350 session.bulk_save_objects(rows)
350 session.bulk_save_objects(rows)
351 session.flush()
351 session.flush()
352
352
353 action = 'METRICS'
353 action = 'METRICS'
354 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
354 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
355 action,
355 action,
356 str(application),
356 str(application),
357 len(dataset),
357 len(dataset),
358 proto_version
358 proto_version
359 )
359 )
360 log.info(metrics_msg)
360 log.info(metrics_msg)
361
361
362 mark_changed(session)
362 mark_changed(session)
363 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
363 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
364 Datastores.redis.incr(key, len(rows))
364 Datastores.redis.incr(key, len(rows))
365 Datastores.redis.expire(key, 3600 * 24)
365 Datastores.redis.expire(key, 3600 * 24)
366 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
366 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
367 resource_id, current_time)
367 resource_id, current_time)
368 Datastores.redis.incr(key, len(rows))
368 Datastores.redis.incr(key, len(rows))
369 Datastores.redis.expire(key, 3600 * 24)
369 Datastores.redis.expire(key, 3600 * 24)
370 add_metrics_es(es_docs)
370 add_metrics_es(es_docs)
371 return True
371 return True
372 except Exception as exc:
372 except Exception as exc:
373 print_traceback(log)
373 print_traceback(log)
374 add_metrics.retry(exc=exc)
374 add_metrics.retry(exc=exc)
375
375
376
376
377 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
377 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
378 def add_metrics_es(es_docs):
378 def add_metrics_es(es_docs):
379 for doc in es_docs:
379 for doc in es_docs:
380 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
380 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
381 Datastores.es.index(partition, 'log', doc)
381 Datastores.es.index(partition, 'log', doc)
382
382
383
383
384 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
384 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
385 def check_user_report_notifications(resource_id):
385 def check_user_report_notifications(resource_id):
386 since_when = datetime.utcnow()
386 since_when = datetime.utcnow()
387 try:
387 try:
388 request = get_current_request()
388 request = get_current_request()
389 application = ApplicationService.by_id(resource_id)
389 application = ApplicationService.by_id(resource_id)
390 if not application:
390 if not application:
391 return
391 return
392 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
392 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
393 ReportType.error, resource_id)
393 ReportType.error, resource_id)
394 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
394 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
395 ReportType.slow, resource_id)
395 ReportType.slow, resource_id)
396 error_group_ids = Datastores.redis.smembers(error_key)
396 error_group_ids = Datastores.redis.smembers(error_key)
397 slow_group_ids = Datastores.redis.smembers(slow_key)
397 slow_group_ids = Datastores.redis.smembers(slow_key)
398 Datastores.redis.delete(error_key)
398 Datastores.redis.delete(error_key)
399 Datastores.redis.delete(slow_key)
399 Datastores.redis.delete(slow_key)
400 err_gids = [int(g_id) for g_id in error_group_ids]
400 err_gids = [int(g_id) for g_id in error_group_ids]
401 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
401 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
402 group_ids = err_gids + slow_gids
402 group_ids = err_gids + slow_gids
403 occurence_dict = {}
403 occurence_dict = {}
404 for g_id in group_ids:
404 for g_id in group_ids:
405 key = REDIS_KEYS['counters']['report_group_occurences'].format(
405 key = REDIS_KEYS['counters']['report_group_occurences'].format(
406 g_id)
406 g_id)
407 val = Datastores.redis.get(key)
407 val = Datastores.redis.get(key)
408 Datastores.redis.delete(key)
408 Datastores.redis.delete(key)
409 if val:
409 if val:
410 occurence_dict[g_id] = int(val)
410 occurence_dict[g_id] = int(val)
411 else:
411 else:
412 occurence_dict[g_id] = 1
412 occurence_dict[g_id] = 1
413 report_groups = ReportGroupService.by_ids(group_ids)
413 report_groups = ReportGroupService.by_ids(group_ids)
414 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
414 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
415
415
416 ApplicationService.check_for_groups_alert(
416 ApplicationService.check_for_groups_alert(
417 application, 'alert', report_groups=report_groups,
417 application, 'alert', report_groups=report_groups,
418 occurence_dict=occurence_dict)
418 occurence_dict=occurence_dict)
419 users = set([p.user for p in application.users_for_perm('view')])
419 users = set([p.user for p in application.users_for_perm('view')])
420 report_groups = report_groups.all()
420 report_groups = report_groups.all()
421 for user in users:
421 for user in users:
422 UserService.report_notify(user, request, application,
422 UserService.report_notify(user, request, application,
423 report_groups=report_groups,
423 report_groups=report_groups,
424 occurence_dict=occurence_dict)
424 occurence_dict=occurence_dict)
425 for group in report_groups:
425 for group in report_groups:
426 # marks report_groups as notified
426 # marks report_groups as notified
427 if not group.notified:
427 if not group.notified:
428 group.notified = True
428 group.notified = True
429 except Exception as exc:
429 except Exception as exc:
430 print_traceback(log)
430 print_traceback(log)
431 raise
431 raise
432
432
433
433
434 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
434 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
435 def check_alerts(resource_id):
435 def check_alerts(resource_id):
436 since_when = datetime.utcnow()
436 since_when = datetime.utcnow()
437 try:
437 try:
438 request = get_current_request()
438 request = get_current_request()
439 application = ApplicationService.by_id(resource_id)
439 application = ApplicationService.by_id(resource_id)
440 if not application:
440 if not application:
441 return
441 return
442 error_key = REDIS_KEYS[
442 error_key = REDIS_KEYS[
443 'reports_to_notify_per_type_per_app_alerting'].format(
443 'reports_to_notify_per_type_per_app_alerting'].format(
444 ReportType.error, resource_id)
444 ReportType.error, resource_id)
445 slow_key = REDIS_KEYS[
445 slow_key = REDIS_KEYS[
446 'reports_to_notify_per_type_per_app_alerting'].format(
446 'reports_to_notify_per_type_per_app_alerting'].format(
447 ReportType.slow, resource_id)
447 ReportType.slow, resource_id)
448 error_group_ids = Datastores.redis.smembers(error_key)
448 error_group_ids = Datastores.redis.smembers(error_key)
449 slow_group_ids = Datastores.redis.smembers(slow_key)
449 slow_group_ids = Datastores.redis.smembers(slow_key)
450 Datastores.redis.delete(error_key)
450 Datastores.redis.delete(error_key)
451 Datastores.redis.delete(slow_key)
451 Datastores.redis.delete(slow_key)
452 err_gids = [int(g_id) for g_id in error_group_ids]
452 err_gids = [int(g_id) for g_id in error_group_ids]
453 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
453 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
454 group_ids = err_gids + slow_gids
454 group_ids = err_gids + slow_gids
455 occurence_dict = {}
455 occurence_dict = {}
456 for g_id in group_ids:
456 for g_id in group_ids:
457 key = REDIS_KEYS['counters'][
457 key = REDIS_KEYS['counters'][
458 'report_group_occurences_alerting'].format(
458 'report_group_occurences_alerting'].format(
459 g_id)
459 g_id)
460 val = Datastores.redis.get(key)
460 val = Datastores.redis.get(key)
461 Datastores.redis.delete(key)
461 Datastores.redis.delete(key)
462 if val:
462 if val:
463 occurence_dict[g_id] = int(val)
463 occurence_dict[g_id] = int(val)
464 else:
464 else:
465 occurence_dict[g_id] = 1
465 occurence_dict[g_id] = 1
466 report_groups = ReportGroupService.by_ids(group_ids)
466 report_groups = ReportGroupService.by_ids(group_ids)
467 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
467 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
468
468
469 ApplicationService.check_for_groups_alert(
469 ApplicationService.check_for_groups_alert(
470 application, 'alert', report_groups=report_groups,
470 application, 'alert', report_groups=report_groups,
471 occurence_dict=occurence_dict, since_when=since_when)
471 occurence_dict=occurence_dict, since_when=since_when)
472 except Exception as exc:
472 except Exception as exc:
473 print_traceback(log)
473 print_traceback(log)
474 raise
474 raise
475
475
476
476
477 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
477 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
478 def close_alerts():
478 def close_alerts():
479 log.warning('Checking alerts')
479 log.warning('Checking alerts')
480 since_when = datetime.utcnow()
480 since_when = datetime.utcnow()
481 try:
481 try:
482 event_types = [Event.types['error_report_alert'],
482 event_types = [Event.types['error_report_alert'],
483 Event.types['slow_report_alert'], ]
483 Event.types['slow_report_alert'], ]
484 statuses = [Event.statuses['active']]
484 statuses = [Event.statuses['active']]
485 # get events older than 5 min
485 # get events older than 5 min
486 events = EventService.by_type_and_status(
486 events = EventService.by_type_and_status(
487 event_types,
487 event_types,
488 statuses,
488 statuses,
489 older_than=(since_when - timedelta(minutes=5)))
489 older_than=(since_when - timedelta(minutes=5)))
490 for event in events:
490 for event in events:
491 # see if we can close them
491 # see if we can close them
492 event.validate_or_close(
492 event.validate_or_close(
493 since_when=(since_when - timedelta(minutes=1)))
493 since_when=(since_when - timedelta(minutes=1)))
494 except Exception as exc:
494 except Exception as exc:
495 print_traceback(log)
495 print_traceback(log)
496 raise
496 raise
497
497
498
498
499 @celery.task(queue="default", default_retry_delay=600, max_retries=999)
499 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
500 def update_tag_counter(tag_name, tag_value, count):
500 def update_tag_counter(tag_name, tag_value, count):
501 try:
501 try:
502 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
502 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
503 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
503 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
504 sa.types.TEXT))
504 sa.types.TEXT))
505 query.update({'times_seen': Tag.times_seen + count,
505 query.update({'times_seen': Tag.times_seen + count,
506 'last_timestamp': datetime.utcnow()},
506 'last_timestamp': datetime.utcnow()},
507 synchronize_session=False)
507 synchronize_session=False)
508 session = DBSession()
508 session = DBSession()
509 mark_changed(session)
509 mark_changed(session)
510 return True
510 return True
511 except Exception as exc:
511 except Exception as exc:
512 print_traceback(log)
512 print_traceback(log)
513 update_tag_counter.retry(exc=exc)
513 update_tag_counter.retry(exc=exc)
514
514
515
515
516 @celery.task(queue="default")
516 @celery.task(queue="default")
517 def update_tag_counters():
517 def update_tag_counters():
518 """
518 """
519 Sets task to update counters for application tags
519 Sets task to update counters for application tags
520 """
520 """
521 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
521 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
522 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
522 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
523 c = collections.Counter(tags)
523 c = collections.Counter(tags)
524 for t_json, count in c.items():
524 for t_json, count in c.items():
525 tag_info = json.loads(t_json)
525 tag_info = json.loads(t_json)
526 update_tag_counter.delay(tag_info[0], tag_info[1], count)
526 update_tag_counter.delay(tag_info[0], tag_info[1], count)
527
527
528
528
529 @celery.task(queue="default")
529 @celery.task(queue="default")
530 def daily_digest():
530 def daily_digest():
531 """
531 """
532 Sends daily digest with top 50 error reports
532 Sends daily digest with top 50 error reports
533 """
533 """
534 request = get_current_request()
534 request = get_current_request()
535 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
535 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
536 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
536 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
537 since_when = datetime.utcnow() - timedelta(hours=8)
537 since_when = datetime.utcnow() - timedelta(hours=8)
538 log.warning('Generating daily digests')
538 log.warning('Generating daily digests')
539 for resource_id in apps:
539 for resource_id in apps:
540 resource_id = resource_id.decode('utf8')
540 resource_id = resource_id.decode('utf8')
541 end_date = datetime.utcnow().replace(microsecond=0, second=0)
541 end_date = datetime.utcnow().replace(microsecond=0, second=0)
542 filter_settings = {'resource': [resource_id],
542 filter_settings = {'resource': [resource_id],
543 'tags': [{'name': 'type',
543 'tags': [{'name': 'type',
544 'value': ['error'], 'op': None}],
544 'value': ['error'], 'op': None}],
545 'type': 'error', 'start_date': since_when,
545 'type': 'error', 'start_date': since_when,
546 'end_date': end_date}
546 'end_date': end_date}
547
547
548 reports = ReportGroupService.get_trending(
548 reports = ReportGroupService.get_trending(
549 request, filter_settings=filter_settings, limit=50)
549 request, filter_settings=filter_settings, limit=50)
550
550
551 application = ApplicationService.by_id(resource_id)
551 application = ApplicationService.by_id(resource_id)
552 if application:
552 if application:
553 users = set([p.user for p in application.users_for_perm('view')])
553 users = set([p.user for p in application.users_for_perm('view')])
554 for user in users:
554 for user in users:
555 user.send_digest(request, application, reports=reports,
555 user.send_digest(request, application, reports=reports,
556 since_when=since_when)
556 since_when=since_when)
557
557
558
558
559 @celery.task(queue="default")
559 @celery.task(queue="default")
560 def notifications_reports():
560 def notifications_reports():
561 """
561 """
562 Loop that checks redis for info and then issues new tasks to celery to
562 Loop that checks redis for info and then issues new tasks to celery to
563 issue notifications
563 issue notifications
564 """
564 """
565 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
565 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
566 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
566 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
567 for app in apps:
567 for app in apps:
568 log.warning('Notify for app: %s' % app)
568 log.warning('Notify for app: %s' % app)
569 check_user_report_notifications.delay(app.decode('utf8'))
569 check_user_report_notifications.delay(app.decode('utf8'))
570
570
571 @celery.task(queue="default")
571 @celery.task(queue="default")
572 def alerting_reports():
572 def alerting_reports():
573 """
573 """
574 Loop that checks redis for info and then issues new tasks to celery to
574 Loop that checks redis for info and then issues new tasks to celery to
575 perform the following:
575 perform the following:
576 - which applications should have new alerts opened
576 - which applications should have new alerts opened
577 """
577 """
578
578
579 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
579 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
580 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
580 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
581 for app in apps:
581 for app in apps:
582 log.warning('Notify for app: %s' % app)
582 log.warning('Notify for app: %s' % app)
583 check_alerts.delay(app.decode('utf8'))
583 check_alerts.delay(app.decode('utf8'))
584
584
585
585
586 @celery.task(queue="default", soft_time_limit=3600 * 4,
586 @celery.task(queue="default", soft_time_limit=3600 * 4,
587 hard_time_limit=3600 * 4, max_retries=999)
587 hard_time_limit=3600 * 4, max_retries=144)
588 def logs_cleanup(resource_id, filter_settings):
588 def logs_cleanup(resource_id, filter_settings):
589 request = get_current_request()
589 request = get_current_request()
590 request.tm.begin()
590 request.tm.begin()
591 es_query = {
591 es_query = {
592 "_source": False,
592 "_source": False,
593 "size": 5000,
593 "size": 5000,
594 "query": {
594 "query": {
595 "filtered": {
595 "filtered": {
596 "filter": {
596 "filter": {
597 "and": [{"term": {"resource_id": resource_id}}]
597 "and": [{"term": {"resource_id": resource_id}}]
598 }
598 }
599 }
599 }
600 }
600 }
601 }
601 }
602
602
603 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
603 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
604 if filter_settings['namespace']:
604 if filter_settings['namespace']:
605 query = query.filter(Log.namespace == filter_settings['namespace'][0])
605 query = query.filter(Log.namespace == filter_settings['namespace'][0])
606 es_query['query']['filtered']['filter']['and'].append(
606 es_query['query']['filtered']['filter']['and'].append(
607 {"term": {"namespace": filter_settings['namespace'][0]}}
607 {"term": {"namespace": filter_settings['namespace'][0]}}
608 )
608 )
609 query.delete(synchronize_session=False)
609 query.delete(synchronize_session=False)
610 request.tm.commit()
610 request.tm.commit()
611 result = request.es_conn.search(es_query, index='rcae_l_*',
611 result = request.es_conn.search(es_query, index='rcae_l_*',
612 doc_type='log', es_scroll='1m',
612 doc_type='log', es_scroll='1m',
613 es_search_type='scan')
613 es_search_type='scan')
614 scroll_id = result['_scroll_id']
614 scroll_id = result['_scroll_id']
615 while True:
615 while True:
616 log.warning('log_cleanup, app:{} ns:{} batch'.format(
616 log.warning('log_cleanup, app:{} ns:{} batch'.format(
617 resource_id,
617 resource_id,
618 filter_settings['namespace']
618 filter_settings['namespace']
619 ))
619 ))
620 es_docs_to_delete = []
620 es_docs_to_delete = []
621 result = request.es_conn.send_request(
621 result = request.es_conn.send_request(
622 'POST', ['_search', 'scroll'],
622 'POST', ['_search', 'scroll'],
623 body=scroll_id, query_params={"scroll": '1m'})
623 body=scroll_id, query_params={"scroll": '1m'})
624 scroll_id = result['_scroll_id']
624 scroll_id = result['_scroll_id']
625 if not result['hits']['hits']:
625 if not result['hits']['hits']:
626 break
626 break
627 for doc in result['hits']['hits']:
627 for doc in result['hits']['hits']:
628 es_docs_to_delete.append({"id": doc['_id'],
628 es_docs_to_delete.append({"id": doc['_id'],
629 "index": doc['_index']})
629 "index": doc['_index']})
630
630
631 for batch in in_batches(es_docs_to_delete, 10):
631 for batch in in_batches(es_docs_to_delete, 10):
632 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
632 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
633 **to_del)
633 **to_del)
634 for to_del in batch])
634 for to_del in batch])
General Comments 0
You need to be logged in to leave comments. Login now