tasks: include hours in stat keys
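The diff below changes how "applications that got new data" are tracked in redis: instead of one flat set, the current hour is now formatted into the key itself and the resource id becomes the set member. A minimal sketch of the resulting key layout, assuming redis-py style sadd semantics; the BASE prefix and the key template mirror the diff, while APPS_PER_HOUR, resource_id and the example timestamp are illustrative only:

from datetime import datetime

BASE = 'appenlight:data:{}'
APPS_PER_HOUR = BASE.format('apps_that_got_new_data_per_hour:{}')

current_time = datetime.utcnow().replace(second=0, microsecond=0)

# old layout: a single set, with both the resource id and the hour added as members
# new layout: one set per hour, holding only resource ids
key = APPS_PER_HOUR.format(current_time.replace(minute=0))
# e.g. 'appenlight:data:apps_that_got_new_data_per_hour:2016-05-05 14:00:00'
# redis_pipeline.sadd(key, resource_id)  # hypothetical call, mirroring the new code path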
@@ -1,650 +1,650 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

import bisect
import collections
import math
from datetime import datetime, timedelta

import sqlalchemy as sa
import pyelasticsearch

from celery.utils.log import get_task_logger
from zope.sqlalchemy import mark_changed
from pyramid.threadlocal import get_current_request, get_current_registry
from appenlight.celery import celery
from appenlight.models.report_group import ReportGroup
from appenlight.models import DBSession, Datastores
from appenlight.models.report import Report
from appenlight.models.log import Log
from appenlight.models.metric import Metric
from appenlight.models.event import Event

from appenlight.models.services.application import ApplicationService
from appenlight.models.services.event import EventService
from appenlight.models.services.log import LogService
from appenlight.models.services.report import ReportService
from appenlight.models.services.report_group import ReportGroupService
from appenlight.models.services.user import UserService
from appenlight.models.tag import Tag
from appenlight.lib import print_traceback
from appenlight.lib.utils import parse_proto, in_batches
from appenlight.lib.ext_json import json
from appenlight.lib.redis_keys import REDIS_KEYS
from appenlight.lib.enums import ReportType

log = get_task_logger(__name__)

sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, report_type=None):
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        if report_type == ReportType.not_found:
            divide = 10.0
        else:
            divide = 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_exception_task():
    log.error('test celery log', extra={'location': 'celery'})
    log.warning('test celery log', extra={'location': 'celery'})
    raise Exception('Celery exception test')


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def test_retry_exception_task():
    try:
        import time

        time.sleep(1.3)
        log.error('test retry celery log', extra={'location': 'celery'})
        log.warning('test retry celery log', extra={'location': 'celery'})
        raise Exception('Celery exception test')
    except Exception as exc:
        test_retry_exception_task.retry(exc=exc)

@celery.task(queue="reports", default_retry_delay=600, max_retries=144)
def add_reports(resource_id, request_params, dataset, **kwargs):
    proto_version = parse_proto(request_params.get('protocol_version', ''))
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        # we will store solr docs here for single insert
        es_report_docs = {}
        es_report_group_docs = {}
        resource = ApplicationService.by_id(resource_id)

        tags = []
        es_slow_calls_docs = {}
        es_reports_stats_rows = {}
        for report_data in dataset:
            # build report details for later
            added_details = 0
            report = Report()
            report.set_data(report_data, resource, proto_version)
            report._skip_ft_index = True

            report_group = ReportGroupService.by_hash_and_resource(
                report.resource_id,
                report.grouping_hash
            )
            occurences = report_data.get('occurences', 1)
            if not report_group:
                # total reports will be +1 moment later
                report_group = ReportGroup(grouping_hash=report.grouping_hash,
                                           occurences=0, total_reports=0,
                                           last_report=0,
                                           priority=report.priority,
                                           error=report.error,
                                           first_timestamp=report.start_time)
                report_group._skip_ft_index = True
                report_group.report_type = report.report_type
            report.report_group_time = report_group.first_timestamp
            add_sample = pick_sample(report_group.occurences,
                                     report_type=report_group.report_type)
            if add_sample:
                resource.report_groups.append(report_group)
                report_group.reports.append(report)
                added_details += 1
                DBSession.flush()
                if report.partition_id not in es_report_docs:
                    es_report_docs[report.partition_id] = []
                es_report_docs[report.partition_id].append(report.es_doc())
                tags.extend(list(report.tags.items()))
                slow_calls = report.add_slow_calls(report_data, report_group)
                DBSession.flush()
                for s_call in slow_calls:
                    if s_call.partition_id not in es_slow_calls_docs:
                        es_slow_calls_docs[s_call.partition_id] = []
                    es_slow_calls_docs[s_call.partition_id].append(
                        s_call.es_doc())
                # try generating new stat rows if needed
            else:
                # required for postprocessing to not fail later
                report.report_group = report_group

            stat_row = ReportService.generate_stat_rows(
                report, resource, report_group)
            if stat_row.partition_id not in es_reports_stats_rows:
                es_reports_stats_rows[stat_row.partition_id] = []
            es_reports_stats_rows[stat_row.partition_id].append(
                stat_row.es_doc())

            # see if we should mark 10th occurence of report
            last_occurences_10 = int(math.floor(report_group.occurences / 10))
            curr_occurences_10 = int(math.floor(
                (report_group.occurences + report.occurences) / 10))
            last_occurences_100 = int(
                math.floor(report_group.occurences / 100))
            curr_occurences_100 = int(math.floor(
                (report_group.occurences + report.occurences) / 100))
            notify_occurences_10 = last_occurences_10 != curr_occurences_10
            notify_occurences_100 = last_occurences_100 != curr_occurences_100
            report_group.occurences = ReportGroup.occurences + occurences
            report_group.last_timestamp = report.start_time
            report_group.summed_duration = ReportGroup.summed_duration + report.duration
            summed_duration = ReportGroup.summed_duration + report.duration
            summed_occurences = ReportGroup.occurences + occurences
            report_group.average_duration = summed_duration / summed_occurences
            report_group.run_postprocessing(report)
            if added_details:
                report_group.total_reports = ReportGroup.total_reports + 1
                report_group.last_report = report.id
            report_group.set_notification_info(notify_10=notify_occurences_10,
                                               notify_100=notify_occurences_100)
            DBSession.flush()
            report_group.get_report().notify_channel(report_group)
            if report_group.partition_id not in es_report_group_docs:
                es_report_group_docs[report_group.partition_id] = []
            es_report_group_docs[report_group.partition_id].append(
                report_group.es_doc())

            action = 'REPORT'
            log_msg = '%s: %s %s, client: %s, proto: %s' % (
                action,
                report_data.get('http_status', 'unknown'),
                str(resource),
                report_data.get('client'),
                proto_version)
            log.info(log_msg)
        total_reports = len(dataset)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, total_reports)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
-            REDIS_KEYS['apps_that_got_new_data_per_hour'],
-            resource_id, current_time.replace(minute=0))
+            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
+                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()

        add_reports_es(es_report_group_docs, es_report_docs)
        add_reports_slow_calls_es(es_slow_calls_docs)
        add_reports_stats_rows_es(es_reports_stats_rows)
        return True
    except Exception as exc:
        print_traceback(log)
        add_reports.retry(exc=exc)

@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_es(report_group_docs, report_docs):
    for k, v in report_group_docs.items():
        Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
    for k, v in report_docs.items():
        Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                 parent_field='_parent')


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_slow_calls_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_reports_stats_rows_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)

@celery.task(queue="logs", default_retry_delay=600, max_retries=144)
def add_logs(resource_id, request_params, dataset, **kwargs):
    proto_version = request_params.get('protocol_version')
    current_time = datetime.utcnow().replace(second=0, microsecond=0)

    try:
        es_docs = collections.defaultdict(list)
        application = ApplicationService.by_id(resource_id)
        ns_pairs = []
        for entry in dataset:
            # gather pk and ns so we can remove older versions of row later
            if entry['primary_key'] is not None:
                ns_pairs.append({"pk": entry['primary_key'],
                                 "ns": entry['namespace']})
            log_entry = Log()
            log_entry.set_data(entry, resource=application)
            log_entry._skip_ft_index = True
            application.logs.append(log_entry)
            DBSession.flush()
            # insert non pk rows first
            if entry['primary_key'] is None:
                es_docs[log_entry.partition_id].append(log_entry.es_doc())

        # 2nd pass to delete all log entries from db foe same pk/ns pair
        if ns_pairs:
            ids_to_delete = []
            es_docs = collections.defaultdict(list)
            es_docs_to_delete = collections.defaultdict(list)
            found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                list_of_pairs=ns_pairs)
            log_dict = {}
            for log_entry in found_pkey_logs:
                log_key = (log_entry.primary_key, log_entry.namespace)
                if log_key not in log_dict:
                    log_dict[log_key] = []
                log_dict[log_key].append(log_entry)

            for ns, entry_list in log_dict.items():
                entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                # newest row needs to be indexed in es
                log_entry = entry_list[-1]
                # delete everything from pg and ES, leave the last row in pg
                for e in entry_list[:-1]:
                    ids_to_delete.append(e.log_id)
                    es_docs_to_delete[e.partition_id].append(e.delete_hash)

                es_docs_to_delete[log_entry.partition_id].append(
                    log_entry.delete_hash)

                es_docs[log_entry.partition_id].append(log_entry.es_doc())

            if ids_to_delete:
                query = DBSession.query(Log).filter(
                    Log.log_id.in_(ids_to_delete))
                query.delete(synchronize_session=False)
            if es_docs_to_delete:
                # batch this to avoid problems with default ES bulk limits
                for es_index in es_docs_to_delete.keys():
                    for batch in in_batches(es_docs_to_delete[es_index], 20):
                        query = {'terms': {'delete_hash': batch}}

                        try:
                            Datastores.es.delete_by_query(
                                es_index, 'log', query)
                        except pyelasticsearch.ElasticHttpNotFoundError as exc:
                            msg = 'skipping index {}'.format(es_index)
                            log.info(msg)

        total_logs = len(dataset)

        log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
            str(application),
            total_logs,
            proto_version)
        log.info(log_msg)
        # mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, total_logs)
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
-            REDIS_KEYS['apps_that_got_new_data_per_hour'],
-            resource_id, current_time.replace(minute=0))
+            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
+                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()
        add_logs_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_logs.retry(exc=exc)

@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_logs_es(es_docs):
    for k, v in es_docs.items():
        Datastores.es.bulk_index(k, 'log', v)


@celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
def add_metrics(resource_id, request_params, dataset, proto_version):
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    try:
        application = ApplicationService.by_id_cached()(resource_id)
        application = DBSession.merge(application, load=False)
        es_docs = []
        rows = []
        for metric in dataset:
            tags = dict(metric['tags'])
            server_n = tags.get('server_name', metric['server_name']).lower()
            tags['server_name'] = server_n or 'unknown'
            new_metric = Metric(
                timestamp=metric['timestamp'],
                resource_id=application.resource_id,
                namespace=metric['namespace'],
                tags=tags)
            rows.append(new_metric)
            es_docs.append(new_metric.es_doc())
        session = DBSession()
        session.bulk_save_objects(rows)
        session.flush()

        action = 'METRICS'
        metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
            action,
            str(application),
            len(dataset),
            proto_version
        )
        log.info(metrics_msg)

        mark_changed(session)
        redis_pipeline = Datastores.redis.pipeline(transaction=False)
        key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24)
        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
            resource_id, current_time.replace(minute=0))
        redis_pipeline.incr(key, len(rows))
        redis_pipeline.expire(key, 3600 * 24 * 7)
        redis_pipeline.sadd(
-            REDIS_KEYS['apps_that_got_new_data_per_hour'],
-            resource_id, current_time.replace(minute=0))
+            REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
+                current_time.replace(minute=0)), resource_id)
        redis_pipeline.execute()
        add_metrics_es(es_docs)
        return True
    except Exception as exc:
        print_traceback(log)
        add_metrics.retry(exc=exc)

@celery.task(queue="es", default_retry_delay=600, max_retries=144)
def add_metrics_es(es_docs):
    for doc in es_docs:
        partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
        Datastores.es.index(partition, 'log', doc)


@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_user_report_notifications(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters']['report_group_occurences'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict)
        users = set([p.user for p in application.users_for_perm('view')])
        report_groups = report_groups.all()
        for user in users:
            UserService.report_notify(user, request, application,
                                      report_groups=report_groups,
                                      occurence_dict=occurence_dict)
        for group in report_groups:
            # marks report_groups as notified
            if not group.notified:
                group.notified = True
    except Exception as exc:
        print_traceback(log)
        raise

@celery.task(queue="default", default_retry_delay=5, max_retries=2)
def check_alerts(resource_id):
    since_when = datetime.utcnow()
    try:
        request = get_current_request()
        application = ApplicationService.by_id(resource_id)
        if not application:
            return
        error_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.error, resource_id)
        slow_key = REDIS_KEYS[
            'reports_to_notify_per_type_per_app_alerting'].format(
            ReportType.slow, resource_id)
        error_group_ids = Datastores.redis.smembers(error_key)
        slow_group_ids = Datastores.redis.smembers(slow_key)
        Datastores.redis.delete(error_key)
        Datastores.redis.delete(slow_key)
        err_gids = [int(g_id) for g_id in error_group_ids]
        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
        group_ids = err_gids + slow_gids
        occurence_dict = {}
        for g_id in group_ids:
            key = REDIS_KEYS['counters'][
                'report_group_occurences_alerting'].format(
                g_id)
            val = Datastores.redis.get(key)
            Datastores.redis.delete(key)
            if val:
                occurence_dict[g_id] = int(val)
            else:
                occurence_dict[g_id] = 1
        report_groups = ReportGroupService.by_ids(group_ids)
        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))

        ApplicationService.check_for_groups_alert(
            application, 'alert', report_groups=report_groups,
            occurence_dict=occurence_dict, since_when=since_when)
    except Exception as exc:
        print_traceback(log)
        raise


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def close_alerts():
    log.warning('Checking alerts')
    since_when = datetime.utcnow()
    try:
        event_types = [Event.types['error_report_alert'],
                       Event.types['slow_report_alert'], ]
        statuses = [Event.statuses['active']]
        # get events older than 5 min
        events = EventService.by_type_and_status(
            event_types,
            statuses,
            older_than=(since_when - timedelta(minutes=5)))
        for event in events:
            # see if we can close them
            event.validate_or_close(
                since_when=(since_when - timedelta(minutes=1)))
    except Exception as exc:
        print_traceback(log)
        raise

@celery.task(queue="default", default_retry_delay=600, max_retries=144)
def update_tag_counter(tag_name, tag_value, count):
    try:
        query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
            sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
                                                         sa.types.TEXT))
        query.update({'times_seen': Tag.times_seen + count,
                      'last_timestamp': datetime.utcnow()},
                     synchronize_session=False)
        session = DBSession()
        mark_changed(session)
        return True
    except Exception as exc:
        print_traceback(log)
        update_tag_counter.retry(exc=exc)


@celery.task(queue="default")
def update_tag_counters():
    """
    Sets task to update counters for application tags
    """
    tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
    Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
    c = collections.Counter(tags)
    for t_json, count in c.items():
        tag_info = json.loads(t_json)
        update_tag_counter.delay(tag_info[0], tag_info[1], count)


@celery.task(queue="default")
def daily_digest():
    """
    Sends daily digest with top 50 error reports
    """
    request = get_current_request()
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    since_when = datetime.utcnow() - timedelta(hours=8)
    log.warning('Generating daily digests')
    for resource_id in apps:
        resource_id = resource_id.decode('utf8')
        end_date = datetime.utcnow().replace(microsecond=0, second=0)
        filter_settings = {'resource': [resource_id],
                           'tags': [{'name': 'type',
                                     'value': ['error'], 'op': None}],
                           'type': 'error', 'start_date': since_when,
                           'end_date': end_date}

        reports = ReportGroupService.get_trending(
            request, filter_settings=filter_settings, limit=50)

        application = ApplicationService.by_id(resource_id)
        if application:
            users = set([p.user for p in application.users_for_perm('view')])
            for user in users:
                user.send_digest(request, application, reports=reports,
                                 since_when=since_when)

@celery.task(queue="default")
def notifications_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    issue notifications
    """
    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_user_report_notifications.delay(app.decode('utf8'))

@celery.task(queue="default")
def alerting_reports():
    """
    Loop that checks redis for info and then issues new tasks to celery to
    perform the following:
    - which applications should have new alerts opened
    """

    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
    for app in apps:
        log.warning('Notify for app: %s' % app)
        check_alerts.delay(app.decode('utf8'))


@celery.task(queue="default", soft_time_limit=3600 * 4,
             hard_time_limit=3600 * 4, max_retries=144)
def logs_cleanup(resource_id, filter_settings):
    request = get_current_request()
    request.tm.begin()
    es_query = {
        "_source": False,
        "size": 5000,
        "query": {
            "filtered": {
                "filter": {
                    "and": [{"term": {"resource_id": resource_id}}]
                }
            }
        }
    }

    query = DBSession.query(Log).filter(Log.resource_id == resource_id)
    if filter_settings['namespace']:
        query = query.filter(Log.namespace == filter_settings['namespace'][0])
        es_query['query']['filtered']['filter']['and'].append(
            {"term": {"namespace": filter_settings['namespace'][0]}}
        )
    query.delete(synchronize_session=False)
    request.tm.commit()
    result = request.es_conn.search(es_query, index='rcae_l_*',
                                    doc_type='log', es_scroll='1m',
                                    es_search_type='scan')
    scroll_id = result['_scroll_id']
    while True:
        log.warning('log_cleanup, app:{} ns:{} batch'.format(
            resource_id,
            filter_settings['namespace']
        ))
        es_docs_to_delete = []
        result = request.es_conn.send_request(
            'POST', ['_search', 'scroll'],
            body=scroll_id, query_params={"scroll": '1m'})
        scroll_id = result['_scroll_id']
        if not result['hits']['hits']:
            break
        for doc in result['hits']['hits']:
            es_docs_to_delete.append({"id": doc['_id'],
                                      "index": doc['_index']})

        for batch in in_batches(es_docs_to_delete, 10):
            Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
                                                        **to_del)
                                for to_del in batch])
@@ -1,68 +1,68 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

BASE = 'appenlight:data:{}'

REDIS_KEYS = {
    'tasks': {
        'add_reports_lock': BASE.format('add_reports_lock:{}'),
        'add_logs_lock': BASE.format('add_logs_lock:{}'),
    },
    'counters': {
        'reports_per_minute': BASE.format('reports_per_minute:{}'),
        'reports_per_hour_per_app': BASE.format(
            'reports_per_hour_per_app:{}:{}'),
        'reports_per_type': BASE.format('reports_per_type:{}'),
        'logs_per_minute': BASE.format('logs_per_minute:{}'),
        'logs_per_hour_per_app': BASE.format(
            'logs_per_hour_per_app:{}:{}'),
        'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
        'metrics_per_hour_per_app': BASE.format(
            'metrics_per_hour_per_app:{}:{}'),
        'report_group_occurences': BASE.format('report_group_occurences:{}'),
        'report_group_occurences_alerting': BASE.format(
            'report_group_occurences_alerting:{}'),
        'report_group_occurences_10th': BASE.format(
            'report_group_occurences_10th:{}'),
        'report_group_occurences_100th': BASE.format(
            'report_group_occurences_100th:{}'),
    },
    'rate_limits': {
        'per_application_reports_rate_limit': BASE.format(
            'per_application_reports_limit:{}:{}'),
        'per_application_logs_rate_limit': BASE.format(
            'per_application_logs_rate_limit:{}:{}'),
        'per_application_metrics_rate_limit': BASE.format(
            'per_application_metrics_rate_limit:{}:{}'),
    },
-    'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
+    'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour:{}'),
    'apps_that_had_reports': BASE.format('apps_that_had_reports'),
    'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
    'apps_that_had_reports_alerting': BASE.format(
        'apps_that_had_reports_alerting'),
    'apps_that_had_error_reports_alerting': BASE.format(
        'apps_that_had_error_reports_alerting'),
    'reports_to_notify_per_type_per_app': BASE.format(
        'reports_to_notify_per_type_per_app:{}:{}'),
    'reports_to_notify_per_type_per_app_alerting': BASE.format(
        'reports_to_notify_per_type_per_app_alerting:{}:{}'),
    'seen_tag_list': BASE.format('seen_tag_list')
}