redis: some cleanups and use of pipelines for better performance
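The commit batches the Redis counter updates that used to be issued one call at a time into a single pipeline round trip. A minimal sketch of the pattern, using the redis-py client that backs Datastores.redis (the key names and values below are illustrative, not the application's real ones):

    import redis

    client = redis.StrictRedis()  # stand-in for Datastores.redis
    pipe = client.pipeline(transaction=False)  # batch commands without MULTI/EXEC
    pipe.incr('appenlight:data:reports_per_minute:2016-01-01 10:00', 5)
    pipe.expire('appenlight:data:reports_per_minute:2016-01-01 10:00', 3600 * 24)
    pipe.sadd('appenlight:data:apps_that_got_new_data_per_hour', 1)
    results = pipe.execute()  # one network round trip; one result per queued command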
@@ -1,635 +1,650 b''
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

import bisect
import collections
import math
from datetime import datetime, timedelta

import sqlalchemy as sa
import pyelasticsearch

from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event

from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, report_type=None):
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    log.error('test celery log', extra={'location': 'celery'})
    log.warning('test celery log', extra={'location': 'celery'})
    raise Exception('Celery exception test')


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error('test retry celery log', extra={'location': 'celery'})
        log.warning('test retry celery log', extra={'location': 'celery'})
        raise Exception('Celery exception test')
    except Exception as exc:
        test_retry_exception_task.retry(exc=exc)


@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
-def add_reports(resource_id, params, dataset, environ=None, **kwargs):
-    proto_version = parse_proto(params.get('protocol_version', ''))
+def add_reports(resource_id, request_params, dataset, **kwargs):
+    proto_version = parse_proto(request_params.get('protocol_version', ''))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store solr docs here for single insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)

        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash
            )
            occurences = report_data.get('occurences', 1)
            if not report_group:
                # total reports will be +1 moment later
                report_group = ReportGroup(grouping_hash=report.grouping_hash,
                                           occurences=0, total_reports=0,
                                           last_report=0,
                                           priority=report.priority,
                                           error=report.error,
                                           first_timestamp=report.start_time)
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(report_group.occurences,
                                     report_type=report_group.report_type)
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(
                        s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(
                report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(
                stat_row.es_doc())

            # see if we should mark 10th occurence of report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(math.floor(
                (report_group.occurences + report.occurences) / 10))
            last_occurences_100 = int(
                math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(math.floor(
                (report_group.occurences + report.occurences) / 100))
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(notify_10=notify_occurences_10,
                                               notify_100=notify_occurences_100)
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc())

            action = 'REPORT'
            log_msg = '%s: %s %s, client: %s, proto: %s' % (
                action,
                report_data.get('http_status', 'unknown'),
                str(resource),
                report_data.get('client'),
                proto_version)
            log.info(log_msg)
        total_reports = len(dataset)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        add_reports.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
    for k, v in report_group_docs.items():
        Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
    for k, v in report_docs.items():
        Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                 parent_field='_parent')


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
-def add_logs(resource_id, request, dataset, environ=None, **kwargs):
-    proto_version = request.get('protocol_version')
+def add_logs(resource_id, request_params, dataset, **kwargs):
+    proto_version = request_params.get('protocol_version')
    current_time = datetime.utcnow().replace(second=0, microsecond=0)

    try:
        es_docs = collections.defaultdict(list)
        application = ApplicationService.by_id(resource_id)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of row later
            if entry['primary_key'] is not None:
                ns_pairs.append({"pk": entry['primary_key'],
                                 "ns": entry['namespace']})
            log_entry = Log()
            log_entry.set_data(entry, resource=application)
            log_entry._skip_ft_index = True
            application.logs.append(log_entry)
            DBSession.flush()
            # insert non pk rows first
            if entry['primary_key'] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db foe same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs)
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)

            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)

                es_docs_to_delete[log_entry.partition_id].append(
                    log_entry.delete_hash)

                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(
                    Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {'terms': {'delete_hash': batch}}

                        try:
                            Datastores.es.delete_by_query(
                                es_index, 'log', query)
                        except pyelasticsearch.ElasticHttpNotFoundError as exc:
                            msg = 'skipping index {}'.format(es_index)
                            log.info(msg)

        total_logs = len(dataset)

        log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
            str(application),
            total_logs,
            proto_version)
        log.info(log_msg)
        # mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_logs.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
-def add_metrics(resource_id, request, dataset, proto_version):
+def add_metrics(resource_id, request_params, dataset, proto_version):
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        application = ApplicationService.by_id_cached()(resource_id)
        application = DBSession.merge(application, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric['tags'])
            server_n = tags.get('server_name', metric['server_name']).lower()
            tags['server_name'] = server_n or 'unknown'
            new_metric = Metric(
                timestamp=metric['timestamp'],
                resource_id=application.resource_id,
                namespace=metric['namespace'],
                tags=tags)
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = 'METRICS'
        metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
            action,
            str(application),
            len(dataset),
            proto_version
        )
        log.info(metrics_msg)

        mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_metrics.retry(exc=exc)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
    for doc in es_docs:
        partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
        Datastores.es.index(partition, 'log', doc)


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters']['report_group_occurences'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict)
        users = set([p.user for p in application.users_for_perm('view')])
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(user, request, application,
                                      report_groups=report_groups,
                                      occurence_dict=occurence_dict)
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters'][
                'report_group_occurences_alerting'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict, since_when=since_when)
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
    log.warning('Checking alerts')
    since_when = datetime.utcnow()
    try:
        event_types = [Event.types['error_report_alert'],
                       Event.types['slow_report_alert'], ]
        statuses = [Event.statuses['active']]
        # get events older than 5 min
        events = EventService.by_type_and_status(
            event_types,
            statuses,
            older_than=(since_when - timedelta(minutes=5)))
        for event in events:
            # see if we can close them
            event.validate_or_close(
                since_when=(since_when - timedelta(minutes=1)))
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
    try:
        query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
            sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
                                                         sa.types.TEXT))
        query.update({'times_seen': Tag.times_seen + count,
                      'last_timestamp': datetime.utcnow()},
                     synchronize_session=False)
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
    """
    Sets task to update counters for application tags
    """
    tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
    Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
    c = collections.Counter(tags)
    for t_json, count in c.items():
        tag_info = json.loads(t_json)
        update_tag_counter.delay(tag_info[0], tag_info[1], count)


@celery.task(queue="default")
def daily_digest():
    """
    Sends daily digest with top 50 error reports
    """
    request = get_current_request()
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning('Generating daily digests')
    for resource_id in apps:
        resource_id = resource_id.decode('utf8')
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        filter_settings = {'resource': [resource_id],
                           'tags': [{'name': 'type',
                                     'value': ['error'], 'op': None}],
                           'type': 'error', 'start_date': since_when,
                           'end_date': end_date}

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50)

        application = ApplicationService.by_id(resource_id)
        if application:
            users = set([p.user for p in application.users_for_perm('view')])
            for user in users:
                user.send_digest(request, application, reports=reports,
                                 since_when=since_when)


@celery.task(queue="default")
def notifications_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    issue notifications
    """
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_user_report_notifications.delay(app.decode('utf8'))

@celery.task(queue="default")
def alerting_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    perform the following:
    - which applications should have new alerts opened
    """

    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_alerts.delay(app.decode('utf8'))


@celery.task(queue="default", soft_time_limit=3600 * 4,
             hard_time_limit=3600 * 4, max_retries=144)
def logs_cleanup(resource_id, filter_settings):
    request = get_current_request()
    request.tm.begin()
    es_query = {
        "_source": False,
        "size": 5000,
        "query": {
            "filtered": {
                "filter": {
                    "and": [{"term": {"resource_id": resource_id}}]
                }
            }
        }
    }

    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings['namespace']:
        query = query.filter(Log.namespace == filter_settings['namespace'][0])
        es_query['query']['filtered']['filter']['and'].append(
            {"term": {"namespace": filter_settings['namespace'][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    result = request.es_conn.search(es_query, index='rcae_l_*',
                                    doc_type='log', es_scroll='1m',
                                    es_search_type='scan')
    scroll_id = result['_scroll_id']
    while True:
        log.warning('log_cleanup, app:{} ns:{} batch'.format(
            resource_id,
            filter_settings['namespace']
        ))
        es_docs_to_delete = []
        result = request.es_conn.send_request(
            'POST', ['_search', 'scroll'],
            body=scroll_id, query_params={"scroll": '1m'})
        scroll_id = result['_scroll_id']
        if not result['hits']['hits']:
            break
        for doc in result['hits']['hits']:
            es_docs_to_delete.append({"id": doc['_id'],
                                      "index": doc['_index']})

        for batch in in_batches(es_docs_to_delete, 10):
            Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
                                                        **to_del)
                                for to_del in batch])
@@ -1,83 +1,86 b''
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

import datetime
import logging

from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests

from appenlight.models import Datastores
from appenlight.models.services.config import ConfigService
from appenlight.lib.redis_keys import REDIS_KEYS

log = logging.getLogger(__name__)


def rate_limiting(request, resource, section, to_increment=1):
    tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
    key = REDIS_KEYS['rate_limits'][section].format(tsample,
                                                    resource.resource_id)
-    current_count = Datastores.redis.incr(key, to_increment)
-    Datastores.redis.expire(key, 3600 * 24)
+    redis_pipeline = request.registry.redis_conn.pipeline()
+    redis_pipeline.incr(key, to_increment)
+    redis_pipeline.expire(key, 3600 * 24)
+    results = redis_pipeline.execute()
+    current_count = results[0]
    config = ConfigService.by_key_and_section(section, 'global')
    limit = config.value if config else 1000
    if current_count > int(limit):
        log.info('RATE LIMITING: {}: {}, {}'.format(
            section, resource, current_count))
        abort_msg = 'Rate limits are in effect for this application'
        raise HTTPTooManyRequests(abort_msg,
                                  headers={'X-AppEnlight': abort_msg})


def check_cors(request, application, should_return=True):
    """
    Performs a check and validation if request comes from authorized domain for
    application, otherwise return 403
    """
    origin_found = False
    origin = request.headers.get('Origin')
    if should_return:
        log.info('CORS for %s' % origin)
    if not origin:
        return False
    for domain in application.domains.split('\n'):
        if domain in origin:
            origin_found = True
    if origin_found:
        request.response.headers.add('Access-Control-Allow-Origin', origin)
        request.response.headers.add('XDomainRequestAllowed', '1')
        request.response.headers.add('Access-Control-Allow-Methods',
                                     'GET, POST, OPTIONS')
        request.response.headers.add('Access-Control-Allow-Headers',
                                     'Accept-Encoding, Accept-Language, '
                                     'Content-Type, '
                                     'Depth, User-Agent, X-File-Size, '
                                     'X-Requested-With, If-Modified-Since, '
                                     'X-File-Name, '
                                     'Cache-Control, Host, Pragma, Accept, '
                                     'Origin, Connection, '
                                     'Referer, Cookie, '
                                     'X-appenlight-public-api-key, '
                                     'x-appenlight-public-api-key')
        request.response.headers.add('Access-Control-Max-Age', '86400')
        return request.response
    else:
        return HTTPForbidden()
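In rate_limiting the counter value is now read back from the pipeline rather than from a direct INCR call: execute() returns one result per queued command, in queueing order, so the INCR result is the first element. A small illustrative sketch (the client construction and key name are assumptions, not the application's code):

    import redis

    client = redis.StrictRedis()
    pipe = client.pipeline()
    pipe.incr('rate_limit:example:2016-01-01 10:00', 1)            # results[0] -> new count
    pipe.expire('rate_limit:example:2016-01-01 10:00', 3600 * 24)  # results[1] -> bool
    results = pipe.execute()
    current_count = results[0]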
@@ -1,67 +1,68 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # AppEnlight Enterprise Edition, including its added features, Support
18 # AppEnlight Enterprise Edition, including its added features, Support
19 # services, and proprietary license terms, please see
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
20 # https://rhodecode.com/licenses/
21
21
22 BASE = 'appenlight:data:{}'
22 BASE = 'appenlight:data:{}'
23
23
24 REDIS_KEYS = {
24 REDIS_KEYS = {
25 'tasks': {
25 'tasks': {
26 'add_reports_lock': BASE.format('add_reports_lock:{}'),
26 'add_reports_lock': BASE.format('add_reports_lock:{}'),
27 'add_logs_lock': BASE.format('add_logs_lock:{}'),
27 'add_logs_lock': BASE.format('add_logs_lock:{}'),
28 },
28 },
29 'counters': {
29 'counters': {
30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
31 'reports_per_minute_per_app': BASE.format(
31 'reports_per_hour_per_app': BASE.format(
32 'reports_per_minute_per_app:{}:{}'),
32 'reports_per_hour_per_app:{}:{}'),
33 'reports_per_type': BASE.format('reports_per_type:{}'),
33 'reports_per_type': BASE.format('reports_per_type:{}'),
34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
35 'logs_per_minute_per_app': BASE.format(
35 'logs_per_hour_per_app': BASE.format(
36 'logs_per_minute_per_app:{}:{}'),
36 'logs_per_hour_per_app:{}:{}'),
37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
38 'metrics_per_minute_per_app': BASE.format(
38 'metrics_per_hour_per_app': BASE.format(
39 'metrics_per_minute_per_app:{}:{}'),
39 'metrics_per_hour_per_app:{}:{}'),
40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 'report_group_occurences_alerting': BASE.format(
41 'report_group_occurences_alerting': BASE.format(
42 'report_group_occurences_alerting:{}'),
42 'report_group_occurences_alerting:{}'),
43 'report_group_occurences_10th': BASE.format(
43 'report_group_occurences_10th': BASE.format(
44 'report_group_occurences_10th:{}'),
44 'report_group_occurences_10th:{}'),
45 'report_group_occurences_100th': BASE.format(
45 'report_group_occurences_100th': BASE.format(
46 'report_group_occurences_100th:{}'),
46 'report_group_occurences_100th:{}'),
47 },
47 },
48 'rate_limits': {
48 'rate_limits': {
49 'per_application_reports_rate_limit': BASE.format(
49 'per_application_reports_rate_limit': BASE.format(
50 'per_application_reports_limit:{}:{}'),
50 'per_application_reports_limit:{}:{}'),
51 'per_application_logs_rate_limit': BASE.format(
51 'per_application_logs_rate_limit': BASE.format(
52 'per_application_logs_rate_limit:{}:{}'),
52 'per_application_logs_rate_limit:{}:{}'),
53 'per_application_metrics_rate_limit': BASE.format(
53 'per_application_metrics_rate_limit': BASE.format(
54 'per_application_metrics_rate_limit:{}:{}'),
54 'per_application_metrics_rate_limit:{}:{}'),
55 },
55 },
56 'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
56 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
57 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
57 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_reports_alerting': BASE.format(
59 'apps_that_had_reports_alerting': BASE.format(
59 'apps_that_had_reports_alerting'),
60 'apps_that_had_reports_alerting'),
60 'apps_that_had_error_reports_alerting': BASE.format(
61 'apps_that_had_error_reports_alerting': BASE.format(
61 'apps_that_had_error_reports_alerting'),
62 'apps_that_had_error_reports_alerting'),
62 'reports_to_notify_per_type_per_app': BASE.format(
63 'reports_to_notify_per_type_per_app': BASE.format(
63 'reports_to_notify_per_type_per_app:{}:{}'),
64 'reports_to_notify_per_type_per_app:{}:{}'),
64 'reports_to_notify_per_type_per_app_alerting': BASE.format(
65 'reports_to_notify_per_type_per_app_alerting': BASE.format(
65 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
66 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
66 'seen_tag_list': BASE.format('seen_tag_list')
67 'seen_tag_list': BASE.format('seen_tag_list')
67 }
68 }
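Every value in REDIS_KEYS is a str.format template under the appenlight:data: namespace rather than a finished key; callers interpolate the remaining placeholders, typically a timestamp and, for the *_per_app counters, a resource id. A small illustrative sketch (the resource id, timestamp and its resolution are invented):

# Illustrative only: how the templates above become concrete Redis keys.
from datetime import datetime

from appenlight.lib.redis_keys import REDIS_KEYS

now = datetime.utcnow().replace(second=0, microsecond=0)
minute_key = REDIS_KEYS['counters']['reports_per_minute'].format(now)
# e.g. 'appenlight:data:reports_per_minute:2016-05-01 12:34:00'
per_app_key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(1, now)
# e.g. 'appenlight:data:reports_per_hour_per_app:1:2016-05-01 12:34:00'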
@@ -1,267 +1,268 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # AppEnlight Enterprise Edition, including its added features, Support
18 # AppEnlight Enterprise Edition, including its added features, Support
19 # services, and proprietary license terms, please see
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
20 # https://rhodecode.com/licenses/
21
21
22 import logging
22 import logging
23 import sqlalchemy as sa
23 import sqlalchemy as sa
24
24
25 from datetime import datetime
25 from datetime import datetime
26
26
27 from pyramid.threadlocal import get_current_request
27 from pyramid.threadlocal import get_current_request
28 from sqlalchemy.dialects.postgresql import JSON
28 from sqlalchemy.dialects.postgresql import JSON
29 from ziggurat_foundations.models.base import BaseModel
29 from ziggurat_foundations.models.base import BaseModel
30
30
31 from appenlight.models import Base, get_db_session, Datastores
31 from appenlight.models import Base, get_db_session, Datastores
32 from appenlight.lib.enums import ReportType
32 from appenlight.lib.enums import ReportType
33 from appenlight.lib.rule import Rule
33 from appenlight.lib.rule import Rule
34 from appenlight.lib.redis_keys import REDIS_KEYS
34 from appenlight.lib.redis_keys import REDIS_KEYS
35 from appenlight.models.report import REPORT_TYPE_MATRIX
35 from appenlight.models.report import REPORT_TYPE_MATRIX
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
40 class ReportGroup(Base, BaseModel):
40 class ReportGroup(Base, BaseModel):
41 __tablename__ = 'reports_groups'
41 __tablename__ = 'reports_groups'
42 __table_args__ = {'implicit_returning': False}
42 __table_args__ = {'implicit_returning': False}
43
43
44 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
44 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
45 resource_id = sa.Column(sa.Integer(),
45 resource_id = sa.Column(sa.Integer(),
46 sa.ForeignKey('applications.resource_id',
46 sa.ForeignKey('applications.resource_id',
47 onupdate='CASCADE',
47 onupdate='CASCADE',
48 ondelete='CASCADE'),
48 ondelete='CASCADE'),
49 nullable=False,
49 nullable=False,
50 index=True)
50 index=True)
51 priority = sa.Column(sa.Integer, nullable=False, index=True, default=5,
51 priority = sa.Column(sa.Integer, nullable=False, index=True, default=5,
52 server_default='5')
52 server_default='5')
53 first_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
53 first_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
54 server_default=sa.func.now())
54 server_default=sa.func.now())
55 last_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
55 last_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
56 server_default=sa.func.now())
56 server_default=sa.func.now())
57 error = sa.Column(sa.UnicodeText(), index=True)
57 error = sa.Column(sa.UnicodeText(), index=True)
58 grouping_hash = sa.Column(sa.String(40), default='')
58 grouping_hash = sa.Column(sa.String(40), default='')
59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False,
59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False,
60 default=list)
60 default=list)
61 report_type = sa.Column(sa.Integer, default=1)
61 report_type = sa.Column(sa.Integer, default=1)
62 total_reports = sa.Column(sa.Integer, default=1)
62 total_reports = sa.Column(sa.Integer, default=1)
63 last_report = sa.Column(sa.Integer)
63 last_report = sa.Column(sa.Integer)
64 occurences = sa.Column(sa.Integer, default=1)
64 occurences = sa.Column(sa.Integer, default=1)
65 average_duration = sa.Column(sa.Float, default=0)
65 average_duration = sa.Column(sa.Float, default=0)
66 summed_duration = sa.Column(sa.Float, default=0)
66 summed_duration = sa.Column(sa.Float, default=0)
67 read = sa.Column(sa.Boolean(), index=True, default=False)
67 read = sa.Column(sa.Boolean(), index=True, default=False)
68 fixed = sa.Column(sa.Boolean(), index=True, default=False)
68 fixed = sa.Column(sa.Boolean(), index=True, default=False)
69 notified = sa.Column(sa.Boolean(), index=True, default=False)
69 notified = sa.Column(sa.Boolean(), index=True, default=False)
70 public = sa.Column(sa.Boolean(), index=True, default=False)
70 public = sa.Column(sa.Boolean(), index=True, default=False)
71
71
72 reports = sa.orm.relationship('Report',
72 reports = sa.orm.relationship('Report',
73 lazy='dynamic',
73 lazy='dynamic',
74 backref='report_group',
74 backref='report_group',
75 cascade="all, delete-orphan",
75 cascade="all, delete-orphan",
76 passive_deletes=True,
76 passive_deletes=True,
77 passive_updates=True, )
77 passive_updates=True, )
78
78
79 comments = sa.orm.relationship('ReportComment',
79 comments = sa.orm.relationship('ReportComment',
80 lazy='dynamic',
80 lazy='dynamic',
81 backref='report',
81 backref='report',
82 cascade="all, delete-orphan",
82 cascade="all, delete-orphan",
83 passive_deletes=True,
83 passive_deletes=True,
84 passive_updates=True,
84 passive_updates=True,
85 order_by="ReportComment.comment_id")
85 order_by="ReportComment.comment_id")
86
86
87 assigned_users = sa.orm.relationship('User',
87 assigned_users = sa.orm.relationship('User',
88 backref=sa.orm.backref(
88 backref=sa.orm.backref(
89 'assigned_reports_relation',
89 'assigned_reports_relation',
90 lazy='dynamic',
90 lazy='dynamic',
91 order_by=sa.desc(
91 order_by=sa.desc(
92 "reports_groups.id")
92 "reports_groups.id")
93 ),
93 ),
94 passive_deletes=True,
94 passive_deletes=True,
95 passive_updates=True,
95 passive_updates=True,
96 secondary='reports_assignments',
96 secondary='reports_assignments',
97 order_by="User.user_name")
97 order_by="User.user_name")
98
98
99 stats = sa.orm.relationship('ReportStat',
99 stats = sa.orm.relationship('ReportStat',
100 lazy='dynamic',
100 lazy='dynamic',
101 backref='report',
101 backref='report',
102 passive_deletes=True,
102 passive_deletes=True,
103 passive_updates=True, )
103 passive_updates=True, )
104
104
105 last_report_ref = sa.orm.relationship('Report',
105 last_report_ref = sa.orm.relationship('Report',
106 uselist=False,
106 uselist=False,
107 primaryjoin="ReportGroup.last_report "
107 primaryjoin="ReportGroup.last_report "
108 "== Report.id",
108 "== Report.id",
109 foreign_keys="Report.id",
109 foreign_keys="Report.id",
110 cascade="all, delete-orphan",
110 cascade="all, delete-orphan",
111 passive_deletes=True,
111 passive_deletes=True,
112 passive_updates=True, )
112 passive_updates=True, )
113
113
114 def __repr__(self):
114 def __repr__(self):
115 return '<ReportGroup id:{}>'.format(self.id)
115 return '<ReportGroup id:{}>'.format(self.id)
116
116
117 def get_report(self, report_id=None, public=False):
117 def get_report(self, report_id=None, public=False):
118 """
118 """
119 Gets report with specific id or latest report if id was not specified
119 Gets report with specific id or latest report if id was not specified
120 """
120 """
121 from .report import Report
121 from .report import Report
122
122
123 if not report_id:
123 if not report_id:
124 return self.last_report_ref
124 return self.last_report_ref
125 else:
125 else:
126 return self.reports.filter(Report.id == report_id).first()
126 return self.reports.filter(Report.id == report_id).first()
127
127
128 def get_public_url(self, request, _app_url=None):
128 def get_public_url(self, request, _app_url=None):
129 url = request.route_url('/', _app_url=_app_url)
129 url = request.route_url('/', _app_url=_app_url)
130 return (url + 'ui/report/%s') % self.id
130 return (url + 'ui/report/%s') % self.id
131
131
132 def run_postprocessing(self, report):
132 def run_postprocessing(self, report):
133 """
133 """
134 Alters report group priority based on postprocessing configuration
134 Alters report group priority based on postprocessing configuration
135 """
135 """
136 request = get_current_request()
136 request = get_current_request()
137 get_db_session(None, self).flush()
137 get_db_session(None, self).flush()
138 for action in self.application.postprocess_conf:
138 for action in self.application.postprocess_conf:
139 get_db_session(None, self).flush()
139 get_db_session(None, self).flush()
140 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
140 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
141 report_dict = report.get_dict(request)
141 report_dict = report.get_dict(request)
142 # if was not processed yet
142 # if was not processed yet
143 if (rule_obj.match(report_dict) and
143 if (rule_obj.match(report_dict) and
144 action.pkey not in self.triggered_postprocesses_ids):
144 action.pkey not in self.triggered_postprocesses_ids):
145 action.postprocess(self)
145 action.postprocess(self)
146 # this way sqla can track mutation of list
146 # this way sqla can track mutation of list
147 self.triggered_postprocesses_ids = \
147 self.triggered_postprocesses_ids = \
148 self.triggered_postprocesses_ids + [action.pkey]
148 self.triggered_postprocesses_ids + [action.pkey]
149
149
150 get_db_session(None, self).flush()
150 get_db_session(None, self).flush()
151 # do not go out of bounds
151 # do not go out of bounds
152 if self.priority < 1:
152 if self.priority < 1:
153 self.priority = 1
153 self.priority = 1
154 if self.priority > 10:
154 if self.priority > 10:
155 self.priority = 10
155 self.priority = 10
156
156
157 def get_dict(self, request):
157 def get_dict(self, request):
158 instance_dict = super(ReportGroup, self).get_dict()
158 instance_dict = super(ReportGroup, self).get_dict()
159 instance_dict['server_name'] = self.get_report().tags.get(
159 instance_dict['server_name'] = self.get_report().tags.get(
160 'server_name')
160 'server_name')
161 instance_dict['view_name'] = self.get_report().tags.get('view_name')
161 instance_dict['view_name'] = self.get_report().tags.get('view_name')
162 instance_dict['resource_name'] = self.application.resource_name
162 instance_dict['resource_name'] = self.application.resource_name
163 instance_dict['report_type'] = self.get_report().report_type
163 instance_dict['report_type'] = self.get_report().report_type
164 instance_dict['url_path'] = self.get_report().url_path
164 instance_dict['url_path'] = self.get_report().url_path
165 instance_dict['front_url'] = self.get_report().get_public_url(request)
165 instance_dict['front_url'] = self.get_report().get_public_url(request)
166 del instance_dict['triggered_postprocesses_ids']
166 del instance_dict['triggered_postprocesses_ids']
167 return instance_dict
167 return instance_dict
168
168
169 def es_doc(self):
169 def es_doc(self):
170 return {
170 return {
171 '_id': str(self.id),
171 '_id': str(self.id),
172 'pg_id': str(self.id),
172 'pg_id': str(self.id),
173 'resource_id': self.resource_id,
173 'resource_id': self.resource_id,
174 'error': self.error,
174 'error': self.error,
175 'fixed': self.fixed,
175 'fixed': self.fixed,
176 'public': self.public,
176 'public': self.public,
177 'read': self.read,
177 'read': self.read,
178 'priority': self.priority,
178 'priority': self.priority,
179 'occurences': self.occurences,
179 'occurences': self.occurences,
180 'average_duration': self.average_duration,
180 'average_duration': self.average_duration,
181 'summed_duration': self.summed_duration,
181 'summed_duration': self.summed_duration,
182 'first_timestamp': self.first_timestamp,
182 'first_timestamp': self.first_timestamp,
183 'last_timestamp': self.last_timestamp
183 'last_timestamp': self.last_timestamp
184 }
184 }
185
185
186 def set_notification_info(self, notify_10=False, notify_100=False):
186 def set_notification_info(self, notify_10=False, notify_100=False):
187 """
187 """
188 Update redis notification maps for notification job
188 Update redis notification maps for notification job
189 """
189 """
190 current_time = datetime.utcnow().replace(second=0, microsecond=0)
190 current_time = datetime.utcnow().replace(second=0, microsecond=0)
191 # global app counter
191 # global app counter
192 key = REDIS_KEYS['counters']['reports_per_type'].format(
192 key = REDIS_KEYS['counters']['reports_per_type'].format(
193 self.report_type, current_time)
193 self.report_type, current_time)
194 Datastores.redis.incr(key)
194 redis_pipeline = Datastores.redis.pipeline()
195 Datastores.redis.expire(key, 3600 * 24)
195 redis_pipeline.incr(key)
196 redis_pipeline.expire(key, 3600 * 24)
196 # detailed app notification for alerts and notifications
197 # detailed app notification for alerts and notifications
197 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
198 redis_pipeline.sadd(
198 self.resource_id)
199 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
199 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
200 redis_pipeline.sadd(
200 self.resource_id)
201 REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
201 # only notify for exceptions here
202 # only notify for exceptions here
202 if self.report_type == ReportType.error:
203 if self.report_type == ReportType.error:
203 Datastores.redis.sadd(
204 redis_pipeline.sadd(
204 REDIS_KEYS['apps_that_had_reports'],
205 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
205 self.resource_id)
206 redis_pipeline.sadd(
206 Datastores.redis.sadd(
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
208 self.resource_id)
208 self.resource_id)
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
210 Datastores.redis.incr(key)
210 redis_pipeline.incr(key)
211 Datastores.redis.expire(key, 3600 * 24)
211 redis_pipeline.expire(key, 3600 * 24)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
213 Datastores.redis.incr(key)
213 redis_pipeline.incr(key)
214 Datastores.redis.expire(key, 3600 * 24)
214 redis_pipeline.expire(key, 3600 * 24)
215
215
216 if notify_10:
216 if notify_10:
217 key = REDIS_KEYS['counters'][
217 key = REDIS_KEYS['counters'][
218 'report_group_occurences_10th'].format(self.id)
218 'report_group_occurences_10th'].format(self.id)
219 Datastores.redis.setex(key, 3600 * 24, 1)
219 redis_pipeline.setex(key, 3600 * 24, 1)
220 if notify_100:
220 if notify_100:
221 key = REDIS_KEYS['counters'][
221 key = REDIS_KEYS['counters'][
222 'report_group_occurences_100th'].format(self.id)
222 'report_group_occurences_100th'].format(self.id)
223 Datastores.redis.setex(key, 3600 * 24, 1)
223 redis_pipeline.setex(key, 3600 * 24, 1)
224
224
225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
226 self.report_type, self.resource_id)
226 self.report_type, self.resource_id)
227 Datastores.redis.sadd(key, self.id)
227 redis_pipeline.sadd(key, self.id)
228 Datastores.redis.expire(key, 3600 * 24)
228 redis_pipeline.expire(key, 3600 * 24)
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
230 self.report_type, self.resource_id)
230 self.report_type, self.resource_id)
231 Datastores.redis.sadd(key, self.id)
231 redis_pipeline.sadd(key, self.id)
232 Datastores.redis.expire(key, 3600 * 24)
232 redis_pipeline.expire(key, 3600 * 24)
233 redis_pipeline.execute()
233
234
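set_notification_info is the core of the pipeline cleanup: every incr/expire/sadd/setex that used to be issued one by one against Datastores.redis is now queued on a single pipeline and flushed with one execute(), so the notification bookkeeping costs one network round-trip instead of a dozen. The same pattern in isolation, with a standalone redis-py client (connection settings and key values are placeholders):

# Stand-alone sketch of the pipeline pattern adopted above; host, db and the
# concrete key values are invented for illustration.
import redis

client = redis.StrictRedis(host='localhost', port=6379, db=0)
pipe = client.pipeline()
key = 'appenlight:data:report_group_occurences:123'
pipe.incr(key)
pipe.expire(key, 3600 * 24)
pipe.sadd('appenlight:data:apps_that_had_reports', 1)
pipe.setex('appenlight:data:report_group_occurences_10th:123', 3600 * 24, 1)
results = pipe.execute()  # all queued commands go out in one round-trip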
234 @property
235 @property
235 def partition_id(self):
236 def partition_id(self):
236 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
237 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
237
238
238
239
239 def after_insert(mapper, connection, target):
240 def after_insert(mapper, connection, target):
240 if not hasattr(target, '_skip_ft_index'):
241 if not hasattr(target, '_skip_ft_index'):
241 data = target.es_doc()
242 data = target.es_doc()
242 data.pop('_id', None)
243 data.pop('_id', None)
243 Datastores.es.index(target.partition_id, 'report_group',
244 Datastores.es.index(target.partition_id, 'report_group',
244 data, id=target.id)
245 data, id=target.id)
245
246
246
247
247 def after_update(mapper, connection, target):
248 def after_update(mapper, connection, target):
248 if not hasattr(target, '_skip_ft_index'):
249 if not hasattr(target, '_skip_ft_index'):
249 data = target.es_doc()
250 data = target.es_doc()
250 data.pop('_id', None)
251 data.pop('_id', None)
251 Datastores.es.index(target.partition_id, 'report_group',
252 Datastores.es.index(target.partition_id, 'report_group',
252 data, id=target.id)
253 data, id=target.id)
253
254
254
255
255 def after_delete(mapper, connection, target):
256 def after_delete(mapper, connection, target):
256 query = {'term': {'group_id': target.id}}
257 query = {'term': {'group_id': target.id}}
257 # TODO: routing seems unnecessary, need to test a bit more
258 # TODO: routing seems unnecessary, need to test a bit more
258 #Datastores.es.delete_by_query(target.partition_id, 'report', query,
259 #Datastores.es.delete_by_query(target.partition_id, 'report', query,
259 # query_params={'routing':str(target.id)})
260 # query_params={'routing':str(target.id)})
260 Datastores.es.delete_by_query(target.partition_id, 'report', query)
261 Datastores.es.delete_by_query(target.partition_id, 'report', query)
261 query = {'term': {'pg_id': target.id}}
262 query = {'term': {'pg_id': target.id}}
262 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
263 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
263
264
264
265
265 sa.event.listen(ReportGroup, 'after_insert', after_insert)
266 sa.event.listen(ReportGroup, 'after_insert', after_insert)
266 sa.event.listen(ReportGroup, 'after_update', after_update)
267 sa.event.listen(ReportGroup, 'after_update', after_update)
267 sa.event.listen(ReportGroup, 'after_delete', after_delete)
268 sa.event.listen(ReportGroup, 'after_delete', after_delete)
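The three listeners above keep Elasticsearch in sync with the reports_groups table: on insert and update the group's es_doc() (minus its '_id') is written to the monthly partition index, and on delete the matching report and report_group documents are removed by query. For a group first seen in March 2016 the insert listener ends up performing roughly the call below; all field values are invented.

# Illustrative payload for the after_insert listener above; values invented.
from appenlight.models import Datastores

doc = {
    'pg_id': '1234',
    'resource_id': 1,
    'error': 'ValueError: boom',
    'fixed': False,
    'public': False,
    'read': False,
    'priority': 5,
    'occurences': 1,
    'average_duration': 0.0,
    'summed_duration': 0.0,
    'first_timestamp': '2016-03-01T10:00:00',
    'last_timestamp': '2016-03-01T10:00:00',
}
# partition_id -> 'rcae_r_2016_03' for a first_timestamp in March 2016
Datastores.es.index('rcae_r_2016_03', 'report_group', doc, id=1234)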
@@ -1,200 +1,199 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # AppEnlight Enterprise Edition, including its added features, Support
18 # AppEnlight Enterprise Edition, including its added features, Support
19 # services, and proprietary license terms, please see
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
20 # https://rhodecode.com/licenses/
21
21
22 import logging
22 import logging
23 import os
23 import os
24 import pkg_resources
24 import pkg_resources
25
25
26 from datetime import datetime, timedelta
26 from datetime import datetime, timedelta
27
27
28 import psutil
28 import psutil
29 import redis
29 import redis
30
30
31 from pyramid.view import view_config
31 from pyramid.view import view_config
32 from appenlight.models import DBSession
32 from appenlight.models import DBSession
33 from appenlight.models import Datastores
33 from appenlight.models import Datastores
34 from appenlight.lib.redis_keys import REDIS_KEYS
34 from appenlight.lib.redis_keys import REDIS_KEYS
35
35
36
36
37 def bytes2human(total):
37 def bytes2human(total):
38 giga = 1024.0 ** 3
38 giga = 1024.0 ** 3
39 mega = 1024.0 ** 2
39 mega = 1024.0 ** 2
40 kilo = 1024.0
40 kilo = 1024.0
41 if giga <= total:
41 if giga <= total:
42 return '{:0.1f}G'.format(total / giga)
42 return '{:0.1f}G'.format(total / giga)
43 elif mega <= total:
43 elif mega <= total:
44 return '{:0.1f}M'.format(total / mega)
44 return '{:0.1f}M'.format(total / mega)
45 else:
45 else:
46 return '{:0.1f}K'.format(total / kilo)
46 return '{:0.1f}K'.format(total / kilo)
47
47
48
48
49 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
50
50
51
51
52 @view_config(route_name='section_view',
52 @view_config(route_name='section_view',
53 match_param=['section=admin_section', 'view=system'],
53 match_param=['section=admin_section', 'view=system'],
54 renderer='json', permission='root_administration')
54 renderer='json', permission='root_administration')
55 def system(request):
55 def system(request):
56 current_time = datetime.utcnow(). \
56 current_time = datetime.utcnow(). \
57 replace(second=0, microsecond=0) - timedelta(minutes=1)
57 replace(second=0, microsecond=0) - timedelta(minutes=1)
58 # global app counter
58 # global app counter
59
59 processed_reports = request.registry.redis_conn.get(
60 processed_reports = Datastores.redis.get(
61 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
60 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
62 processed_reports = int(processed_reports) if processed_reports else 0
61 processed_reports = int(processed_reports) if processed_reports else 0
63 processed_logs = Datastores.redis.get(
62 processed_logs = request.registry.redis_conn.get(
64 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
63 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
65 processed_logs = int(processed_logs) if processed_logs else 0
64 processed_logs = int(processed_logs) if processed_logs else 0
66 processed_metrics = Datastores.redis.get(
65 processed_metrics = request.registry.redis_conn.get(
67 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
66 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
68 processed_metrics = int(processed_metrics) if processed_metrics else 0
67 processed_metrics = int(processed_metrics) if processed_metrics else 0
69
68
70 waiting_reports = 0
69 waiting_reports = 0
71 waiting_logs = 0
70 waiting_logs = 0
72 waiting_metrics = 0
71 waiting_metrics = 0
73 waiting_other = 0
72 waiting_other = 0
74
73
75 if 'redis' in request.registry.settings['celery.broker_type']:
74 if 'redis' in request.registry.settings['celery.broker_type']:
76 redis_client = redis.StrictRedis.from_url(
75 redis_client = redis.StrictRedis.from_url(
77 request.registry.settings['celery.broker_url'])
76 request.registry.settings['celery.broker_url'])
78 waiting_reports = redis_client.llen('reports')
77 waiting_reports = redis_client.llen('reports')
79 waiting_logs = redis_client.llen('logs')
78 waiting_logs = redis_client.llen('logs')
80 waiting_metrics = redis_client.llen('metrics')
79 waiting_metrics = redis_client.llen('metrics')
81 waiting_other = redis_client.llen('default')
80 waiting_other = redis_client.llen('default')
82
81
83 # process
82 # process
84 def replace_inf(val):
83 def replace_inf(val):
85 return val if val != psutil.RLIM_INFINITY else 'unlimited'
84 return val if val != psutil.RLIM_INFINITY else 'unlimited'
86
85
87 p = psutil.Process()
86 p = psutil.Process()
88 fd = p.rlimit(psutil.RLIMIT_NOFILE)
87 fd = p.rlimit(psutil.RLIMIT_NOFILE)
89 memlock = p.rlimit(psutil.RLIMIT_MEMLOCK)
88 memlock = p.rlimit(psutil.RLIMIT_MEMLOCK)
90 self_info = {
89 self_info = {
91 'fds': {'soft': replace_inf(fd[0]),
90 'fds': {'soft': replace_inf(fd[0]),
92 'hard': replace_inf(fd[1])},
91 'hard': replace_inf(fd[1])},
93 'memlock': {'soft': replace_inf(memlock[0]),
92 'memlock': {'soft': replace_inf(memlock[0]),
94 'hard': replace_inf(memlock[1])},
93 'hard': replace_inf(memlock[1])},
95 }
94 }
96
95
97 # disks
96 # disks
98 disks = []
97 disks = []
99 for part in psutil.disk_partitions(all=False):
98 for part in psutil.disk_partitions(all=False):
100 if os.name == 'nt':
99 if os.name == 'nt':
101 if 'cdrom' in part.opts or part.fstype == '':
100 if 'cdrom' in part.opts or part.fstype == '':
102 continue
101 continue
103 usage = psutil.disk_usage(part.mountpoint)
102 usage = psutil.disk_usage(part.mountpoint)
104 disks.append({
103 disks.append({
105 'device': part.device,
104 'device': part.device,
106 'total': bytes2human(usage.total),
105 'total': bytes2human(usage.total),
107 'used': bytes2human(usage.used),
106 'used': bytes2human(usage.used),
108 'free': bytes2human(usage.free),
107 'free': bytes2human(usage.free),
109 'percentage': int(usage.percent),
108 'percentage': int(usage.percent),
110 'mountpoint': part.mountpoint,
109 'mountpoint': part.mountpoint,
111 'fstype': part.fstype
110 'fstype': part.fstype
112 })
111 })
113
112
114 # memory
113 # memory
115 memory_v = psutil.virtual_memory()
114 memory_v = psutil.virtual_memory()
116 memory_s = psutil.swap_memory()
115 memory_s = psutil.swap_memory()
117
116
118 memory = {
117 memory = {
119 'total': bytes2human(memory_v.total),
118 'total': bytes2human(memory_v.total),
120 'available': bytes2human(memory_v.available),
119 'available': bytes2human(memory_v.available),
121 'percentage': memory_v.percent,
120 'percentage': memory_v.percent,
122 'used': bytes2human(memory_v.used),
121 'used': bytes2human(memory_v.used),
123 'free': bytes2human(memory_v.free),
122 'free': bytes2human(memory_v.free),
124 'active': bytes2human(memory_v.active),
123 'active': bytes2human(memory_v.active),
125 'inactive': bytes2human(memory_v.inactive),
124 'inactive': bytes2human(memory_v.inactive),
126 'buffers': bytes2human(memory_v.buffers),
125 'buffers': bytes2human(memory_v.buffers),
127 'cached': bytes2human(memory_v.cached),
126 'cached': bytes2human(memory_v.cached),
128 'swap_total': bytes2human(memory_s.total),
127 'swap_total': bytes2human(memory_s.total),
129 'swap_used': bytes2human(memory_s.used)
128 'swap_used': bytes2human(memory_s.used)
130 }
129 }
131
130
132 # load
131 # load
133 system_load = os.getloadavg()
132 system_load = os.getloadavg()
134
133
135 # processes
134 # processes
136 min_mem = 1024 * 1024 * 40 # 40MB
135 min_mem = 1024 * 1024 * 40 # 40MB
137 process_info = []
136 process_info = []
138 for p in psutil.process_iter():
137 for p in psutil.process_iter():
139 mem_used = p.get_memory_info().rss
138 mem_used = p.get_memory_info().rss
140 if mem_used < min_mem:
139 if mem_used < min_mem:
141 continue
140 continue
142 process_info.append({'owner': p.username(),
141 process_info.append({'owner': p.username(),
143 'pid': p.pid,
142 'pid': p.pid,
144 'cpu': round(p.get_cpu_percent(interval=0), 1),
143 'cpu': round(p.get_cpu_percent(interval=0), 1),
145 'mem_percentage': round(p.get_memory_percent(),
144 'mem_percentage': round(p.get_memory_percent(),
146 1),
145 1),
147 'mem_usage': bytes2human(mem_used),
146 'mem_usage': bytes2human(mem_used),
148 'name': p.name(),
147 'name': p.name(),
149 'command': ' '.join(p.cmdline())
148 'command': ' '.join(p.cmdline())
150 })
149 })
151 process_info = sorted(process_info, key=lambda x: x['mem_percentage'],
150 process_info = sorted(process_info, key=lambda x: x['mem_percentage'],
152 reverse=True)
151 reverse=True)
153
152
154 # pg tables
153 # pg tables
155
154
156 db_size_query = '''
155 db_size_query = '''
157 SELECT tablename, pg_total_relation_size(tablename::text) size
156 SELECT tablename, pg_total_relation_size(tablename::text) size
158 FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
157 FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
159 tablename NOT LIKE 'sql_%' ORDER BY size DESC;'''
158 tablename NOT LIKE 'sql_%' ORDER BY size DESC;'''
160
159
161 db_tables = []
160 db_tables = []
162 for row in DBSession.execute(db_size_query):
161 for row in DBSession.execute(db_size_query):
163 db_tables.append({"size_human": bytes2human(row.size),
162 db_tables.append({"size_human": bytes2human(row.size),
164 "table_name": row.tablename})
163 "table_name": row.tablename})
165
164
166 # es indices
165 # es indices
167 es_indices = []
166 es_indices = []
168 result = Datastores.es.send_request(
167 result = Datastores.es.send_request(
169 'GET', ['_stats', 'store, docs'], query_params={})
168 'GET', ['_stats', 'store, docs'], query_params={})
170 for ix, stats in result['indices'].items():
169 for ix, stats in result['indices'].items():
171 size = stats['primaries']['store']['size_in_bytes']
170 size = stats['primaries']['store']['size_in_bytes']
172 es_indices.append({'name': ix,
171 es_indices.append({'name': ix,
173 'size': size,
172 'size': size,
174 'size_human': bytes2human(size)})
173 'size_human': bytes2human(size)})
175
174
176 # packages
175 # packages
177
176
178 packages = ({'name': p.project_name, 'version': p.version}
177 packages = ({'name': p.project_name, 'version': p.version}
179 for p in pkg_resources.working_set)
178 for p in pkg_resources.working_set)
180
179
181 return {'db_tables': db_tables,
180 return {'db_tables': db_tables,
182 'es_indices': sorted(es_indices,
181 'es_indices': sorted(es_indices,
183 key=lambda x: x['size'], reverse=True),
182 key=lambda x: x['size'], reverse=True),
184 'process_info': process_info,
183 'process_info': process_info,
185 'system_load': system_load,
184 'system_load': system_load,
186 'disks': disks,
185 'disks': disks,
187 'memory': memory,
186 'memory': memory,
188 'packages': sorted(packages, key=lambda x: x['name'].lower()),
187 'packages': sorted(packages, key=lambda x: x['name'].lower()),
189 'current_time': current_time,
188 'current_time': current_time,
190 'queue_stats': {
189 'queue_stats': {
191 'processed_reports': processed_reports,
190 'processed_reports': processed_reports,
192 'processed_logs': processed_logs,
191 'processed_logs': processed_logs,
193 'processed_metrics': processed_metrics,
192 'processed_metrics': processed_metrics,
194 'waiting_reports': waiting_reports,
193 'waiting_reports': waiting_reports,
195 'waiting_logs': waiting_logs,
194 'waiting_logs': waiting_logs,
196 'waiting_metrics': waiting_metrics,
195 'waiting_metrics': waiting_metrics,
197 'waiting_other': waiting_other
196 'waiting_other': waiting_other
198 },
197 },
199 'self_info': self_info
198 'self_info': self_info
200 }
199 }
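The reports_per_minute, logs_per_minute and metrics_per_minute counters read at the top of this view are populated elsewhere, in the ingestion tasks that are not part of this hunk; presumably the write side follows the same incr-plus-expire pipeline pattern as set_notification_info above, keyed by the current minute. A hedged sketch of what such a write could look like:

# Illustrative write side for the per-minute counters read by the view
# above; the real increments happen in the celery tasks, not shown here.
from datetime import datetime

from appenlight.models import Datastores
from appenlight.lib.redis_keys import REDIS_KEYS

current_time = datetime.utcnow().replace(second=0, microsecond=0)
key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
pipeline = Datastores.redis.pipeline()
pipeline.incr(key, 5)            # e.g. a batch of five processed reports
pipeline.expire(key, 3600 * 24)  # keep the counter around for a day
pipeline.execute()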