tasks: better counters
@@ -1,650 +1,663 @@
 # -*- coding: utf-8 -*-
 
 # Copyright (C) 2010-2016 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # AppEnlight Enterprise Edition, including its added features, Support
 # services, and proprietary license terms, please see
 # https://rhodecode.com/licenses/
 
 import bisect
 import collections
 import math
 from datetime import datetime, timedelta
 
 import sqlalchemy as sa
 import pyelasticsearch
 
 from celery.utils.log import get_task_logger
 from zope.sqlalchemy import mark_changed
 from pyramid.threadlocal import get_current_request, get_current_registry
 from appenlight.celery import celery
 from appenlight.models.report_group import ReportGroup
 from appenlight.models import DBSession, Datastores
 from appenlight.models.report import Report
 from appenlight.models.log import Log
 from appenlight.models.metric import Metric
 from appenlight.models.event import Event
 
 from appenlight.models.services.application import ApplicationService
 from appenlight.models.services.event import EventService
 from appenlight.models.services.log import LogService
 from appenlight.models.services.report import ReportService
 from appenlight.models.services.report_group import ReportGroupService
 from appenlight.models.services.user import UserService
 from appenlight.models.tag import Tag
 from appenlight.lib import print_traceback
 from appenlight.lib.utils import parse_proto, in_batches
 from appenlight.lib.ext_json import json
 from appenlight.lib.redis_keys import REDIS_KEYS
 from appenlight.lib.enums import ReportType
 
 log = get_task_logger(__name__)
 
 sample_boundries = list(range(100, 1000, 100)) + \
                    list(range(1000, 10000, 1000)) + \
                    list(range(10000, 100000, 5000))
 
 
 def pick_sample(total_occurences, report_type=None):
     every = 1.0
     position = bisect.bisect_left(sample_boundries, total_occurences)
     if position > 0:
         if report_type == ReportType.not_found:
             divide = 10.0
         else:
             divide = 100.0
         every = sample_boundries[position - 1] / divide
     return total_occurences % every == 0
 
 
 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
 def test_exception_task():
     log.error('test celery log', extra={'location': 'celery'})
     log.warning('test celery log', extra={'location': 'celery'})
     raise Exception('Celery exception test')
 
 
 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
 def test_retry_exception_task():
     try:
         import time
 
         time.sleep(1.3)
         log.error('test retry celery log', extra={'location': 'celery'})
         log.warning('test retry celery log', extra={'location': 'celery'})
         raise Exception('Celery exception test')
     except Exception as exc:
         test_retry_exception_task.retry(exc=exc)
 
 
 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
 def add_reports(resource_id, request_params, dataset, **kwargs):
     proto_version = parse_proto(request_params.get('protocol_version', ''))
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         # we will store solr docs here for single insert
         es_report_docs = {}
         es_report_group_docs = {}
         resource = ApplicationService.by_id(resource_id)
 
         tags = []
         es_slow_calls_docs = {}
         es_reports_stats_rows = {}
         for report_data in dataset:
             # build report details for later
             added_details = 0
             report = Report()
             report.set_data(report_data, resource, proto_version)
             report._skip_ft_index = True
 
             report_group = ReportGroupService.by_hash_and_resource(
                 report.resource_id,
                 report.grouping_hash
             )
             occurences = report_data.get('occurences', 1)
             if not report_group:
                 # total reports will be +1 moment later
                 report_group = ReportGroup(grouping_hash=report.grouping_hash,
                                            occurences=0, total_reports=0,
                                            last_report=0,
                                            priority=report.priority,
                                            error=report.error,
                                            first_timestamp=report.start_time)
                 report_group._skip_ft_index = True
                 report_group.report_type = report.report_type
             report.report_group_time = report_group.first_timestamp
             add_sample = pick_sample(report_group.occurences,
                                      report_type=report_group.report_type)
             if add_sample:
                 resource.report_groups.append(report_group)
                 report_group.reports.append(report)
                 added_details += 1
                 DBSession.flush()
                 if report.partition_id not in es_report_docs:
                     es_report_docs[report.partition_id] = []
                 es_report_docs[report.partition_id].append(report.es_doc())
                 tags.extend(list(report.tags.items()))
                 slow_calls = report.add_slow_calls(report_data, report_group)
                 DBSession.flush()
                 for s_call in slow_calls:
                     if s_call.partition_id not in es_slow_calls_docs:
                         es_slow_calls_docs[s_call.partition_id] = []
                     es_slow_calls_docs[s_call.partition_id].append(
                         s_call.es_doc())
                 # try generating new stat rows if needed
             else:
                 # required for postprocessing to not fail later
                 report.report_group = report_group
 
             stat_row = ReportService.generate_stat_rows(
                 report, resource, report_group)
             if stat_row.partition_id not in es_reports_stats_rows:
                 es_reports_stats_rows[stat_row.partition_id] = []
             es_reports_stats_rows[stat_row.partition_id].append(
                 stat_row.es_doc())
 
             # see if we should mark 10th occurence of report
             last_occurences_10 = int(math.floor(report_group.occurences / 10))
             curr_occurences_10 = int(math.floor(
                 (report_group.occurences + report.occurences) / 10))
             last_occurences_100 = int(
                 math.floor(report_group.occurences / 100))
             curr_occurences_100 = int(math.floor(
                 (report_group.occurences + report.occurences) / 100))
             notify_occurences_10 = last_occurences_10 != curr_occurences_10
             notify_occurences_100 = last_occurences_100 != curr_occurences_100
             report_group.occurences = ReportGroup.occurences + occurences
             report_group.last_timestamp = report.start_time
             report_group.summed_duration = ReportGroup.summed_duration + report.duration
             summed_duration = ReportGroup.summed_duration + report.duration
             summed_occurences = ReportGroup.occurences + occurences
             report_group.average_duration = summed_duration / summed_occurences
             report_group.run_postprocessing(report)
             if added_details:
                 report_group.total_reports = ReportGroup.total_reports + 1
                 report_group.last_report = report.id
             report_group.set_notification_info(notify_10=notify_occurences_10,
                                                notify_100=notify_occurences_100)
             DBSession.flush()
             report_group.get_report().notify_channel(report_group)
             if report_group.partition_id not in es_report_group_docs:
                 es_report_group_docs[report_group.partition_id] = []
             es_report_group_docs[report_group.partition_id].append(
                 report_group.es_doc())
 
             action = 'REPORT'
             log_msg = '%s: %s %s, client: %s, proto: %s' % (
                 action,
                 report_data.get('http_status', 'unknown'),
                 str(resource),
                 report_data.get('client'),
                 proto_version)
             log.info(log_msg)
         total_reports = len(dataset)
         redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
         redis_pipeline.incr(key, total_reports)
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, total_reports)
         redis_pipeline.expire(key, 3600 * 24 * 7)
         redis_pipeline.sadd(
             REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                 current_time.replace(minute=0)), resource_id)
         redis_pipeline.execute()
 
         add_reports_es(es_report_group_docs, es_report_docs)
         add_reports_slow_calls_es(es_slow_calls_docs)
         add_reports_stats_rows_es(es_reports_stats_rows)
         return True
     except Exception as exc:
         print_traceback(log)
         add_reports.retry(exc=exc)
 
 
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_es(report_group_docs, report_docs):
     for k, v in report_group_docs.items():
         Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
     for k, v in report_docs.items():
         Datastores.es.bulk_index(k, 'report', v, id_field="_id",
                                  parent_field='_parent')
 
 
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_slow_calls_es(es_docs):
     for k, v in es_docs.items():
         Datastores.es.bulk_index(k, 'log', v)
 
 
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_reports_stats_rows_es(es_docs):
     for k, v in es_docs.items():
         Datastores.es.bulk_index(k, 'log', v)
 
 
 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
 def add_logs(resource_id, request_params, dataset, **kwargs):
     proto_version = request_params.get('protocol_version')
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
 
     try:
         es_docs = collections.defaultdict(list)
-        application = ApplicationService.by_id(resource_id)
+        resource = ApplicationService.by_id_cached()(resource_id)
+        resource = DBSession.merge(resource, load=False)
         ns_pairs = []
         for entry in dataset:
             # gather pk and ns so we can remove older versions of row later
             if entry['primary_key'] is not None:
                 ns_pairs.append({"pk": entry['primary_key'],
                                  "ns": entry['namespace']})
             log_entry = Log()
-            log_entry.set_data(entry, resource=application)
+            log_entry.set_data(entry, resource=resource)
             log_entry._skip_ft_index = True
-            application.logs.append(log_entry)
+            resource.logs.append(log_entry)
             DBSession.flush()
             # insert non pk rows first
             if entry['primary_key'] is None:
                 es_docs[log_entry.partition_id].append(log_entry.es_doc())
 
         # 2nd pass to delete all log entries from db foe same pk/ns pair
         if ns_pairs:
             ids_to_delete = []
             es_docs = collections.defaultdict(list)
             es_docs_to_delete = collections.defaultdict(list)
             found_pkey_logs = LogService.query_by_primary_key_and_namespace(
                 list_of_pairs=ns_pairs)
             log_dict = {}
             for log_entry in found_pkey_logs:
                 log_key = (log_entry.primary_key, log_entry.namespace)
                 if log_key not in log_dict:
                     log_dict[log_key] = []
                 log_dict[log_key].append(log_entry)
 
             for ns, entry_list in log_dict.items():
                 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
                 # newest row needs to be indexed in es
                 log_entry = entry_list[-1]
                 # delete everything from pg and ES, leave the last row in pg
                 for e in entry_list[:-1]:
                     ids_to_delete.append(e.log_id)
                     es_docs_to_delete[e.partition_id].append(e.delete_hash)
 
                 es_docs_to_delete[log_entry.partition_id].append(
                     log_entry.delete_hash)
 
                 es_docs[log_entry.partition_id].append(log_entry.es_doc())
 
             if ids_to_delete:
                 query = DBSession.query(Log).filter(
                     Log.log_id.in_(ids_to_delete))
                 query.delete(synchronize_session=False)
             if es_docs_to_delete:
                 # batch this to avoid problems with default ES bulk limits
                 for es_index in es_docs_to_delete.keys():
                     for batch in in_batches(es_docs_to_delete[es_index], 20):
                         query = {'terms': {'delete_hash': batch}}
 
                         try:
                             Datastores.es.delete_by_query(
                                 es_index, 'log', query)
                         except pyelasticsearch.ElasticHttpNotFoundError as exc:
                             msg = 'skipping index {}'.format(es_index)
                             log.info(msg)
 
         total_logs = len(dataset)
 
         log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
-            str(application),
+            str(resource),
             total_logs,
             proto_version)
         log.info(log_msg)
         # mark_changed(session)
         redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
         redis_pipeline.incr(key, total_logs)
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, total_logs)
         redis_pipeline.expire(key, 3600 * 24 * 7)
         redis_pipeline.sadd(
             REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                 current_time.replace(minute=0)), resource_id)
         redis_pipeline.execute()
         add_logs_es(es_docs)
         return True
     except Exception as exc:
         print_traceback(log)
         add_logs.retry(exc=exc)
 
 
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_logs_es(es_docs):
     for k, v in es_docs.items():
         Datastores.es.bulk_index(k, 'log', v)
 
 
 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
 def add_metrics(resource_id, request_params, dataset, proto_version):
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
-        application = ApplicationService.by_id_cached()(resource_id)
-        application = DBSession.merge(application, load=False)
+        resource = ApplicationService.by_id_cached()(resource_id)
+        resource = DBSession.merge(resource, load=False)
         es_docs = []
         rows = []
         for metric in dataset:
             tags = dict(metric['tags'])
             server_n = tags.get('server_name', metric['server_name']).lower()
             tags['server_name'] = server_n or 'unknown'
             new_metric = Metric(
                 timestamp=metric['timestamp'],
-                resource_id=application.resource_id,
+                resource_id=resource.resource_id,
                 namespace=metric['namespace'],
                 tags=tags)
             rows.append(new_metric)
             es_docs.append(new_metric.es_doc())
         session = DBSession()
         session.bulk_save_objects(rows)
         session.flush()
 
         action = 'METRICS'
         metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
             action,
-            str(application),
+            str(resource),
             len(dataset),
             proto_version
         )
         log.info(metrics_msg)
 
         mark_changed(session)
         redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
         redis_pipeline.incr(key, len(rows))
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, len(rows))
         redis_pipeline.expire(key, 3600 * 24 * 7)
         redis_pipeline.sadd(
             REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
                 current_time.replace(minute=0)), resource_id)
         redis_pipeline.execute()
         add_metrics_es(es_docs)
         return True
     except Exception as exc:
         print_traceback(log)
         add_metrics.retry(exc=exc)
 
 
 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
 def add_metrics_es(es_docs):
     for doc in es_docs:
         partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
         Datastores.es.index(partition, 'log', doc)
 
 
 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
 def check_user_report_notifications(resource_id):
     since_when = datetime.utcnow()
     try:
         request = get_current_request()
         application = ApplicationService.by_id(resource_id)
         if not application:
             return
         error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
             ReportType.error, resource_id)
         slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
             ReportType.slow, resource_id)
         error_group_ids = Datastores.redis.smembers(error_key)
         slow_group_ids = Datastores.redis.smembers(slow_key)
         Datastores.redis.delete(error_key)
         Datastores.redis.delete(slow_key)
         err_gids = [int(g_id) for g_id in error_group_ids]
         slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
         group_ids = err_gids + slow_gids
         occurence_dict = {}
         for g_id in group_ids:
             key = REDIS_KEYS['counters']['report_group_occurences'].format(
                 g_id)
             val = Datastores.redis.get(key)
             Datastores.redis.delete(key)
             if val:
                 occurence_dict[g_id] = int(val)
             else:
                 occurence_dict[g_id] = 1
         report_groups = ReportGroupService.by_ids(group_ids)
         report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
 
         ApplicationService.check_for_groups_alert(
             application, 'alert', report_groups=report_groups,
             occurence_dict=occurence_dict)
         users = set([p.user for p in application.users_for_perm('view')])
         report_groups = report_groups.all()
         for user in users:
             UserService.report_notify(user, request, application,
                                       report_groups=report_groups,
                                       occurence_dict=occurence_dict)
         for group in report_groups:
             # marks report_groups as notified
             if not group.notified:
                 group.notified = True
     except Exception as exc:
         print_traceback(log)
         raise
 
 
 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
 def check_alerts(resource_id):
     since_when = datetime.utcnow()
     try:
         request = get_current_request()
         application = ApplicationService.by_id(resource_id)
         if not application:
             return
         error_key = REDIS_KEYS[
             'reports_to_notify_per_type_per_app_alerting'].format(
             ReportType.error, resource_id)
         slow_key = REDIS_KEYS[
             'reports_to_notify_per_type_per_app_alerting'].format(
             ReportType.slow, resource_id)
         error_group_ids = Datastores.redis.smembers(error_key)
         slow_group_ids = Datastores.redis.smembers(slow_key)
         Datastores.redis.delete(error_key)
         Datastores.redis.delete(slow_key)
         err_gids = [int(g_id) for g_id in error_group_ids]
         slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
         group_ids = err_gids + slow_gids
         occurence_dict = {}
         for g_id in group_ids:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_alerting'].format(
                 g_id)
             val = Datastores.redis.get(key)
             Datastores.redis.delete(key)
             if val:
                 occurence_dict[g_id] = int(val)
             else:
                 occurence_dict[g_id] = 1
         report_groups = ReportGroupService.by_ids(group_ids)
         report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
 
         ApplicationService.check_for_groups_alert(
             application, 'alert', report_groups=report_groups,
             occurence_dict=occurence_dict, since_when=since_when)
     except Exception as exc:
         print_traceback(log)
         raise
 
 
 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
 def close_alerts():
     log.warning('Checking alerts')
     since_when = datetime.utcnow()
     try:
         event_types = [Event.types['error_report_alert'],
                        Event.types['slow_report_alert'], ]
         statuses = [Event.statuses['active']]
         # get events older than 5 min
         events = EventService.by_type_and_status(
             event_types,
             statuses,
             older_than=(since_when - timedelta(minutes=5)))
         for event in events:
             # see if we can close them
             event.validate_or_close(
                 since_when=(since_when - timedelta(minutes=1)))
     except Exception as exc:
         print_traceback(log)
         raise
 
 
 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
 def update_tag_counter(tag_name, tag_value, count):
     try:
         query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
             sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
                                                          sa.types.TEXT))
         query.update({'times_seen': Tag.times_seen + count,
                       'last_timestamp': datetime.utcnow()},
                      synchronize_session=False)
         session = DBSession()
         mark_changed(session)
         return True
     except Exception as exc:
         print_traceback(log)
         update_tag_counter.retry(exc=exc)
 
 
 @celery.task(queue="default")
 def update_tag_counters():
     """
     Sets task to update counters for application tags
     """
     tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
     Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
     c = collections.Counter(tags)
     for t_json, count in c.items():
         tag_info = json.loads(t_json)
         update_tag_counter.delay(tag_info[0], tag_info[1], count)
 
 
 @celery.task(queue="default")
 def daily_digest():
     """
     Sends daily digest with top 50 error reports
     """
     request = get_current_request()
     apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
     Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
     since_when = datetime.utcnow() - timedelta(hours=8)
     log.warning('Generating daily digests')
     for resource_id in apps:
         resource_id = resource_id.decode('utf8')
         end_date = datetime.utcnow().replace(microsecond=0, second=0)
         filter_settings = {'resource': [resource_id],
                            'tags': [{'name': 'type',
                                      'value': ['error'], 'op': None}],
                            'type': 'error', 'start_date': since_when,
                            'end_date': end_date}
 
         reports = ReportGroupService.get_trending(
             request, filter_settings=filter_settings, limit=50)
 
         application = ApplicationService.by_id(resource_id)
         if application:
             users = set([p.user for p in application.users_for_perm('view')])
             for user in users:
                 user.send_digest(request, application, reports=reports,
                                  since_when=since_when)
 
 
 @celery.task(queue="default")
 def notifications_reports():
     """
     Loop that checks redis for info and then issues new tasks to celery to
     issue notifications
     """
     apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
     Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
     for app in apps:
         log.warning('Notify for app: %s' % app)
         check_user_report_notifications.delay(app.decode('utf8'))
 
 @celery.task(queue="default")
 def alerting_reports():
     """
     Loop that checks redis for info and then issues new tasks to celery to
     perform the following:
     - which applications should have new alerts opened
     """
 
     apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
     Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
     for app in apps:
         log.warning('Notify for app: %s' % app)
         check_alerts.delay(app.decode('utf8'))
 
 
 @celery.task(queue="default", soft_time_limit=3600 * 4,
              hard_time_limit=3600 * 4, max_retries=144)
 def logs_cleanup(resource_id, filter_settings):
     request = get_current_request()
     request.tm.begin()
     es_query = {
         "_source": False,
         "size": 5000,
         "query": {
             "filtered": {
                 "filter": {
                     "and": [{"term": {"resource_id": resource_id}}]
                 }
             }
         }
     }
 
     query = DBSession.query(Log).filter(Log.resource_id == resource_id)
     if filter_settings['namespace']:
         query = query.filter(Log.namespace == filter_settings['namespace'][0])
         es_query['query']['filtered']['filter']['and'].append(
             {"term": {"namespace": filter_settings['namespace'][0]}}
         )
     query.delete(synchronize_session=False)
     request.tm.commit()
     result = request.es_conn.search(es_query, index='rcae_l_*',
                                     doc_type='log', es_scroll='1m',
                                     es_search_type='scan')
     scroll_id = result['_scroll_id']
     while True:
         log.warning('log_cleanup, app:{} ns:{} batch'.format(
             resource_id,
             filter_settings['namespace']
         ))
         es_docs_to_delete = []
         result = request.es_conn.send_request(
             'POST', ['_search', 'scroll'],
             body=scroll_id, query_params={"scroll": '1m'})
         scroll_id = result['_scroll_id']
         if not result['hits']['hits']:
             break
         for doc in result['hits']['hits']:
             es_docs_to_delete.append({"id": doc['_id'],
                                       "index": doc['_index']})
 
         for batch in in_batches(es_docs_to_delete, 10):
             Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
                                                         **to_del)
                                 for to_del in batch])
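The change above repeats one counter pattern in add_reports, add_logs, and add_metrics: format a key from REDIS_KEYS['counters'], INCR it by the batch size on a non-transactional Redis pipeline, then set a TTL (1 hour for the new per-user counter, up to a week for the per-app counters). Below is a minimal sketch of that pattern written against a redis-py compatible client; the bump_counter helper, the client construction, and the literal amounts, user id, and resource id are illustrative assumptions, not part of the AppEnlight codebase.

from datetime import datetime

import redis


def bump_counter(pipeline, key_template, amount, expire_seconds, *key_args):
    """Format a counter key, increment it by `amount`, and refresh its TTL."""
    key = key_template.format(*key_args)
    pipeline.incr(key, amount)
    pipeline.expire(key, expire_seconds)


client = redis.StrictRedis()  # assumes a local Redis server
pipeline = client.pipeline(transaction=False)
current_time = datetime.utcnow().replace(second=0, microsecond=0)

# per-user counter introduced by this change: short, 1 hour TTL
bump_counter(pipeline, 'appenlight:data:events_per_minute_per_user:{}:{}',
             10, 3600, 5, current_time)
# per-application hourly counter kept for a week
bump_counter(pipeline, 'appenlight:data:reports_per_hour_per_app:{}:{}',
             10, 3600 * 24 * 7, 'some-resource-id',
             current_time.replace(minute=0))
pipeline.execute()

Grouping the three calls into a helper like this is only one way to factor it; the tasks themselves keep the incr/expire pairs inline.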
@@ -1,68 +1,70 @@
 # -*- coding: utf-8 -*-
 
 # Copyright (C) 2010-2016 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # AppEnlight Enterprise Edition, including its added features, Support
 # services, and proprietary license terms, please see
 # https://rhodecode.com/licenses/
 
 BASE = 'appenlight:data:{}'
 
 REDIS_KEYS = {
     'tasks': {
         'add_reports_lock': BASE.format('add_reports_lock:{}'),
         'add_logs_lock': BASE.format('add_logs_lock:{}'),
     },
     'counters': {
+        'events_per_minute_per_user': BASE.format(
+            'events_per_minute_per_user:{}:{}'),
         'reports_per_minute': BASE.format('reports_per_minute:{}'),
         'reports_per_hour_per_app': BASE.format(
             'reports_per_hour_per_app:{}:{}'),
         'reports_per_type': BASE.format('reports_per_type:{}'),
         'logs_per_minute': BASE.format('logs_per_minute:{}'),
         'logs_per_hour_per_app': BASE.format(
             'logs_per_hour_per_app:{}:{}'),
         'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
         'metrics_per_hour_per_app': BASE.format(
             'metrics_per_hour_per_app:{}:{}'),
         'report_group_occurences': BASE.format('report_group_occurences:{}'),
         'report_group_occurences_alerting': BASE.format(
             'report_group_occurences_alerting:{}'),
         'report_group_occurences_10th': BASE.format(
             'report_group_occurences_10th:{}'),
         'report_group_occurences_100th': BASE.format(
             'report_group_occurences_100th:{}'),
     },
     'rate_limits': {
         'per_application_reports_rate_limit': BASE.format(
             'per_application_reports_limit:{}:{}'),
         'per_application_logs_rate_limit': BASE.format(
             'per_application_logs_rate_limit:{}:{}'),
         'per_application_metrics_rate_limit': BASE.format(
             'per_application_metrics_rate_limit:{}:{}'),
     },
     'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour:{}'),
     'apps_that_had_reports': BASE.format('apps_that_had_reports'),
     'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
     'apps_that_had_reports_alerting': BASE.format(
         'apps_that_had_reports_alerting'),
     'apps_that_had_error_reports_alerting': BASE.format(
         'apps_that_had_error_reports_alerting'),
     'reports_to_notify_per_type_per_app': BASE.format(
         'reports_to_notify_per_type_per_app:{}:{}'),
     'reports_to_notify_per_type_per_app_alerting': BASE.format(
         'reports_to_notify_per_type_per_app_alerting:{}:{}'),
     'seen_tag_list': BASE.format('seen_tag_list')
 }
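For reference, each key template above is filled in twice: BASE supplies the 'appenlight:data:' prefix at import time, and the tasks later substitute the remaining placeholders. A small sketch of how the new events_per_minute_per_user key resolves; the user id and timestamp are made-up example values.

from datetime import datetime

BASE = 'appenlight:data:{}'

# first substitution happens when the REDIS_KEYS dict is built
key_template = BASE.format('events_per_minute_per_user:{}:{}')
# -> 'appenlight:data:events_per_minute_per_user:{}:{}'

# the tasks then fill in resource.owner_user_id and the minute-truncated UTC time
owner_user_id = 5                            # illustrative value
current_time = datetime(2016, 1, 1, 12, 5)   # already truncated to the minute
key = key_template.format(owner_user_id, current_time)
# -> 'appenlight:data:events_per_minute_per_user:5:2016-01-01 12:05:00'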