logs: better log message
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

import bisect
import collections
import math
from datetime import datetime, timedelta

import sqlalchemy as sa
import pyelasticsearch

from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event

from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, report_type=None):
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


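# Illustrative sketch (explanatory note only) of how pick_sample() above
# thins out report storage as a group's occurence count grows; values below
# are hand-checked against the boundaries defined in sample_boundries:
#
#   pick_sample(50)     -> True   (below the first boundary: keep every report)
#   pick_sample(250)    -> True   (every = 200 / 100 = 2, and 250 % 2 == 0)
#   pick_sample(255)    -> False  (255 % 2 != 0)
#   pick_sample(2500)   -> True   (every = 2000 / 100 = 20)
#   pick_sample(2500, report_type=ReportType.not_found)
#                       -> False  (every = 2000 / 10 = 200, 2500 % 200 == 100)
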
@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    log.error('test celery log', extra={'location': 'celery'})
    log.warning('test celery log', extra={'location': 'celery'})
    raise Exception('Celery exception test')


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error('test retry celery log', extra={'location': 'celery'})
        log.warning('test retry celery log', extra={'location': 'celery'})
        raise Exception('Celery exception test')
    except Exception as exc:
        test_retry_exception_task.retry(exc=exc)


@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, params, dataset, environ=None, **kwargs):
    proto_version = parse_proto(params.get('protocol_version', ''))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store ES docs here for a single bulk insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)

        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash
            )
            occurences = report_data.get('occurences', 1)
            if not report_group:
                # total reports will be +1 a moment later
                report_group = ReportGroup(grouping_hash=report.grouping_hash,
                                           occurences=0, total_reports=0,
                                           last_report=0,
                                           priority=report.priority,
                                           error=report.error,
                                           first_timestamp=report.start_time)
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(report_group.occurences,
                                     report_type=report_group.report_type)
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(
                        s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(
                report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(
                stat_row.es_doc())

            # see if we should mark the 10th occurrence of the report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(math.floor(
                (report_group.occurences + report.occurences) / 10))
            last_occurences_100 = int(
                math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(math.floor(
                (report_group.occurences + report.occurences) / 100))
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(notify_10=notify_occurences_10,
                                               notify_100=notify_occurences_100)
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc())

            action = 'REPORT'
            log_msg = '%s: %s %s, client: %s, proto: %s' % (
                action,
                report_data.get('http_status', 'unknown'),
                str(resource),
                report_data.get('client'),
                proto_version)
            log.info(log_msg)
        total_reports = len(dataset)
        key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
        Datastores.redis.incr(key, total_reports)
        Datastores.redis.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
            resource_id, current_time)
        Datastores.redis.incr(key, total_reports)
        Datastores.redis.expire(key, 3600 * 24)

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        add_reports.retry(exc=exc)


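# Illustrative sketch of the notification thresholds in add_reports() above:
# a group is flagged for the "every 10th / every 100th occurrence"
# notifications whenever an incoming report pushes its occurence count across
# a multiple of 10 or 100. For example, assuming a group with
# report_group.occurences == 8 that receives a report carrying 3 occurences:
#
#   last_occurences_10 = floor(8 / 10)       = 0
#   curr_occurences_10 = floor((8 + 3) / 10) = 1
#   notify_occurences_10  -> True  (the group just crossed its 10th occurrence)
#   notify_occurences_100 -> False (floor(8 / 100) == floor(11 / 100) == 0)
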
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
    for k, v in report_group_docs.items():
        Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
    for k, v in report_docs.items():
        Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                 parent_field='_parent')


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request, dataset, environ=None, **kwargs):
    proto_version = request.get('protocol_version')
    current_time = datetime.utcnow().replace(second=0, microsecond=0)

    try:
        es_docs = collections.defaultdict(list)
        application = ApplicationService.by_id(resource_id)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of the row later
            if entry['primary_key'] is not None:
                ns_pairs.append({"pk": entry['primary_key'],
                                 "ns": entry['namespace']})
            log_entry = Log()
            log_entry.set_data(entry, resource=application)
            log_entry._skip_ft_index = True
            application.logs.append(log_entry)
            DBSession.flush()
            # insert non pk rows first
            if entry['primary_key'] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db for the same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs)
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)

            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)

                es_docs_to_delete[log_entry.partition_id].append(
                    log_entry.delete_hash)

                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(
                    Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {'terms': {'delete_hash': batch}}

                        try:
                            Datastores.es.delete_by_query(
                                es_index, 'log', query)
                        except pyelasticsearch.ElasticHttpNotFoundError as exc:
                            msg = 'skipping index {}'.format(es_index)
                            log.info(msg)

        total_logs = len(dataset)

        log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
            str(application),
            total_logs,
            proto_version)
        log.info(log_msg)
        # mark_changed(session)
        key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
        Datastores.redis.incr(key, total_logs)
        Datastores.redis.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
            resource_id, current_time)
        Datastores.redis.incr(key, total_logs)
        Datastores.redis.expire(key, 3600 * 24)
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_logs.retry(exc=exc)


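# Rough walk-through of the primary_key/namespace handling in add_logs()
# above, assuming (hypothetical values, for illustration only) that three
# entries arrive for the same ("user_cache", "cache") pk/ns pair:
#
#   * all three rows are inserted into postgres first,
#   * the second pass groups existing rows by (primary_key, namespace) and
#     sorts each group by timestamp,
#   * every row except the newest is scheduled for deletion from postgres
#     (ids_to_delete) and from elasticsearch (es_docs_to_delete, keyed by
#     delete_hash),
#   * the newest row's old ES document is also deleted and the row is
#     re-indexed, so ES ends up holding a single document per pk/ns pair.
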
@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request, dataset, proto_version):
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        application = ApplicationService.by_id_cached()(resource_id)
        application = DBSession.merge(application, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric['tags'])
            server_n = tags.get('server_name', metric['server_name']).lower()
            tags['server_name'] = server_n or 'unknown'
            new_metric = Metric(
                timestamp=metric['timestamp'],
                resource_id=application.resource_id,
                namespace=metric['namespace'],
                tags=tags)
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = 'METRICS'
        metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
            action,
            str(application),
            len(dataset),
            proto_version
        )
        log.info(metrics_msg)

        mark_changed(session)
        key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
        Datastores.redis.incr(key, len(rows))
        Datastores.redis.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
            resource_id, current_time)
        Datastores.redis.incr(key, len(rows))
        Datastores.redis.expire(key, 3600 * 24)
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_metrics.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
    for doc in es_docs:
        partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
        Datastores.es.index(partition, 'log', doc)


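# Note on the index naming in add_metrics_es() above: each metric document is
# routed to a per-day elasticsearch index derived from its timestamp, e.g. a
# metric stamped 2016-05-01 would land in 'rcae_m_2016_05_01' (the '%Y_%m_%d'
# formatting), so each day's metrics live in their own index.
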
@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters']['report_group_occurences'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict)
        users = set([p.user for p in application.users_for_perm('view')])
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(user, request, application,
                                      report_groups=report_groups,
                                      occurence_dict=occurence_dict)
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters'][
                'report_group_occurences_alerting'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict, since_when=since_when)
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
    log.warning('Checking alerts')
    since_when = datetime.utcnow()
    try:
        event_types = [Event.types['error_report_alert'],
                       Event.types['slow_report_alert'], ]
        statuses = [Event.statuses['active']]
        # get events older than 5 min
        events = EventService.by_type_and_status(
            event_types,
            statuses,
            older_than=(since_when - timedelta(minutes=5)))
        for event in events:
            # see if we can close them
            event.validate_or_close(
                since_when=(since_when - timedelta(minutes=1)))
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
    try:
        query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
            sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
                                                         sa.types.TEXT))
        query.update({'times_seen': Tag.times_seen + count,
                      'last_timestamp': datetime.utcnow()},
                     synchronize_session=False)
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
    """
    Schedules tasks that update the counters for application tags
    """
    tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
    Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
    c = collections.Counter(tags)
    for t_json, count in c.items():
        tag_info = json.loads(t_json)
        update_tag_counter.delay(tag_info[0], tag_info[1], count)


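# Sketch of the aggregation in update_tag_counters() above: the
# 'seen_tag_list' redis list holds JSON-encoded [name, value] pairs, so a
# hypothetical list like
#
#   ['["browser", "firefox"]', '["browser", "firefox"]', '["os", "linux"]']
#
# collapses under collections.Counter into
#
#   {'["browser", "firefox"]': 2, '["os", "linux"]': 1}
#
# and one update_tag_counter task is scheduled per distinct tag with the
# aggregated count.
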
@celery.task(queue="default")
def daily_digest():
    """
    Sends daily digest with top 50 error reports
    """
    request = get_current_request()
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning('Generating daily digests')
    for resource_id in apps:
        resource_id = resource_id.decode('utf8')
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        filter_settings = {'resource': [resource_id],
                           'tags': [{'name': 'type',
                                     'value': ['error'], 'op': None}],
                           'type': 'error', 'start_date': since_when,
                           'end_date': end_date}

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50)

        application = ApplicationService.by_id(resource_id)
        if application:
            users = set([p.user for p in application.users_for_perm('view')])
            for user in users:
                user.send_digest(request, application, reports=reports,
                                 since_when=since_when)


@celery.task(queue="default")
def notifications_reports():
    """
    Checks redis for applications that had new reports and schedules celery
    tasks that send out the notifications for them
    """
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_user_report_notifications.delay(app.decode('utf8'))

@celery.task(queue="default")
def alerting_reports():
    """
    Checks redis for applications that had new reports and schedules celery
    tasks that determine which applications should have new alerts opened
    """

    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_alerts.delay(app.decode('utf8'))


@celery.task(queue="default", soft_time_limit=3600 * 4,
             hard_time_limit=3600 * 4, max_retries=144)
def logs_cleanup(resource_id, filter_settings):
    request = get_current_request()
    request.tm.begin()
    es_query = {
        "_source": False,
        "size": 5000,
        "query": {
            "filtered": {
                "filter": {
                    "and": [{"term": {"resource_id": resource_id}}]
                }
            }
        }
    }

    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings['namespace']:
        query = query.filter(Log.namespace == filter_settings['namespace'][0])
        es_query['query']['filtered']['filter']['and'].append(
            {"term": {"namespace": filter_settings['namespace'][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    result = request.es_conn.search(es_query, index='rcae_l_*',
                                    doc_type='log', es_scroll='1m',
                                    es_search_type='scan')
    scroll_id = result['_scroll_id']
    while True:
        log.warning('log_cleanup, app:{} ns:{} batch'.format(
            resource_id,
            filter_settings['namespace']
        ))
        es_docs_to_delete = []
        result = request.es_conn.send_request(
            'POST', ['_search', 'scroll'],
            body=scroll_id, query_params={"scroll": '1m'})
        scroll_id = result['_scroll_id']
        if not result['hits']['hits']:
            break
        for doc in result['hits']['hits']:
            es_docs_to_delete.append({"id": doc['_id'],
                                      "index": doc['_index']})

        for batch in in_batches(es_docs_to_delete, 10):
            Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
                                                        **to_del)
                                for to_del in batch])